feat: Defer cleanup for log/index compactions, add debug log (#26511) #26554

Open: wants to merge 1 commit into base: 1.12

65 changes: 43 additions & 22 deletions tsdb/index/tsi1/partition.go
@@ -10,6 +10,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 	"unsafe"
 
@@ -62,7 +63,7 @@ type Partition struct {
 	// Fieldset shared with engine.
 	fieldset *tsdb.MeasurementFieldSet
 
-	currentCompactionN int // counter of in-progress compactions
+	currentCompactionN atomic.Int32 // counter of in-progress compactions
 
 	// Directory of the Partition's index files.
 	path string
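
For reference, a minimal standalone sketch (not part of the diff) contrasting the mutex-guarded int the field used to be with the atomic.Int32 it becomes; the mutexCounter type and all names here are illustrative only.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// mutexCounter guards a plain int with an RWMutex, the way the old field
// was protected by the partition mutex.
type mutexCounter struct {
	mu sync.RWMutex
	n  int
}

func (c *mutexCounter) Add(delta int) {
	c.mu.Lock()
	c.n += delta
	c.mu.Unlock()
}

func (c *mutexCounter) Load() int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.n
}

func main() {
	// atomic.Int32 gives the same Load/Add semantics without taking a lock,
	// so readers such as Wait() never contend with the compaction goroutines.
	var n atomic.Int32
	n.Add(1)
	n.Add(-1)
	fmt.Println(n.Load()) // 0

	var m mutexCounter
	m.Add(1)
	fmt.Println(m.Load()) // 1
}
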
@@ -348,22 +349,38 @@ func (p *Partition) buildSeriesSet() error {
 }
 
 // CurrentCompactionN returns the number of compactions currently running.
-func (p *Partition) CurrentCompactionN() int {
-	p.mu.RLock()
-	defer p.mu.RUnlock()
-	return p.currentCompactionN
+func (p *Partition) CurrentCompactionN() int32 {
+	return p.currentCompactionN.Load()
 }
 
 // Wait will block until all compactions are finished.
 // Must only be called while they are disabled.
 func (p *Partition) Wait() {
 	ticker := time.NewTicker(10 * time.Millisecond)
 	defer ticker.Stop()
+
+	// Debug level timeout
+	timeoutDuration := 24 * time.Hour
+	startTime := time.Now()
+
 	for {
 		if p.CurrentCompactionN() == 0 {
 			return
 		}
-		<-ticker.C
+		select {
+		case <-ticker.C:
+			elapsed := time.Since(startTime)
+			if elapsed >= timeoutDuration {
+				files := make([]string, 0)
+				for _, v := range p.fileSet.Files() {
+					files = append(files, v.Path())
+				}
+				p.logger.Warn("Partition.Wait() timed out waiting for compactions to complete",
+					zap.Int32("stuck_compactions", p.CurrentCompactionN()), zap.Duration("timeout", timeoutDuration),
+					zap.Strings("files", files))
+				startTime = time.Now()
+			}
+		}
 	}
 }
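
The Wait() change keeps the 10 ms polling loop but now logs a warning once the wait has been stuck for timeoutDuration, then resets the timer so the message repeats while compactions remain outstanding. Below is a self-contained sketch of the same shape, assuming the standard log package instead of zap; all names are illustrative, not from the PR.

package main

import (
	"log"
	"sync/atomic"
	"time"
)

// waitForZero polls an in-progress counter every 10ms. If the counter has not
// reached zero after warnAfter, it logs a warning and restarts the timer so
// the warning repeats for as long as the wait stays stuck.
func waitForZero(n *atomic.Int32, warnAfter time.Duration) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	start := time.Now()
	for {
		if n.Load() == 0 {
			return
		}
		<-ticker.C
		if time.Since(start) >= warnAfter {
			log.Printf("still waiting: %d operations in progress after %s", n.Load(), warnAfter)
			start = time.Now()
		}
	}
}

func main() {
	var inProgress atomic.Int32
	inProgress.Add(1)
	go func() {
		time.Sleep(50 * time.Millisecond)
		inProgress.Add(-1) // finish the simulated compaction
	}()
	waitForZero(&inProgress, 20*time.Millisecond)
}
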

@@ -1040,14 +1057,17 @@ func (p *Partition) compact() {
 			}
 			// Mark the level as compacting.
 			p.levelCompacting[0] = true
-			p.currentCompactionN++
+			p.currentCompactionN.Add(1)
 			go func() {
+				defer func() {
+					p.mu.Lock()
+					p.currentCompactionN.Add(-1)
+					p.levelCompacting[0] = false
+					p.mu.Unlock()
+					p.Compact()
+				}()
+
 				p.compactLogFile(logFile)
-				p.mu.Lock()
-				p.currentCompactionN--
-				p.levelCompacting[0] = false
-				p.mu.Unlock()
-				p.Compact()
 			}()
 		}
 	}
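
In both compaction paths the counter decrement and level unlock now live in a defer at the top of the goroutine, so cleanup runs even if the compaction body panics or returns early, and Wait() cannot hang on a counter that was never decremented. A minimal sketch of that defer-based cleanup pattern follows; the names and the recover call are illustrative, not from the PR.

package main

import (
	"fmt"
	"sync/atomic"
)

var inProgress atomic.Int32

// doCompaction increments the in-progress counter and relies on defer to
// decrement it, so the counter is released no matter how the work ends.
func doCompaction(work func()) {
	inProgress.Add(1)
	defer func() {
		inProgress.Add(-1)
		if r := recover(); r != nil {
			fmt.Println("recovered from:", r)
		}
	}()
	work()
}

func main() {
	doCompaction(func() { panic("compaction failed") })
	// With inline cleanup, a panic (or an early return added later) would leave
	// the counter stuck at 1 and Wait-style loops would block forever.
	// The deferred cleanup brings it back to zero.
	fmt.Println("in progress:", inProgress.Load()) // 0
}
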
@@ -1079,20 +1099,21 @@ func (p *Partition) compact() {
 		// Execute in closure to save reference to the group within the loop.
 		func(files []*IndexFile, level int) {
 			// Start compacting in a separate goroutine.
-			p.currentCompactionN++
+			p.currentCompactionN.Add(1)
 			go func() {
+				defer func() {
+					// Ensure compaction lock for the level is released.
+					p.mu.Lock()
+					p.levelCompacting[level] = false
+					p.currentCompactionN.Add(-1)
+					p.mu.Unlock()
+
+					// Check for new compactions
+					p.Compact()
+				}()
 
 				// Compact to a new level.
 				p.compactToLevel(files, level+1, interrupt)
-
-				// Ensure compaction lock for the level is released.
-				p.mu.Lock()
-				p.levelCompacting[level] = false
-				p.currentCompactionN--
-				p.mu.Unlock()
-
-				// Check for new compactions
-				p.Compact()
 			}()
 		}(files, level)
 	}