Skip to content

Commit 9e05d4e

Browse files
committed
[ENH]: ListCollectionsToGc should return lineage file path, group by fork tree
1 parent 8170f91 commit 9e05d4e

File tree

14 files changed

+230
-40
lines changed

14 files changed

+230
-40
lines changed

go/pkg/sysdb/coordinator/coordinator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,8 @@ func (s *Coordinator) FlushCollectionCompaction(ctx context.Context, flushCollec
247247
return s.catalog.FlushCollectionCompaction(ctx, flushCollectionCompaction)
248248
}
249249

250-
func (s *Coordinator) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *uint64, limit *uint64) ([]*model.CollectionToGc, error) {
251-
return s.catalog.ListCollectionsToGc(ctx, cutoffTimeSecs, limit)
250+
func (s *Coordinator) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *uint64, limit *uint64, tenantID *string) ([]*model.CollectionToGc, error) {
251+
return s.catalog.ListCollectionsToGc(ctx, cutoffTimeSecs, limit, tenantID)
252252
}
253253

254254
func (s *Coordinator) ListCollectionVersions(ctx context.Context, collectionID types.UniqueID, tenantID string, maxCount *int64, versionsBefore *int64, versionsAtOrAfter *int64, includeMarkedForDeletion bool) ([]*coordinatorpb.CollectionVersionInfo, error) {

go/pkg/sysdb/coordinator/coordinator_test.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (suite *APIsTestSuite) SetupTest() {
7979
collection.Name = "collection_" + suite.T().Name() + strconv.Itoa(index)
8080
}
8181
ctx := context.Background()
82-
c, err := NewCoordinator(ctx, SoftDelete, suite.s3MetaStore, false)
82+
c, err := NewCoordinator(ctx, SoftDelete, suite.s3MetaStore, true)
8383
if err != nil {
8484
suite.T().Fatalf("error creating coordinator: %v", err)
8585
}
@@ -1482,6 +1482,17 @@ func (suite *APIsTestSuite) TestForkCollection() {
14821482
collections, err := suite.coordinator.GetCollections(ctx, forkCollectionWithSameName.TargetCollectionID, nil, suite.tenantName, suite.databaseName, nil, nil)
14831483
suite.NoError(err)
14841484
suite.Empty(collections)
1485+
1486+
res, err := suite.coordinator.ListCollectionsToGc(ctx, nil, nil, nil)
1487+
suite.NoError(err)
1488+
suite.NotEmpty(res)
1489+
suite.Equal(1, len(res))
1490+
// ListCollectionsToGc groups by fork trees and should always return the root of the tree
1491+
suite.Equal(forkCollectionWithSameName.SourceCollectionID, res[0].ID)
1492+
1493+
exists, err := suite.s3MetaStore.HasObjectWithPrefix(ctx, *res[0].LineageFilePath)
1494+
suite.NoError(err)
1495+
suite.True(exists, "Lineage file should exist in S3")
14851496
}
14861497

14871498
func (suite *APIsTestSuite) TestCountForks() {

go/pkg/sysdb/coordinator/model/collection.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type CollectionToGc struct {
2929
Name string
3030
VersionFilePath string
3131
LatestVersion int64
32+
LineageFilePath *string
3233
}
3334

3435
type CreateCollection struct {

go/pkg/sysdb/coordinator/model_db_convert.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func convertCollectionToGcToModel(collectionToGc []*dbmodel.CollectionToGc) []*m
5353
VersionFilePath: collectionInfo.VersionFileName,
5454
LatestVersion: int64(collectionInfo.Version),
5555
TenantID: collectionInfo.TenantID,
56+
LineageFilePath: collectionInfo.LineageFileName,
5657
}
5758
collections = append(collections, &collection)
5859
}

go/pkg/sysdb/coordinator/table_catalog.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -444,14 +444,14 @@ func (tc *Catalog) GetCollectionSize(ctx context.Context, collectionID types.Uni
444444
return total_records_post_compaction, nil
445445
}
446446

447-
func (tc *Catalog) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *uint64, limit *uint64) ([]*model.CollectionToGc, error) {
447+
func (tc *Catalog) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *uint64, limit *uint64, tenantID *string) ([]*model.CollectionToGc, error) {
448448
tracer := otel.Tracer
449449
if tracer != nil {
450450
_, span := tracer.Start(ctx, "Catalog.ListCollectionsToGc")
451451
defer span.End()
452452
}
453453

454-
collectionsToGc, err := tc.metaDomain.CollectionDb(ctx).ListCollectionsToGc(cutoffTimeSecs, limit)
454+
collectionsToGc, err := tc.metaDomain.CollectionDb(ctx).ListCollectionsToGc(cutoffTimeSecs, limit, tenantID)
455455

456456
if err != nil {
457457
return nil, err

go/pkg/sysdb/coordinator/table_catalog_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,7 @@ func TestCatalog_ListCollectionsToGc(t *testing.T) {
676676
limit := uint64(10)
677677

678678
// Mock collections to return
679+
lineageFileName := "lineage_file_1"
679680
collectionsToGc := []*dbmodel.CollectionToGc{
680681
{
681682
ID: "00000000-0000-0000-0000-000000000001",
@@ -692,15 +693,16 @@ func TestCatalog_ListCollectionsToGc(t *testing.T) {
692693
VersionFileName: "2_existing_version",
693694
OldestVersionTs: time.Now().Add(-36 * time.Hour), // 36 hours ago
694695
NumVersions: 2,
696+
LineageFileName: &lineageFileName,
695697
},
696698
}
697699

698700
// Setup mock behaviors
699701
mockMetaDomain.On("CollectionDb", mock.Anything).Return(mockCollectionDb)
700-
mockCollectionDb.On("ListCollectionsToGc", &cutoffTimeSecs, &limit).Return(collectionsToGc, nil)
702+
mockCollectionDb.On("ListCollectionsToGc", &cutoffTimeSecs, &limit, (*string)(nil)).Return(collectionsToGc, nil)
701703

702704
// Execute test
703-
result, err := catalog.ListCollectionsToGc(context.Background(), &cutoffTimeSecs, &limit)
705+
result, err := catalog.ListCollectionsToGc(context.Background(), &cutoffTimeSecs, &limit, nil)
704706

705707
// Verify results
706708
assert.NoError(t, err)
@@ -718,6 +720,7 @@ func TestCatalog_ListCollectionsToGc(t *testing.T) {
718720
assert.Equal(t, "collection2", result[1].Name)
719721
assert.Equal(t, int64(2), result[1].LatestVersion)
720722
assert.Equal(t, "2_existing_version", result[1].VersionFilePath)
723+
assert.Equal(t, "lineage_file_1", *result[1].LineageFilePath)
721724

722725
// Verify mock expectations
723726
mockMetaDomain.AssertExpectations(t)
@@ -748,10 +751,10 @@ func TestCatalog_ListCollectionsToGc_NilParameters(t *testing.T) {
748751

749752
// Setup mock behaviors
750753
mockMetaDomain.On("CollectionDb", mock.Anything).Return(mockCollectionDb)
751-
mockCollectionDb.On("ListCollectionsToGc", (*uint64)(nil), (*uint64)(nil)).Return(collectionsToGc, nil)
754+
mockCollectionDb.On("ListCollectionsToGc", (*uint64)(nil), (*uint64)(nil), (*string)(nil)).Return(collectionsToGc, nil)
752755

753756
// Execute test with nil parameters
754-
result, err := catalog.ListCollectionsToGc(context.Background(), nil, nil)
757+
result, err := catalog.ListCollectionsToGc(context.Background(), nil, nil, nil)
755758

756759
// Verify results
757760
assert.NoError(t, err)

go/pkg/sysdb/grpc/collection_service.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ func (s *Server) ListCollectionsToGc(ctx context.Context, req *coordinatorpb.Lis
495495
absoluteCutoffTimeSecs = &cutoffTime
496496
}
497497

498-
collectionsToGc, err := s.coordinator.ListCollectionsToGc(ctx, absoluteCutoffTimeSecs, req.Limit)
498+
collectionsToGc, err := s.coordinator.ListCollectionsToGc(ctx, absoluteCutoffTimeSecs, req.Limit, req.TenantId)
499499
if err != nil {
500500
log.Error("ListCollectionsToGc failed", zap.Error(err))
501501
return nil, grpcutils.BuildInternalGrpcError(err.Error())
@@ -507,6 +507,7 @@ func (s *Server) ListCollectionsToGc(ctx context.Context, req *coordinatorpb.Lis
507507
Name: collectionToGc.Name,
508508
VersionFilePath: collectionToGc.VersionFilePath,
509509
TenantId: collectionToGc.TenantID,
510+
LineageFilePath: collectionToGc.LineageFilePath,
510511
})
511512
}
512513
return res, nil

go/pkg/sysdb/metastore/db/dao/collection.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,17 +56,29 @@ func (s *collectionDb) GetCollections(id *string, name *string, tenantID string,
5656
return s.getCollections(id, name, tenantID, databaseName, limit, offset, false)
5757
}
5858

59-
func (s *collectionDb) ListCollectionsToGc(cutoffTimeSecs *uint64, limit *uint64) ([]*dbmodel.CollectionToGc, error) {
60-
var collections []*dbmodel.CollectionToGc
61-
// Use the read replica for this so as to not overwhelm the writer.
59+
func (s *collectionDb) ListCollectionsToGc(cutoffTimeSecs *uint64, limit *uint64, tenantID *string) ([]*dbmodel.CollectionToGc, error) {
60+
// There are three types of collections:
61+
// 1. Regular: a collection created by a normal call to create_collection(). Does not have a root_collection_id or a lineage_file_name.
62+
// 2. Root of fork tree: a collection created by a call to create_collection() which was later the source of a fork with fork(). Has a lineage_file_name.
63+
// 3. Fork of a root: a collection created by a call to fork(). Has a root_collection_id.
64+
//
65+
// For the purposes of this method, we group by fork "trees". A fork tree is a root collection and all its forks (or, in the case of regular collections, a single collection). For every fork tree, we check if at least one collection in the tree meets the GC requirements. If so, we return the root collection of the tree. We ignore forks in the response as the garbage collector will GC forks when run on the root collection.
66+
67+
sub := s.read_db.Table("collections").
68+
Select("COALESCE(NULLIF(root_collection_id, ''), id) AS id, MAX(version) AS max_version, MIN(oldest_version_ts) AS min_oldest_version_ts, SUM(num_versions) AS sum_num_versions").
69+
Group("COALESCE(NULLIF(root_collection_id, ''), id)").
70+
Where("version_file_name IS NOT NULL").
71+
Where("version_file_name != ''")
72+
73+
if tenantID != nil {
74+
sub = sub.Where("tenant = ?", *tenantID)
75+
}
76+
6277
query := s.read_db.Table("collections").
63-
Select("collections.id, collections.name, collections.version, collections.version_file_name, collections.oldest_version_ts, collections.num_versions, databases.tenant_id").
78+
Select("collections.id, collections.name, collections.version, collections.version_file_name, sub.min_oldest_version_ts AS oldest_version_ts, sub.sum_num_versions AS num_versions, databases.tenant_id, collections.lineage_file_name").
6479
Joins("INNER JOIN databases ON collections.database_id = databases.id").
65-
Where("version > 0").
66-
Where("version_file_name IS NOT NULL").
67-
Where("version_file_name != ''").
68-
Where("root_collection_id IS NULL OR root_collection_id = ''").
69-
Where("lineage_file_name IS NULL OR lineage_file_name = ''")
80+
Joins("INNER JOIN (?) AS sub ON collections.id = sub.id", sub)
81+
7082
// Apply cutoff time filter only if provided
7183
if cutoffTimeSecs != nil {
7284
cutoffTime := time.Unix(int64(*cutoffTimeSecs), 0)
@@ -80,6 +92,7 @@ func (s *collectionDb) ListCollectionsToGc(cutoffTimeSecs *uint64, limit *uint64
8092
query = query.Limit(int(*limit))
8193
}
8294

95+
var collections []*dbmodel.CollectionToGc
8396
err := query.Find(&collections).Error
8497
if err != nil {
8598
return nil, err

go/pkg/sysdb/metastore/db/dbmodel/collection.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type CollectionToGc struct {
3737
VersionFileName string `gorm:"version_file_name"`
3838
OldestVersionTs time.Time `gorm:"oldest_version_ts;type:timestamp"`
3939
NumVersions uint32 `gorm:"num_versions;default:0"`
40+
LineageFileName *string `gorm:"lineage_file_name"`
4041
}
4142

4243
func (v Collection) TableName() string {
@@ -65,7 +66,7 @@ type ICollectionDb interface {
6566
sizeBytesPostCompaction uint64, lastCompactionTimeSecs uint64) (int64, error)
6667
GetCollectionEntry(collectionID *string, databaseName *string) (*Collection, error)
6768
GetCollectionSize(collectionID string) (uint64, error)
68-
ListCollectionsToGc(cutoffTimeSecs *uint64, limit *uint64) ([]*CollectionToGc, error)
69+
ListCollectionsToGc(cutoffTimeSecs *uint64, limit *uint64, tenantID *string) ([]*CollectionToGc, error)
6970
UpdateVersionRelatedFields(collectionID, existingVersionFileName, newVersionFileName string, oldestVersionTs *time.Time, numActiveVersions *int) (int64, error)
7071
LockCollection(collectionID string) error
7172
UpdateCollectionLineageFilePath(collectionID string, currentLineageFilePath string, newLineageFilePath string) error

go/pkg/sysdb/metastore/db/dbmodel/mocks/ICollectionDb.go

Lines changed: 9 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

idl/chromadb/proto/coordinator.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,8 @@ message ListCollectionsToGcRequest {
429429
// This also allows for a cheap and stateless pagination without using offsets.
430430
optional uint64 limit = 2;
431431

432+
optional string tenant_id = 3;
433+
432434
// Design NOTE: When GC calls DeleteCollectionVersion, sysdb will update the
433435
// time associated with the oldest version of the collection. This allows
434436
// sysdb to return the collections that have not been GCed for a long time.
@@ -440,6 +442,7 @@ message CollectionToGcInfo {
440442
string version_file_path = 3;
441443
int64 latest_version = 4;
442444
string tenant_id = 5;
445+
optional string lineage_file_path = 6;
443446
}
444447

445448
message ListCollectionsToGcResponse {

0 commit comments

Comments
 (0)