Skip to content

Commit 8e91522

Browse files
committed
[ENH]: ListCollectionsToGc should return lineage file path, group by fork tree
1 parent 8170f91 commit 8e91522

File tree

14 files changed

+230
-57
lines changed

14 files changed

+230
-57
lines changed

go/pkg/sysdb/coordinator/coordinator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,8 @@ func (s *Coordinator) FlushCollectionCompaction(ctx context.Context, flushCollec
247247
return s.catalog.FlushCollectionCompaction(ctx, flushCollectionCompaction)
248248
}
249249

250-
func (s *Coordinator) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *uint64, limit *uint64) ([]*model.CollectionToGc, error) {
251-
return s.catalog.ListCollectionsToGc(ctx, cutoffTimeSecs, limit)
250+
func (s *Coordinator) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *uint64, limit *uint64, tenantID *string) ([]*model.CollectionToGc, error) {
251+
return s.catalog.ListCollectionsToGc(ctx, cutoffTimeSecs, limit, tenantID)
252252
}
253253

254254
func (s *Coordinator) ListCollectionVersions(ctx context.Context, collectionID types.UniqueID, tenantID string, maxCount *int64, versionsBefore *int64, versionsAtOrAfter *int64, includeMarkedForDeletion bool) ([]*coordinatorpb.CollectionVersionInfo, error) {

go/pkg/sysdb/coordinator/coordinator_test.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (suite *APIsTestSuite) SetupTest() {
7979
collection.Name = "collection_" + suite.T().Name() + strconv.Itoa(index)
8080
}
8181
ctx := context.Background()
82-
c, err := NewCoordinator(ctx, SoftDelete, suite.s3MetaStore, false)
82+
c, err := NewCoordinator(ctx, SoftDelete, suite.s3MetaStore, true)
8383
if err != nil {
8484
suite.T().Fatalf("error creating coordinator: %v", err)
8585
}
@@ -1482,6 +1482,17 @@ func (suite *APIsTestSuite) TestForkCollection() {
14821482
collections, err := suite.coordinator.GetCollections(ctx, forkCollectionWithSameName.TargetCollectionID, nil, suite.tenantName, suite.databaseName, nil, nil)
14831483
suite.NoError(err)
14841484
suite.Empty(collections)
1485+
1486+
res, err := suite.coordinator.ListCollectionsToGc(ctx, nil, nil, nil)
1487+
suite.NoError(err)
1488+
suite.NotEmpty(res)
1489+
suite.Equal(1, len(res))
1490+
// ListCollectionsToGc groups by fork trees and should always return the root of the tree
1491+
suite.Equal(forkCollectionWithSameName.SourceCollectionID, res[0].ID)
1492+
1493+
exists, err := suite.s3MetaStore.HasObjectWithPrefix(ctx, *res[0].LineageFilePath)
1494+
suite.NoError(err)
1495+
suite.True(exists, "Lineage file should exist in S3")
14851496
}
14861497

14871498
func (suite *APIsTestSuite) TestCountForks() {

go/pkg/sysdb/coordinator/model/collection.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type CollectionToGc struct {
2828
TenantID string
2929
Name string
3030
VersionFilePath string
31-
LatestVersion int64
31+
LineageFilePath *string
3232
}
3333

3434
type CreateCollection struct {

go/pkg/sysdb/coordinator/model_db_convert.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ func convertCollectionToGcToModel(collectionToGc []*dbmodel.CollectionToGc) []*m
5151
ID: types.MustParse(collectionInfo.ID),
5252
Name: collectionInfo.Name,
5353
VersionFilePath: collectionInfo.VersionFileName,
54-
LatestVersion: int64(collectionInfo.Version),
5554
TenantID: collectionInfo.TenantID,
55+
LineageFilePath: collectionInfo.LineageFileName,
5656
}
5757
collections = append(collections, &collection)
5858
}

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -444,14 +444,14 @@ func (tc *Catalog) GetCollectionSize(ctx context.Context, collectionID types.Uni
444444
return total_records_post_compaction, nil
445445
}
446446

447-
func (tc *Catalog) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *uint64, limit *uint64) ([]*model.CollectionToGc, error) {
447+
func (tc *Catalog) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *uint64, limit *uint64, tenantID *string) ([]*model.CollectionToGc, error) {
448448
tracer := otel.Tracer
449449
if tracer != nil {
450450
_, span := tracer.Start(ctx, "Catalog.ListCollectionsToGc")
451451
defer span.End()
452452
}
453453

454-
collectionsToGc, err := tc.metaDomain.CollectionDb(ctx).ListCollectionsToGc(cutoffTimeSecs, limit)
454+
collectionsToGc, err := tc.metaDomain.CollectionDb(ctx).ListCollectionsToGc(cutoffTimeSecs, limit, tenantID)
455455

456456
if err != nil {
457457
return nil, err

go/pkg/sysdb/coordinator/table_catalog_test.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -676,31 +676,29 @@ func TestCatalog_ListCollectionsToGc(t *testing.T) {
676676
limit := uint64(10)
677677

678678
// Mock collections to return
679+
lineageFileName := "lineage_file_1"
679680
collectionsToGc := []*dbmodel.CollectionToGc{
680681
{
681682
ID: "00000000-0000-0000-0000-000000000001",
682683
Name: "collection1",
683-
Version: 3,
684684
VersionFileName: "3_existing_version",
685685
OldestVersionTs: time.Now().Add(-48 * time.Hour), // 48 hours ago
686-
NumVersions: 3,
687686
},
688687
{
689688
ID: "00000000-0000-0000-0000-000000000002",
690689
Name: "collection2",
691-
Version: 2,
692690
VersionFileName: "2_existing_version",
693691
OldestVersionTs: time.Now().Add(-36 * time.Hour), // 36 hours ago
694-
NumVersions: 2,
692+
LineageFileName: &lineageFileName,
695693
},
696694
}
697695

698696
// Setup mock behaviors
699697
mockMetaDomain.On("CollectionDb", mock.Anything).Return(mockCollectionDb)
700-
mockCollectionDb.On("ListCollectionsToGc", &cutoffTimeSecs, &limit).Return(collectionsToGc, nil)
698+
mockCollectionDb.On("ListCollectionsToGc", &cutoffTimeSecs, &limit, (*string)(nil)).Return(collectionsToGc, nil)
701699

702700
// Execute test
703-
result, err := catalog.ListCollectionsToGc(context.Background(), &cutoffTimeSecs, &limit)
701+
result, err := catalog.ListCollectionsToGc(context.Background(), &cutoffTimeSecs, &limit, nil)
704702

705703
// Verify results
706704
assert.NoError(t, err)
@@ -710,14 +708,13 @@ func TestCatalog_ListCollectionsToGc(t *testing.T) {
710708
// Verify first collection
711709
assert.Equal(t, "00000000-0000-0000-0000-000000000001", result[0].ID.String())
712710
assert.Equal(t, "collection1", result[0].Name)
713-
assert.Equal(t, int64(3), result[0].LatestVersion)
714711
assert.Equal(t, "3_existing_version", result[0].VersionFilePath)
715712

716713
// Verify second collection
717714
assert.Equal(t, "00000000-0000-0000-0000-000000000002", result[1].ID.String())
718715
assert.Equal(t, "collection2", result[1].Name)
719-
assert.Equal(t, int64(2), result[1].LatestVersion)
720716
assert.Equal(t, "2_existing_version", result[1].VersionFilePath)
717+
assert.Equal(t, "lineage_file_1", *result[1].LineageFilePath)
721718

722719
// Verify mock expectations
723720
mockMetaDomain.AssertExpectations(t)
@@ -739,19 +736,17 @@ func TestCatalog_ListCollectionsToGc_NilParameters(t *testing.T) {
739736
{
740737
ID: "00000000-0000-0000-0000-000000000001",
741738
Name: "collection1",
742-
Version: 3,
743739
VersionFileName: "3_existing_version",
744740
OldestVersionTs: time.Now().Add(-48 * time.Hour),
745-
NumVersions: 3,
746741
},
747742
}
748743

749744
// Setup mock behaviors
750745
mockMetaDomain.On("CollectionDb", mock.Anything).Return(mockCollectionDb)
751-
mockCollectionDb.On("ListCollectionsToGc", (*uint64)(nil), (*uint64)(nil)).Return(collectionsToGc, nil)
746+
mockCollectionDb.On("ListCollectionsToGc", (*uint64)(nil), (*uint64)(nil), (*string)(nil)).Return(collectionsToGc, nil)
752747

753748
// Execute test with nil parameters
754-
result, err := catalog.ListCollectionsToGc(context.Background(), nil, nil)
749+
result, err := catalog.ListCollectionsToGc(context.Background(), nil, nil, nil)
755750

756751
// Verify results
757752
assert.NoError(t, err)
@@ -761,7 +756,6 @@ func TestCatalog_ListCollectionsToGc_NilParameters(t *testing.T) {
761756
// Verify collection details
762757
assert.Equal(t, "00000000-0000-0000-0000-000000000001", result[0].ID.String())
763758
assert.Equal(t, "collection1", result[0].Name)
764-
assert.Equal(t, int64(3), result[0].LatestVersion)
765759
assert.Equal(t, "3_existing_version", result[0].VersionFilePath)
766760

767761
// Verify mock expectations

go/pkg/sysdb/grpc/collection_service.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ func (s *Server) ListCollectionsToGc(ctx context.Context, req *coordinatorpb.Lis
495495
absoluteCutoffTimeSecs = &cutoffTime
496496
}
497497

498-
collectionsToGc, err := s.coordinator.ListCollectionsToGc(ctx, absoluteCutoffTimeSecs, req.Limit)
498+
collectionsToGc, err := s.coordinator.ListCollectionsToGc(ctx, absoluteCutoffTimeSecs, req.Limit, req.TenantId)
499499
if err != nil {
500500
log.Error("ListCollectionsToGc failed", zap.Error(err))
501501
return nil, grpcutils.BuildInternalGrpcError(err.Error())
@@ -507,6 +507,7 @@ func (s *Server) ListCollectionsToGc(ctx context.Context, req *coordinatorpb.Lis
507507
Name: collectionToGc.Name,
508508
VersionFilePath: collectionToGc.VersionFilePath,
509509
TenantId: collectionToGc.TenantID,
510+
LineageFilePath: collectionToGc.LineageFilePath,
510511
})
511512
}
512513
return res, nil

go/pkg/sysdb/metastore/db/dao/collection.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,17 +56,29 @@ func (s *collectionDb) GetCollections(id *string, name *string, tenantID string,
5656
return s.getCollections(id, name, tenantID, databaseName, limit, offset, false)
5757
}
5858

59-
func (s *collectionDb) ListCollectionsToGc(cutoffTimeSecs *uint64, limit *uint64) ([]*dbmodel.CollectionToGc, error) {
60-
var collections []*dbmodel.CollectionToGc
61-
// Use the read replica for this so as to not overwhelm the writer.
62-
query := s.read_db.Table("collections").
63-
Select("collections.id, collections.name, collections.version, collections.version_file_name, collections.oldest_version_ts, collections.num_versions, databases.tenant_id").
64-
Joins("INNER JOIN databases ON collections.database_id = databases.id").
65-
Where("version > 0").
59+
func (s *collectionDb) ListCollectionsToGc(cutoffTimeSecs *uint64, limit *uint64, tenantID *string) ([]*dbmodel.CollectionToGc, error) {
60+
// There are three types of collections:
61+
// 1. Regular: a collection created by a normal call to create_collection(). Does not have a root_collection_id or a lineage_file_name.
62+
// 2. Root of fork tree: a collection created by a call to create_collection() which was later the source of a fork with fork(). Has a lineage_file_name.
63+
// 3. Fork of a root: a collection created by a call to fork(). Has a root_collection_id.
64+
//
65+
// For the purposes of this method, we group by fork "trees". A fork tree is a root collection and all its forks (or, in the case of regular collections, a single collection). For every fork tree, we check if at least one collection in the tree meets the GC requirements. If so, we return the root collection of the tree. We ignore forks in the response as the garbage collector will GC forks when run on the root collection.
66+
67+
sub := s.read_db.Table("collections").
68+
Select("COALESCE(NULLIF(root_collection_id, ''), id) AS id, MIN(oldest_version_ts) AS min_oldest_version_ts").
69+
Group("COALESCE(NULLIF(root_collection_id, ''), id)").
6670
Where("version_file_name IS NOT NULL").
67-
Where("version_file_name != ''").
68-
Where("root_collection_id IS NULL OR root_collection_id = ''").
69-
Where("lineage_file_name IS NULL OR lineage_file_name = ''")
71+
Where("version_file_name != ''")
72+
73+
if tenantID != nil {
74+
sub = sub.Where("tenant = ?", *tenantID)
75+
}
76+
77+
query := s.read_db.Debug().Table("collections").
78+
Select("collections.id, collections.name, collections.version_file_name, sub.min_oldest_version_ts AS oldest_version_ts, databases.tenant_id, NULLIF(collections.lineage_file_name, '') AS lineage_file_name").
79+
Joins("INNER JOIN databases ON collections.database_id = databases.id").
80+
Joins("INNER JOIN (?) AS sub ON collections.id = sub.id", sub)
81+
7082
// Apply cutoff time filter only if provided
7183
if cutoffTimeSecs != nil {
7284
cutoffTime := time.Unix(int64(*cutoffTimeSecs), 0)
@@ -80,6 +92,7 @@ func (s *collectionDb) ListCollectionsToGc(cutoffTimeSecs *uint64, limit *uint64
8092
query = query.Limit(int(*limit))
8193
}
8294

95+
var collections []*dbmodel.CollectionToGc
8396
err := query.Find(&collections).Error
8497
if err != nil {
8598
return nil, err

go/pkg/sysdb/metastore/db/dbmodel/collection.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,9 @@ type CollectionToGc struct {
3333
ID string `gorm:"id;primaryKey"`
3434
TenantID string `gorm:"tenant_id;not null;index:idx_tenant_id"`
3535
Name string `gorm:"name;not null;index:idx_name,unique;"`
36-
Version int32 `gorm:"version;default:0"`
3736
VersionFileName string `gorm:"version_file_name"`
3837
OldestVersionTs time.Time `gorm:"oldest_version_ts;type:timestamp"`
39-
NumVersions uint32 `gorm:"num_versions;default:0"`
38+
LineageFileName *string `gorm:"lineage_file_name"`
4039
}
4140

4241
func (v Collection) TableName() string {
@@ -65,7 +64,7 @@ type ICollectionDb interface {
6564
sizeBytesPostCompaction uint64, lastCompactionTimeSecs uint64) (int64, error)
6665
GetCollectionEntry(collectionID *string, databaseName *string) (*Collection, error)
6766
GetCollectionSize(collectionID string) (uint64, error)
68-
ListCollectionsToGc(cutoffTimeSecs *uint64, limit *uint64) ([]*CollectionToGc, error)
67+
ListCollectionsToGc(cutoffTimeSecs *uint64, limit *uint64, tenantID *string) ([]*CollectionToGc, error)
6968
UpdateVersionRelatedFields(collectionID, existingVersionFileName, newVersionFileName string, oldestVersionTs *time.Time, numActiveVersions *int) (int64, error)
7069
LockCollection(collectionID string) error
7170
UpdateCollectionLineageFilePath(collectionID string, currentLineageFilePath string, newLineageFilePath string) error

go/pkg/sysdb/metastore/db/dbmodel/mocks/ICollectionDb.go

Lines changed: 9 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

idl/chromadb/proto/coordinator.proto

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,8 @@ message ListCollectionsToGcRequest {
429429
// This also allows for a cheap and stateless pagination without using offsets.
430430
optional uint64 limit = 2;
431431

432+
optional string tenant_id = 3;
433+
432434
// Design NOTE: When GC calls DeleteCollectionVersion, sysdb will update the
433435
// time associated with the oldest version of the collection. This allows
434436
// sysdb to return the collections that have not been GCed for a long time.
@@ -438,8 +440,9 @@ message CollectionToGcInfo {
438440
string id = 1;
439441
string name = 2;
440442
string version_file_path = 3;
441-
int64 latest_version = 4;
443+
reserved 4; // used to be "latest_version"
442444
string tenant_id = 5;
445+
optional string lineage_file_path = 6;
443446
}
444447

445448
message ListCollectionsToGcResponse {

0 commit comments

Comments
 (0)