Skip to content

Commit 1134abf

Browse files
[ENH] Wire up proto defs for sysdb fork endpoint (chroma-core#4299)
1 parent ecfa0fb commit 1134abf

File tree

8 files changed

+210
-202
lines changed

8 files changed

+210
-202
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
diff a/go/pkg/sysdb/coordinator/coordinator.go b/go/pkg/sysdb/coordinator/coordinator.go (rejected hunks)
2+
@@ -157,6 +157,10 @@ func (s *Coordinator) UpdateCollection(ctx context.Context, collection *model.Up
3+
return s.catalog.UpdateCollection(ctx, collection, collection.Ts)
4+
}
5+
6+
+func (s *Coordinator) ForkCollection(ctx context.Context, forkCollection *model.ForkCollection) (*model.Collection, []*model.Segment, error) {
7+
+ return s.catalog.ForkCollection(ctx, forkCollection)
8+
+}
9+
+
10+
func (s *Coordinator) CreateSegment(ctx context.Context, segment *model.CreateSegment) error {
11+
if err := verifyCreateSegment(segment); err != nil {
12+
return err

go/pkg/sysdb/coordinator/model/collection.go

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ type Collection struct {
1515
Ts types.Timestamp
1616
LogPosition int64
1717
Version int32
18-
RootCollectionID *types.UniqueID
19-
LineageFileName string
2018
UpdatedAt types.Timestamp
2119
TotalRecordsPostCompaction uint64
2220
SizeBytesPostCompaction uint64 // Note: This represents the size of the records off the log
@@ -32,20 +30,15 @@ type CollectionToGc struct {
3230
}
3331

3432
type CreateCollection struct {
35-
ID types.UniqueID
36-
Name string
37-
ConfigurationJsonStr string
38-
Dimension *int32
39-
Metadata *CollectionMetadata[CollectionMetadataValueType]
40-
GetOrCreate bool
41-
TenantID string
42-
DatabaseName string
43-
Ts types.Timestamp
44-
LogPosition int64
45-
RootCollectionId string
46-
TotalRecordsPostCompaction uint64
47-
SizeBytesPostCompaction uint64 // Note: This represents the size of the records off the log
48-
LastCompactionTimeSecs uint64
33+
ID types.UniqueID
34+
Name string
35+
ConfigurationJsonStr string
36+
Dimension *int32
37+
Metadata *CollectionMetadata[CollectionMetadataValueType]
38+
GetOrCreate bool
39+
TenantID string
40+
DatabaseName string
41+
Ts types.Timestamp
4942
}
5043

5144
type DeleteCollection struct {
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
diff a/go/pkg/sysdb/coordinator/model/collection.go b/go/pkg/sysdb/coordinator/model/collection.go (rejected hunks)
2+
@@ -60,6 +60,14 @@ type UpdateCollection struct {
3+
Ts types.Timestamp
4+
}
5+
6+
+type ForkCollection struct {
7+
+ SourceCollectionID types.UniqueID
8+
+ SourceCollectionLogCompactionOffset uint64
9+
+ SourceCollectionLogEnumerationOffset uint64
10+
+ TargetCollectionID types.UniqueID
11+
+ TargetCollectionName string
12+
+}
13+
+
14+
type FlushCollectionCompaction struct {
15+
ID types.UniqueID
16+
TenantID string

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 22 additions & 183 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8-
"slices"
98
"time"
109

1110
"github.com/chroma-core/chroma/go/pkg/common"
@@ -292,19 +291,15 @@ func (tc *Catalog) createCollectionImpl(txCtx context.Context, createCollection
292291
}
293292

294293
dbCollection := &dbmodel.Collection{
295-
ID: createCollection.ID.String(),
296-
Name: &createCollection.Name,
297-
ConfigurationJsonStr: &createCollection.ConfigurationJsonStr,
298-
Dimension: createCollection.Dimension,
299-
DatabaseID: databases[0].ID,
300-
VersionFileName: versionFileName,
301-
Tenant: createCollection.TenantID,
302-
Ts: ts,
303-
LogPosition: createCollection.LogPosition,
304-
RootCollectionId: createCollection.RootCollectionId,
305-
TotalRecordsPostCompaction: createCollection.TotalRecordsPostCompaction,
306-
SizeBytesPostCompaction: createCollection.SizeBytesPostCompaction,
307-
LastCompactionTimeSecs: createCollection.LastCompactionTimeSecs,
294+
ID: createCollection.ID.String(),
295+
Name: &createCollection.Name,
296+
ConfigurationJsonStr: &createCollection.ConfigurationJsonStr,
297+
Dimension: createCollection.Dimension,
298+
DatabaseID: databases[0].ID,
299+
Ts: ts,
300+
LogPosition: 0,
301+
VersionFileName: versionFileName,
302+
Tenant: createCollection.TenantID,
308303
}
309304

310305
err = tc.metaDomain.CollectionDb(txCtx).Insert(dbCollection)
@@ -471,7 +466,7 @@ func (tc *Catalog) GetCollectionWithSegments(ctx context.Context, collectionID t
471466
var segments []*model.Segment
472467

473468
err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
474-
collections, e := tc.GetCollections(txCtx, collectionID, nil, "", "", nil, nil)
469+
collections, e := tc.GetCollections(ctx, collectionID, nil, "", "", nil, nil)
475470
if e != nil {
476471
return e
477472
}
@@ -483,7 +478,7 @@ func (tc *Catalog) GetCollectionWithSegments(ctx context.Context, collectionID t
483478
}
484479
collection = collections[0]
485480

486-
segments, e = tc.GetSegments(txCtx, types.NilUniqueID(), nil, nil, collectionID)
481+
segments, e = tc.GetSegments(ctx, types.NilUniqueID(), nil, nil, collectionID)
487482
if e != nil {
488483
return e
489484
}
@@ -818,179 +813,23 @@ func (tc *Catalog) UpdateCollection(ctx context.Context, updateCollection *model
818813
return result, nil
819814
}
820815

821-
func (tc *Catalog) getLineageFile(ctx context.Context, collection *model.Collection) (*coordinatorpb.CollectionLineageFile, error) {
822-
if len(collection.LineageFileName) == 0 {
823-
// There is no lineage file for the given collection
824-
return &coordinatorpb.CollectionLineageFile{
825-
Dependencies: []*coordinatorpb.CollectionVersionDependency{},
826-
}, nil
827-
}
828-
829-
return tc.s3Store.GetLineageFile(collection.LineageFileName)
830-
}
831-
832816
func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.ForkCollection) (*model.Collection, []*model.Segment, error) {
833817
log.Info("Forking collection", zap.String("sourceCollectionId", forkCollection.SourceCollectionID.String()), zap.String("targetCollectionName", forkCollection.TargetCollectionName))
834818

819+
var source_collection *model.Collection
820+
var source_segments []*model.Segment
821+
835822
err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
836823
var err error
837-
var rootCollection *model.Collection
838-
var rootCollectionID types.UniqueID
839-
var rootCollectionIDStr string
840-
var sourceCollection *model.Collection
841-
var sourceSegments []*model.Segment
842-
var newLineageFileFullName string
843-
844-
ts := time.Now().UTC()
845-
846-
sourceCollectionIDStr := forkCollection.SourceCollectionID.String()
847-
848-
// NOTE: We need to retrieve the source collection to get root collection id, then acquire locks on source and root collections in order to avoid deadlock.
849-
// This step is because root collection id is always populated when the collection is created and is never modified.
850-
sourceCollectionDb, err := tc.metaDomain.CollectionDb(txCtx).GetCollectionEntry(&sourceCollectionIDStr, nil)
851-
if err != nil {
852-
return err
853-
}
854-
855-
if len(sourceCollectionDb.RootCollectionId) > 0 {
856-
rootCollectionID, err = types.Parse(sourceCollectionDb.RootCollectionId)
857-
if err != nil {
858-
return err
859-
}
860-
} else {
861-
rootCollectionID = forkCollection.SourceCollectionID
862-
}
863-
rootCollectionIDStr = rootCollectionID.String()
864-
865-
// Lock source and root collections in order
866-
collectionsToLock := []string{sourceCollectionIDStr}
867-
if rootCollectionID != forkCollection.SourceCollectionID {
868-
collectionsToLock = append(collectionsToLock, rootCollectionIDStr)
869-
slices.Sort(collectionsToLock)
870-
}
871-
for _, collectionID := range collectionsToLock {
872-
err = tc.metaDomain.CollectionDb(txCtx).LockCollection(collectionID)
873-
if err != nil {
874-
return err
875-
}
876-
}
877-
878-
// Get source and root collections after they are locked
879-
sourceCollection, sourceSegments, err = tc.GetCollectionWithSegments(txCtx, forkCollection.SourceCollectionID)
880-
if err != nil {
881-
return err
882-
}
883-
if rootCollectionID != forkCollection.SourceCollectionID {
884-
limit := int32(1)
885-
collections, err := tc.GetCollections(txCtx, rootCollectionID, nil, "", "", &limit, nil)
886-
if err != nil {
887-
return err
888-
}
889-
if len(collections) == 0 {
890-
return common.ErrCollectionNotFound
891-
}
892-
rootCollection = collections[0]
893-
} else {
894-
rootCollection = sourceCollection
895-
}
896-
databases, err := tc.metaDomain.DatabaseDb(txCtx).GetDatabases(sourceCollection.TenantID, sourceCollection.DatabaseName)
897-
if err != nil {
898-
return err
899-
}
900-
if len(databases) == 0 {
901-
return common.ErrDatabaseNotFound
902-
}
903-
904-
databaseID := databases[0].ID
905-
906-
// Verify that the source collection log position is between the compaction offset (inclusive) and enumeration offset (inclusive)
907-
// This check is necessary for next compaction to fetch the right logs
908-
// This scenario could occur during fork because we will reach out to log service first to fork logs. For example:
909-
// t0: Fork source collection in log with offset [200, 300] (i.e. compaction offset 200, enumeration offset 300)
910-
// t1: User writes to source collection, compaction takes place, source collection log offset become [400, 500]
911-
// t2: Fork source collection in sysdb, the latest source collection compaction offset is 400. If we add new logs, it will start after offset 300, and the data is lost after compaction.
912-
latestSourceCompactionOffset := uint64(sourceCollection.LogPosition)
913-
if forkCollection.SourceCollectionLogEnumerationOffset < latestSourceCompactionOffset || latestSourceCompactionOffset < forkCollection.SourceCollectionLogCompactionOffset {
914-
return common.ErrCollectionLogPositionStale
915-
}
916-
917-
// Create the new collection with source collection information
918-
createCollection := &model.CreateCollection{
919-
ID: forkCollection.TargetCollectionID,
920-
Name: forkCollection.TargetCollectionName,
921-
ConfigurationJsonStr: sourceCollection.ConfigurationJsonStr,
922-
Dimension: sourceCollection.Dimension,
923-
Metadata: sourceCollection.Metadata,
924-
GetOrCreate: false,
925-
TenantID: sourceCollection.TenantID,
926-
DatabaseName: sourceCollection.DatabaseName,
927-
Ts: ts.Unix(),
928-
LogPosition: sourceCollection.LogPosition,
929-
RootCollectionId: rootCollectionIDStr,
930-
TotalRecordsPostCompaction: sourceCollection.TotalRecordsPostCompaction,
931-
SizeBytesPostCompaction: sourceCollection.SizeBytesPostCompaction,
932-
LastCompactionTimeSecs: sourceCollection.LastCompactionTimeSecs,
933-
}
934-
935-
createSegments := []*model.CreateSegment{}
936-
flushFilePaths := []*model.FlushSegmentCompaction{}
937-
for _, segment := range sourceSegments {
938-
newSegmentID := types.NewUniqueID()
939-
createSegment := &model.CreateSegment{
940-
ID: newSegmentID,
941-
Type: segment.Type,
942-
Scope: segment.Scope,
943-
CollectionID: forkCollection.TargetCollectionID,
944-
Metadata: segment.Metadata,
945-
Ts: ts.Unix(),
946-
}
947-
createSegments = append(createSegments, createSegment)
948-
flushFilePath := &model.FlushSegmentCompaction{
949-
ID: newSegmentID,
950-
FilePaths: segment.FilePaths,
951-
}
952-
flushFilePaths = append(flushFilePaths, flushFilePath)
953-
}
954-
955-
_, _, err = tc.CreateCollectionAndSegments(txCtx, createCollection, createSegments, ts.Unix())
956-
if err != nil {
957-
return err
958-
}
959-
960-
err = tc.metaDomain.SegmentDb(txCtx).RegisterFilePaths(flushFilePaths)
961-
if err != nil {
962-
return err
963-
}
964-
965-
// Update the lineage file
966-
lineageFile, err := tc.getLineageFile(txCtx, rootCollection)
967-
if err != nil {
968-
return err
969-
}
970-
// NOTE: This is a temporary hardcoded limit for the size of the lineage file
971-
// TODO: Load the limit value from quota / scorecard, and/or improve the lineage file design to avoid large lineage file
972-
if len(lineageFile.Dependencies) > 1000000 {
973-
return common.ErrCollectionTooManyFork
974-
}
975-
lineageFile.Dependencies = append(lineageFile.Dependencies, &coordinatorpb.CollectionVersionDependency{
976-
SourceCollectionId: sourceCollectionIDStr,
977-
SourceCollectionVersion: uint64(sourceCollection.Version),
978-
TargetCollectionId: forkCollection.TargetCollectionID.String(),
979-
})
980-
981-
newLineageFileBaseName := fmt.Sprintf("%s/%d/%s.binpb", sourceCollectionIDStr, sourceCollection.Version, forkCollection.TargetCollectionID)
982-
newLineageFileFullName, err = tc.s3Store.PutLineageFile(rootCollection.TenantID, databaseID, rootCollectionIDStr, newLineageFileBaseName, lineageFile)
983-
if err != nil {
984-
return err
985-
}
986-
987-
return tc.metaDomain.CollectionDb(txCtx).UpdateCollectionLineageFilePath(rootCollectionIDStr, rootCollection.LineageFileName, newLineageFileFullName)
824+
source_collection, source_segments, err = tc.GetCollectionWithSegments(ctx, forkCollection.SourceCollectionID)
825+
return err
988826
})
989827
if err != nil {
990828
return nil, nil, err
991829
}
992830

993-
return tc.GetCollectionWithSegments(ctx, forkCollection.TargetCollectionID)
831+
// TODO: Implement forking logic
832+
return source_collection, source_segments, nil
994833
}
995834

996835
func (tc *Catalog) CreateSegment(ctx context.Context, createSegment *model.CreateSegment, ts types.Timestamp) (*model.Segment, error) {
@@ -1309,7 +1148,7 @@ func (tc *Catalog) ListCollectionVersions(ctx context.Context,
13091148
zap.Int64("version", int64(collectionEntry.Version)),
13101149
zap.String("version_file_name", collectionEntry.VersionFileName))
13111150

1312-
versionFile, err := tc.s3Store.GetVersionFile(collectionEntry.VersionFileName)
1151+
versionFile, err := tc.s3Store.GetVersionFile(tenantID, collectionID.String(), int64(collectionEntry.Version), collectionEntry.VersionFileName)
13131152
if err != nil {
13141153
log.Error("error getting version file", zap.Error(err))
13151154
return nil, err
@@ -1550,7 +1389,7 @@ func (tc *Catalog) FlushCollectionCompactionForVersionedCollection(ctx context.C
15501389
}
15511390
} else {
15521391
// Read the VersionFile from S3MetaStore.
1553-
existingVersionFilePb, err = tc.s3Store.GetVersionFile(existingVersionFileName)
1392+
existingVersionFilePb, err = tc.s3Store.GetVersionFile(flushCollectionCompaction.TenantID, flushCollectionCompaction.ID.String(), existingVersion, existingVersionFileName)
15541393
if err != nil {
15551394
return nil, err
15561395
}
@@ -1745,7 +1584,7 @@ func (tc *Catalog) markVersionForDeletionInSingleCollection(
17451584
// TODO(rohit): log error if collection in file is different from the one in request.
17461585

17471586
existingVersionFileName := collectionEntry.VersionFileName
1748-
versionFilePb, err := tc.s3Store.GetVersionFile(existingVersionFileName)
1587+
versionFilePb, err := tc.s3Store.GetVersionFile(tenantID, collectionID, int64(collectionEntry.Version), existingVersionFileName)
17491588
if err != nil {
17501589
return err
17511590
}
@@ -1869,7 +1708,7 @@ func (tc *Catalog) DeleteVersionEntriesForCollection(ctx context.Context, tenant
18691708
}
18701709

18711710
existingVersionFileName := collectionEntry.VersionFileName
1872-
versionFilePb, err := tc.s3Store.GetVersionFile(existingVersionFileName)
1711+
versionFilePb, err := tc.s3Store.GetVersionFile(tenantID, collectionID, int64(collectionEntry.Version), existingVersionFileName)
18731712
if err != nil {
18741713
return err
18751714
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
diff a/go/pkg/sysdb/coordinator/table_catalog.go b/go/pkg/sysdb/coordinator/table_catalog.go (rejected hunks)
2+
@@ -813,6 +813,25 @@ func (tc *Catalog) UpdateCollection(ctx context.Context, updateCollection *model
3+
return result, nil
4+
}
5+
6+
+func (tc *Catalog) ForkCollection(ctx context.Context, forkCollection *model.ForkCollection) (*model.Collection, []*model.Segment, error) {
7+
+ log.Info("Forking collection", zap.String("sourceCollectionId", forkCollection.SourceCollectionID.String()), zap.String("targetCollectionName", forkCollection.TargetCollectionName))
8+
+
9+
+ var source_collection *model.Collection
10+
+ var source_segments []*model.Segment
11+
+
12+
+ err := tc.txImpl.Transaction(ctx, func(txCtx context.Context) error {
13+
+ var err error
14+
+ source_collection, source_segments, err = tc.GetCollectionWithSegments(ctx, forkCollection.SourceCollectionID)
15+
+ return err
16+
+ })
17+
+ if err != nil {
18+
+ return nil, nil, err
19+
+ }
20+
+
21+
+ // TODO: Implement forking logic
22+
+ return source_collection, source_segments, nil
23+
+}
24+
+
25+
func (tc *Catalog) CreateSegment(ctx context.Context, createSegment *model.CreateSegment, ts types.Timestamp) (*model.Segment, error) {
26+
var result *model.Segment
27+

go/pkg/sysdb/grpc/collection_service.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -373,9 +373,6 @@ func (s *Server) ForkCollection(ctx context.Context, req *coordinatorpb.ForkColl
373373
if err == common.ErrCollectionNotFound {
374374
return res, grpcutils.BuildNotFoundGrpcError(err.Error())
375375
}
376-
if err == common.ErrCollectionEntryIsStale {
377-
return res, grpcutils.BuildFailedPreconditionGrpcError(err.Error())
378-
}
379376
if err == common.ErrCollectionUniqueConstraintViolation {
380377
return res, grpcutils.BuildAlreadyExistsGrpcError(err.Error())
381378
}

0 commit comments

Comments (0)