Skip to content

Commit e174d32

Browse files
authored
[ENH] Wire up proto defs for sysdb fork endpoint (#4299)
## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - N/A - New functionality - Defines the SysDB fork service request and response - Wires up the request upto `table_catalog.go` ## Test plan *How are these changes tested?* - [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes *Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs repository](https://github.com/chroma-core/docs)?*
1 parent 59687d7 commit e174d32

File tree

5 files changed

+104
-40
lines changed

5 files changed

+104
-40
lines changed

go/pkg/sysdb/coordinator/coordinator.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ func (s *Coordinator) UpdateCollection(ctx context.Context, collection *model.Up
157157
return s.catalog.UpdateCollection(ctx, collection, collection.Ts)
158158
}
159159

160+
func (s *Coordinator) ForkCollection(ctx context.Context, forkCollection *model.ForkCollection) (*model.Collection, []*model.Segment, error) {
161+
return s.catalog.ForkCollection(ctx, forkCollection)
162+
}
163+
160164
func (s *Coordinator) CreateSegment(ctx context.Context, segment *model.CreateSegment) error {
161165
if err := verifyCreateSegment(segment); err != nil {
162166
return err

go/pkg/sysdb/coordinator/model/collection.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ type UpdateCollection struct {
6060
Ts types.Timestamp
6161
}
6262

63+
type ForkCollection struct {
64+
SourceCollectionID types.UniqueID
65+
SourceCollectionLogCompactionOffset uint64
66+
SourceCollectionLogEnumerationOffset uint64
67+
TargetCollectionID types.UniqueID
68+
TargetCollectionName string
69+
}
70+
6371
type FlushCollectionCompaction struct {
6472
ID types.UniqueID
6573
TenantID string

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -813,6 +813,25 @@ func (tc *Catalog) UpdateCollection(ctx context.Context, updateCollection *model
813813
return result, nil
814814
}
815815

816+
func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.ForkCollection) (*model.Collection, []*model.Segment, error) {
817+
log.Info("Forking collection", zap.String("sourceCollectionId", forkCollection.SourceCollectionID.String()), zap.String("targetCollectionName", forkCollection.TargetCollectionName))
818+
819+
var source_collection *model.Collection
820+
var source_segments []*model.Segment
821+
822+
err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
823+
var err error
824+
source_collection, source_segments, err = tc.GetCollectionWithSegments(ctx, forkCollection.SourceCollectionID)
825+
return err
826+
})
827+
if err != nil {
828+
return nil, nil, err
829+
}
830+
831+
// TODO: Implement forking logic
832+
return source_collection, source_segments, nil
833+
}
834+
816835
func (tc *Catalog) CreateSegment(ctx context.Context, createSegment *model.CreateSegment, ts types.Timestamp) (*model.Segment, error) {
817836
var result *model.Segment
818837

go/pkg/sysdb/grpc/collection_service.go

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package grpc
33
import (
44
"context"
55
"encoding/json"
6-
"fmt"
76

87
"github.com/chroma-core/chroma/go/pkg/grpcutils"
98

@@ -250,31 +249,15 @@ func (s *Server) GetCollectionWithSegments(ctx context.Context, req *coordinator
250249
}
251250
return res, grpcutils.BuildInternalGrpcError(err.Error())
252251
}
253-
254252
res.Collection = convertCollectionToProto(collection)
253+
255254
segmentpbList := make([]*coordinatorpb.Segment, 0, len(segments))
256-
scopeToSegmentMap := map[coordinatorpb.SegmentScope]*coordinatorpb.Segment{}
257255
for _, segment := range segments {
258256
segmentpb := convertSegmentToProto(segment)
259-
scopeToSegmentMap[segmentpb.GetScope()] = segmentpb
260257
segmentpbList = append(segmentpbList, segmentpb)
261258
}
262-
263-
if len(segmentpbList) != 3 {
264-
log.Error("GetCollectionWithSegments failed. Unexpected number of collection segments", zap.String("collection_id", collectionID))
265-
return res, grpcutils.BuildInternalGrpcError(fmt.Sprintf("Unexpected number of segments for collection %s: %d", collectionID, len(segmentpbList)))
266-
}
267-
268-
scopes := []coordinatorpb.SegmentScope{coordinatorpb.SegmentScope_METADATA, coordinatorpb.SegmentScope_RECORD, coordinatorpb.SegmentScope_VECTOR}
269-
270-
for _, scope := range scopes {
271-
if _, exists := scopeToSegmentMap[scope]; !exists {
272-
log.Error("GetCollectionWithSegments failed. Collection segment scope not found", zap.String("collection_id", collectionID), zap.String("missing_scope", scope.String()))
273-
return res, grpcutils.BuildInternalGrpcError(fmt.Sprintf("Missing segment scope for collection %s: %s", collectionID, scope.String()))
274-
}
275-
}
276-
277259
res.Segments = segmentpbList
260+
278261
return res, nil
279262
}
280263

@@ -360,6 +343,53 @@ func (s *Server) UpdateCollection(ctx context.Context, req *coordinatorpb.Update
360343
return res, nil
361344
}
362345

346+
func (s *Server) ForkCollection(ctx context.Context, req *coordinatorpb.ForkCollectionRequest) (*coordinatorpb.ForkCollectionResponse, error) {
347+
res := &coordinatorpb.ForkCollectionResponse{}
348+
349+
sourceCollectionID := req.SourceCollectionId
350+
parsedSourceCollectionID, err := types.ToUniqueID(&sourceCollectionID)
351+
if err != nil {
352+
log.Error("ForkCollection failed. Failed to parse source collection id", zap.Error(err), zap.String("collection_id", sourceCollectionID))
353+
return res, grpcutils.BuildInternalGrpcError(err.Error())
354+
}
355+
356+
targetCollectionID := req.TargetCollectionId
357+
parsedTargetCollectionID, err := types.ToUniqueID(&targetCollectionID)
358+
if err != nil {
359+
log.Error("ForkCollection failed. Failed to parse target collection id", zap.Error(err), zap.String("collection_id", targetCollectionID))
360+
return res, grpcutils.BuildInternalGrpcError(err.Error())
361+
}
362+
363+
forkCollection := &model.ForkCollection{
364+
SourceCollectionID: parsedSourceCollectionID,
365+
SourceCollectionLogCompactionOffset: req.SourceCollectionLogCompactionOffset,
366+
SourceCollectionLogEnumerationOffset: req.SourceCollectionLogEnumerationOffset,
367+
TargetCollectionID: parsedTargetCollectionID,
368+
TargetCollectionName: req.TargetCollectionName,
369+
}
370+
collection, segments, err := s.coordinator.ForkCollection(ctx, forkCollection)
371+
if err != nil {
372+
log.Error("ForkCollection failed. ", zap.Error(err), zap.String("collection_id", sourceCollectionID))
373+
if err == common.ErrCollectionNotFound {
374+
return res, grpcutils.BuildNotFoundGrpcError(err.Error())
375+
}
376+
if err == common.ErrCollectionUniqueConstraintViolation {
377+
return res, grpcutils.BuildAlreadyExistsGrpcError(err.Error())
378+
}
379+
return res, grpcutils.BuildInternalGrpcError(err.Error())
380+
}
381+
res.Collection = convertCollectionToProto(collection)
382+
383+
segmentpbList := make([]*coordinatorpb.Segment, 0, len(segments))
384+
for _, segment := range segments {
385+
segmentpb := convertSegmentToProto(segment)
386+
segmentpbList = append(segmentpbList, segmentpb)
387+
}
388+
res.Segments = segmentpbList
389+
390+
return res, nil
391+
}
392+
363393
func (s *Server) ListCollectionVersions(ctx context.Context, req *coordinatorpb.ListCollectionVersionsRequest) (*coordinatorpb.ListCollectionVersionsResponse, error) {
364394
collectionID, err := types.ToUniqueID(&req.CollectionId)
365395
if err != nil {

idl/chromadb/proto/coordinator.proto

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,19 @@ message UpdateCollectionResponse {
216216
reserved "status";
217217
}
218218

219+
message ForkCollectionRequest {
220+
string source_collection_id = 1;
221+
uint64 source_collection_log_compaction_offset = 2;
222+
uint64 source_collection_log_enumeration_offset = 3;
223+
string target_collection_id = 4;
224+
string target_collection_name = 5;
225+
}
226+
227+
message ForkCollectionResponse {
228+
Collection collection = 1;
229+
repeated Segment segments = 2;
230+
}
231+
219232
message ResetStateResponse {
220233
reserved 1;
221234
reserved "status";
@@ -333,34 +346,13 @@ message CollectionSegmentInfo {
333346
// GC's {collection,version} selection policy.
334347
}
335348

336-
// Tuple of collection ID, tenant ID, and version.
337-
message CollectionVersionTuple {
338-
string collection_id = 1;
339-
string tenant_id = 2;
340-
int64 version = 3;
341-
}
342-
343349
message VersionListForCollection {
344350
string tenant_id = 1;
345351
string database_id = 2;
346352
string collection_id = 3;
347353
repeated int64 versions = 4;
348354
}
349355

350-
// Contains information about the lineage of a collection.
351-
message CollectionLineageInfo {
352-
// ID of the collection.
353-
string collection_id = 1;
354-
// ID of the tenant.
355-
string tenant_id = 2;
356-
// Whether the collection is a root collection.
357-
bool is_root_collection = 3;
358-
// An ordered list of descendant collections.
359-
// The first element is the root {collection, version}, and the last
360-
// element is the direct parent of the current collection.
361-
repeated CollectionVersionTuple parent_collections = 4;
362-
}
363-
364356
// Request to list versions of a collection.
365357
message ListCollectionVersionsRequest {
366358
string collection_id = 1;
@@ -382,6 +374,16 @@ message ListCollectionVersionsResponse {
382374
bool list_is_truncated = 2;
383375
}
384376

377+
message CollectionLineageFile {
378+
repeated CollectionVersionDependency dependencies = 1;
379+
}
380+
381+
message CollectionVersionDependency {
382+
string source_collection_id = 1; // The forked collection
383+
uint64 source_collection_version = 2; // The forked collection version
384+
string target_collection_id = 3; // The forking collection
385+
}
386+
385387
// Request to restore a collection.
386388
message RestoreCollectionRequest {
387389
string collection_id = 1;
@@ -473,6 +475,7 @@ service SysDB {
473475
rpc GetCollectionWithSegments(GetCollectionWithSegmentsRequest) returns (GetCollectionWithSegmentsResponse) {}
474476
rpc CheckCollections(CheckCollectionsRequest) returns (CheckCollectionsResponse) {}
475477
rpc UpdateCollection(UpdateCollectionRequest) returns (UpdateCollectionResponse) {}
478+
rpc ForkCollection(ForkCollectionRequest) returns (ForkCollectionResponse) {}
476479
rpc ResetState(google.protobuf.Empty) returns (ResetStateResponse) {}
477480
rpc GetLastCompactionTimeForTenant(GetLastCompactionTimeForTenantRequest) returns (GetLastCompactionTimeForTenantResponse) {}
478481
rpc SetLastCompactionTimeForTenant(SetLastCompactionTimeForTenantRequest) returns (google.protobuf.Empty) {}

0 commit comments

Comments
 (0)