Skip to content

Compaction by tombstone ratio in simpledb #46

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion simpledb/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func backgroundCompaction(db *DB) {
}

func executeCompaction(db *DB) (compactionMetadata *proto.CompactionMetadata, err error) {
compactionAction := db.sstableManager.candidateTablesForCompaction(db.compactedMaxSizeBytes)
compactionAction := db.sstableManager.candidateTablesForCompaction(db.compactedMaxSizeBytes, db.compactionRatio)
paths := compactionAction.pathsToCompact
numRecords := compactionAction.totalRecords
if len(paths) <= db.compactionFileThreshold {
Expand Down
8 changes: 3 additions & 5 deletions simpledb/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,8 @@ func TestCompactionWithTombstonesBeyondMaxSize(t *testing.T) {

compactionMeta, err := executeCompaction(db)
assert.Nil(t, err)
// TODO(thomas): this should also compact the 42 table, as it wastes a ton of space in tombstones
assert.Equal(t, "sstable_000000000000043", compactionMeta.ReplacementPath)
assert.Equal(t, []string{"sstable_000000000000043"}, compactionMeta.SstablePaths)
assert.Equal(t, "sstable_000000000000042", compactionMeta.ReplacementPath)
assert.Equal(t, []string{"sstable_000000000000042", "sstable_000000000000043"}, compactionMeta.SstablePaths)

err = db.sstableManager.reflectCompactionResult(compactionMeta)
assert.NoError(t, err)
Expand All @@ -175,8 +174,7 @@ func TestCompactionWithTombstonesBeyondMaxSize(t *testing.T) {
assert.Equal(t, "512", v)
// for cleanups
assert.Nil(t, db.sstableManager.currentReader.Close())
// TODO(thomas): ideally that table should only be 10
assert.Equal(t, 310, int(db.sstableManager.currentSSTable().MetaData().NumRecords))
assert.Equal(t, 10, int(db.sstableManager.currentSSTable().MetaData().NumRecords))
}

func writeSSTableWithDataInDatabaseFolder(t *testing.T, db *DB, p string) {
Expand Down
21 changes: 21 additions & 0 deletions simpledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package simpledb

import (
"errors"
"fmt"
"os"
"sync"
"time"
Expand All @@ -25,6 +26,7 @@ const MemStoreMaxSizeBytes uint64 = 1024 * 1024 * 1024 // 1gb
// NumSSTablesToTriggerCompaction is the default compaction file threshold, see CompactionFileThreshold.
const NumSSTablesToTriggerCompaction int = 10
// DefaultCompactionMaxSizeBytes is the default value of the compaction max size option, in bytes.
const DefaultCompactionMaxSizeBytes uint64 = 5 * 1024 * 1024 * 1024 // 5gb
// DefaultCompactionInterval is the default value of the compaction run interval option.
const DefaultCompactionInterval = 5 * time.Second
// DefaultCompactionRatio is the default tombstone ratio threshold, see CompactionRatio.
const DefaultCompactionRatio = float32(0.2)
// DefaultWriteBufferSizeBytes is the default value of the write buffer size option, in bytes.
const DefaultWriteBufferSizeBytes uint64 = 4 * 1024 * 1024 // 4Mb
// DefaultReadBufferSizeBytes is the default value of the read buffer size option, in bytes.
const DefaultReadBufferSizeBytes uint64 = 4 * 1024 * 1024 // 4Mb

Expand Down Expand Up @@ -73,6 +75,7 @@ type DB struct {
memstoreMaxSize uint64
compactionFileThreshold int
compactionInterval time.Duration
compactionRatio float32
compactedMaxSizeBytes uint64
enableCompactions bool
enableAsyncWAL bool
Expand Down Expand Up @@ -331,6 +334,7 @@ func NewSimpleDB(basePath string, extraOptions ...ExtraOption) (*DB, error) {
NumSSTablesToTriggerCompaction,
DefaultCompactionMaxSizeBytes,
DefaultCompactionInterval,
DefaultCompactionRatio,
DefaultWriteBufferSizeBytes,
DefaultReadBufferSizeBytes,
}
Expand Down Expand Up @@ -361,6 +365,7 @@ func NewSimpleDB(basePath string, extraOptions ...ExtraOption) (*DB, error) {
enableAsyncWAL: extraOpts.enableAsyncWAL,
enableDirectIOWAL: extraOpts.enableDirectIOWAL,
compactionInterval: extraOpts.compactionRunInterval,
compactionRatio: extraOpts.compactionRatio,
closed: false,
rwLock: rwLock,
wal: nil,
Expand All @@ -385,6 +390,7 @@ type ExtraOptions struct {
compactionFileThreshold int
compactionMaxSizeBytes uint64
compactionRunInterval time.Duration
compactionRatio float32
writeBufferSizeBytes uint64
readBufferSizeBytes uint64
}
Expand Down Expand Up @@ -428,6 +434,21 @@ func CompactionRunInterval(interval time.Duration) ExtraOption {
}
}

// CompactionRatio configures the tombstone ratio at which a sstable becomes eligible for compaction,
// which can be used to save disk space.
// The ratio is measured as the amount of tombstoned keys divided by the overall record number in the sstable.
// The threshold must be between 0.0 and 1.0 (inclusive) as float32 and by default is DefaultCompactionRatio.
// So with the default, a sstable is automatically compacted once at least 20% of its records are flagged as tombstones.
// A value of 1.0 effectively turns this feature off (only sstables that consist entirely of tombstones still reach
// the threshold) and resorts to the max size calculation; a value of 0.0 makes every non-empty sstable eligible,
// regardless of how many tombstones are in there.
//
// CompactionRatio panics right away, not when the returned option is applied, if ratio is NaN or lies outside
// of [0.0, 1.0].
func CompactionRatio(ratio float32) ExtraOption {
	// Phrased as a negated range check on purpose: every comparison against NaN is false, so the
	// more obvious "ratio < 0.0 || ratio > 1.0" would silently accept NaN.
	if !(ratio >= 0.0 && ratio <= 1.0) {
		panic(fmt.Sprintf("invalid compaction ratio: %f, must be between 0 and 1", ratio))
	}
	return func(args *ExtraOptions) {
		args.compactionRatio = ratio
	}
}

// CompactionFileThreshold controls how often SSTables are compacted; the threshold is measured in number of SSTables.
// The default is 10 SSTables, which are then compacted into a single SSTable.
func CompactionFileThreshold(n int) ExtraOption {
Expand Down
9 changes: 7 additions & 2 deletions simpledb/sstable_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,18 @@ func (s *SSTableManager) currentSSTable() sstables.SSTableReaderI {
return s.currentReader
}

func (s *SSTableManager) candidateTablesForCompaction(compactionMaxSizeBytes uint64) compactionAction {
func (s *SSTableManager) candidateTablesForCompaction(compactionMaxSizeBytes uint64, compactionRatio float32) compactionAction {
s.managerLock.RLock()
defer s.managerLock.RUnlock()

var selectedForCompaction []bool
for i := 0; i < len(s.allSSTableReaders); i++ {
selectedForCompaction = append(selectedForCompaction, s.allSSTableReaders[i].MetaData().TotalBytes < compactionMaxSizeBytes)
selected := s.allSSTableReaders[i].MetaData().TotalBytes < compactionMaxSizeBytes
if s.allSSTableReaders[i].MetaData().NumRecords > 0 {
tombstoneRatio := float32(s.allSSTableReaders[i].MetaData().NullValues) / float32(s.allSSTableReaders[i].MetaData().NumRecords)
selected = selected || tombstoneRatio >= compactionRatio
}
selectedForCompaction = append(selectedForCompaction, selected)
}

/*
Expand Down
44 changes: 37 additions & 7 deletions simpledb/sstable_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,40 @@ func TestSSTableManagerSelectCompactionCandidates(t *testing.T) {
path: "4",
})

assertCompactionAction(t, 0, []string{"4"}, manager.candidateTablesForCompaction(25))
assertCompactionAction(t, 105, []string{"2", "3", "4"}, manager.candidateTablesForCompaction(51))
assertCompactionAction(t, 115, []string{"1", "2", "3", "4"}, manager.candidateTablesForCompaction(101))
assertCompactionAction(t, 115, []string{"1", "2", "3", "4"}, manager.candidateTablesForCompaction(1500))
assertCompactionAction(t, 0, []string{"4"}, manager.candidateTablesForCompaction(25, 1))
assertCompactionAction(t, 105, []string{"2", "3", "4"}, manager.candidateTablesForCompaction(51, 1))
assertCompactionAction(t, 115, []string{"1", "2", "3", "4"}, manager.candidateTablesForCompaction(101, 1))
assertCompactionAction(t, 115, []string{"1", "2", "3", "4"}, manager.candidateTablesForCompaction(1500, 1))
}

// TestSSTableManagerSelectCompactionCandidatesTombstoneRatios verifies the candidate selection for
// different tombstone ratio thresholds. Every fixture table reports 1000 bytes while the size limit
// passed in is 999, so any selection asserted below has to come from the ratio.
func TestSSTableManagerSelectCompactionCandidatesTombstoneRatios(t *testing.T) {
	manager := NewSSTableManager(skiplist.BytesComparator{}, &sync.RWMutex{}, "")

	fixtures := []*MockSSTableReader{
		// 8 of 10 records are tombstones
		{path: "1", metadata: &proto.MetaData{NumRecords: 10, NullValues: 8, TotalBytes: 1000}},
		// no tombstones at all
		{path: "2", metadata: &proto.MetaData{NumRecords: 5, NullValues: 0, TotalBytes: 1000}},
		// 10 of 100 records are tombstones
		{path: "3", metadata: &proto.MetaData{NumRecords: 100, NullValues: 10, TotalBytes: 1000}},
		// empty table
		{path: "4", metadata: &proto.MetaData{NumRecords: 0, NullValues: 0, TotalBytes: 1000}},
	}
	for _, reader := range fixtures {
		manager.addReader(reader)
	}

	const sizeLimit = 999

	action := manager.candidateTablesForCompaction(sizeLimit, 0.2)
	assertCompactionAction(t, 10, []string{"1"}, action)

	// 1 and 3 are selected by ratio, 2 comes along for the ride because of flood filling
	action = manager.candidateTablesForCompaction(sizeLimit, 0.1)
	assertCompactionAction(t, 115, []string{"1", "2", "3"}, action)

	action = manager.candidateTablesForCompaction(sizeLimit, 0)
	assertCompactionAction(t, 115, []string{"1", "2", "3"}, action)

	action = manager.candidateTablesForCompaction(sizeLimit, 1)
	assertCompactionAction(t, 0, nil, action)
}

func TestSSTableManagerSelectCompactionCandidatesEmptyStart(t *testing.T) {
Expand All @@ -124,8 +154,8 @@ func TestSSTableManagerSelectCompactionCandidatesEmptyStart(t *testing.T) {
path: "4",
})

assertCompactionAction(t, 5, []string{"1", "2", "3"}, manager.candidateTablesForCompaction(100))
assertCompactionAction(t, 30, []string{"1", "2", "3", "4"}, manager.candidateTablesForCompaction(200))
assertCompactionAction(t, 5, []string{"1", "2", "3"}, manager.candidateTablesForCompaction(100, 1))
assertCompactionAction(t, 30, []string{"1", "2", "3", "4"}, manager.candidateTablesForCompaction(200, 1))
}

func TestSSTableManagerSelectCompactionCandidatesTombstonedHoles(t *testing.T) {
Expand All @@ -151,7 +181,7 @@ func TestSSTableManagerSelectCompactionCandidatesTombstonedHoles(t *testing.T) {
path: "4",
})

assertCompactionAction(t, 3010, []string{"2", "3", "4"}, manager.candidateTablesForCompaction(2000))
assertCompactionAction(t, 3010, []string{"2", "3", "4"}, manager.candidateTablesForCompaction(2000, 1))
}

func assertCompactionAction(t *testing.T, numRecords int, paths []string, actualAction compactionAction) {
Expand Down
Loading