Skip to content

Commit 3e4a25d

Browse files
authored
fix(test): fix flakiness of TestPersistLFDiscardStats (#1963)
fixes DGRAPHCORE-234 ## Problem The purpose of TestPersistLFDiscardStats is to make sure that if you make changes to the file such that we need to maintain the discardStats in the value log, and if you close the DB, then when you reopen the DB, you remember the same status. Note that the DiscardStats are updated when we perform compaction, and in our test cases we have 4 compactors concurrently running (with ids 0, 1, 2, and 3). Most of the time, we were fine when we captured a single compaction cycle -- and then closed the DB and then reopened. However, there was a race condition wherein we waited for some arbitrary amount of time, then captured the current discardStats, then closed the DB. The problem is that between the time that we capture the discardStatus and close the DB, another compaction cycle may have started! Hence, every once in a while, we would find that the saved copy of the discard stats would not match what we picked up later when we reopened the file. ## Solution As noted in the problem statement, we ended up waiting an "arbitrary amount of time" instead of waiting for specific events and then reacting to those events. Specifically, we wanted to wait until at least "some stats" had been generated, and then we waited for compaction to complete. Unfortunately, there did not exist a clear way (previously) to capture the event of "some stats having been generated", nor was there a way to capture the discardStats upon closure of the database. The solution then was to add in two things: first, a test channel in the database where we can log messages to this channel, but only when the channel has been specified. Second, we add in a means (via options.go) of specifying an "onCloseDiscardCapture" map. This map will be populated (assuming it was initialized and is not nil) when we close the db and specifically when we close the valueLog. We no longer rely on time.Sleep, but instead rely on specific events.
1 parent 907dd65 commit 3e4a25d

File tree

6 files changed

+139
-8
lines changed

6 files changed

+139
-8
lines changed

db.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ func (lk *lockedKeys) all() []uint64 {
9090
// DB provides the various functions required to interact with Badger.
9191
// DB is thread-safe.
9292
type DB struct {
93+
testOnlyDBExtensions
94+
9395
lock sync.RWMutex // Guards list of inmemory tables, not individual reads and writes.
9496

9597
dirLockGuard *directoryLockGuard
@@ -252,6 +254,9 @@ func Open(opt Options) (*DB, error) {
252254
bannedNamespaces: &lockedKeys{keys: make(map[uint64]struct{})},
253255
threshold: initVlogThreshold(&opt),
254256
}
257+
258+
db.syncChan = opt.syncChan
259+
255260
// Cleanup all the goroutines started by badger in case of an error.
256261
defer func() {
257262
if err != nil {

db_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,42 @@ import (
4040
"github.com/dgraph-io/ristretto/z"
4141
)
4242

43+
// waitForMessage(ch, expected, count, timeout, t) will block until either
44+
// `timeout` seconds have occurred or `count` instances of the string `expected`
45+
// have occurred on the channel `ch`. We log messages or generate errors using `t`.
46+
func waitForMessage(ch chan string, expected string, count int, timeout int, t *testing.T) {
47+
if count <= 0 {
48+
t.Logf("Will skip waiting for %s since exected count <= 0.",
49+
expected)
50+
return
51+
}
52+
tout := time.NewTimer(time.Duration(timeout) * time.Second)
53+
remaining := count
54+
for {
55+
select {
56+
case curMsg, ok := <-ch:
57+
if !ok {
58+
t.Errorf("Test channel closed while waiting for "+
59+
"message %s with %d remaining instances expected",
60+
expected, remaining)
61+
return
62+
}
63+
t.Logf("Found message: %s", curMsg)
64+
if curMsg == expected {
65+
remaining--
66+
if remaining == 0 {
67+
return
68+
}
69+
}
70+
case <-tout.C:
71+
t.Errorf("Timed out after %d seconds while waiting on test chan "+
72+
"for message '%s' with %d remaining instances expected",
73+
timeout, expected, remaining)
74+
return
75+
}
76+
}
77+
}
78+
4379
// summary is produced when DB is closed. Currently it is used only for testing.
4480
type summary struct {
4581
fileIDs map[uint64]bool

options.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ import (
4242
//
4343
// Each option X is documented on the WithX method.
4444
type Options struct {
45+
testOnlyOptions
46+
4547
// Required options.
4648

4749
Dir string

test_extensions.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2023 Dgraph Labs, Inc. and Contributors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package badger
18+
19+
// Important: Do NOT import the "testing" package, as otherwise, that
20+
// will pull in imports into the production class that we do not want.
21+
22+
// TODO: Consider using this with specific compilation tags so that it only
23+
// shows up when performing testing (e.g., specify build tag=unit).
24+
// We are not yet ready to do that, as it may impact customer usage as
25+
// well as requiring us to update the CI build flags. Moreover, the
26+
// current model does not actually incur any significant cost.
27+
// If we do this, we will also want to introduce a parallel file that
28+
// overrides some of these structs and functions with empty contents.
29+
30+
// String constants for messages to be pushed to syncChan.
31+
const (
32+
updateDiscardStatsMsg = "updateDiscardStats iteration done"
33+
endVLogInitMsg = "End: vlog.init(db)"
34+
)
35+
36+
// testOnlyOptions specifies an extension to the type Options that we want to
37+
// use only in the context of testing.
38+
type testOnlyOptions struct {
39+
// syncChan is used to listen for specific messages related to activities
40+
// that can occur in a DB instance. Currently, this is only used in
41+
// testing activities.
42+
syncChan chan string
43+
}
44+
45+
// testOnlyDBExtensions specifies an extension to the type DB that we want to
46+
// use only in the context of testing.
47+
type testOnlyDBExtensions struct {
48+
syncChan chan string
49+
50+
// onCloseDiscardCapture will be populated by a DB instance during the
51+
// process of performing the Close operation. Currently, we only consider
52+
// using this during testing.
53+
onCloseDiscardCapture map[uint64]uint64
54+
}
55+
56+
// logToSyncChan sends a message to the DB's syncChan. Note that we expect
57+
// that the DB never closes this channel; the responsibility for
58+
// allocating and closing the channel belongs to the test module.
59+
// if db.syncChan is nil or has never been initialized, ths will be
60+
// silently ignored.
61+
func (db *DB) logToSyncChan(msg string) {
62+
if db.syncChan != nil {
63+
db.syncChan <- msg
64+
}
65+
}
66+
67+
// captureDiscardStats will copy the contents of the discardStats file
68+
// maintained by vlog to the onCloseDiscardCapture map specified by
69+
// db.opt. Of couse, if db.opt.onCloseDiscardCapture is nil (as expected
70+
// for a production system as opposed to a test system), this is a no-op.
71+
func (db *DB) captureDiscardStats() {
72+
if db.onCloseDiscardCapture != nil {
73+
db.vlog.discardStats.Lock()
74+
db.vlog.discardStats.Iterate(func(id, val uint64) {
75+
db.onCloseDiscardCapture[id] = val
76+
})
77+
db.vlog.discardStats.Unlock()
78+
}
79+
}

value.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,8 @@ func (vlog *valueLog) init(db *DB) {
554554
lf, err := InitDiscardStats(vlog.opt)
555555
y.Check(err)
556556
vlog.discardStats = lf
557+
// See TestPersistLFDiscardStats for purpose of statement below.
558+
db.logToSyncChan(endVLogInitMsg)
557559
}
558560

559561
func (vlog *valueLog) open(db *DB) error {
@@ -640,6 +642,7 @@ func (vlog *valueLog) Close() error {
640642
}
641643
}
642644
if vlog.discardStats != nil {
645+
vlog.db.captureDiscardStats()
643646
if terr := vlog.discardStats.Close(-1); terr != nil && err == nil {
644647
err = terr
645648
}
@@ -1103,6 +1106,9 @@ func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
11031106
for fid, discard := range stats {
11041107
vlog.discardStats.Update(fid, discard)
11051108
}
1109+
// The following is to coordinate with some test cases where we want to
1110+
// verify that at least one iteration of updateDiscardStats has been completed.
1111+
vlog.db.logToSyncChan(updateDiscardStatsMsg)
11061112
}
11071113

11081114
type vlogThreshold struct {

value_test.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"reflect"
2727
"sync"
2828
"testing"
29-
"time"
3029

3130
humanize "github.com/dustin/go-humanize"
3231
"github.com/stretchr/testify/require"
@@ -496,9 +495,14 @@ func TestPersistLFDiscardStats(t *testing.T) {
496495
opt.CompactL0OnClose = false
497496
opt.MemTableSize = 1 << 15
498497
opt.ValueThreshold = 1 << 10
498+
tChan := make(chan string, 100)
499+
defer close(tChan)
500+
opt.syncChan = tChan
499501

500502
db, err := Open(opt)
501503
require.NoError(t, err)
504+
capturedDiscardStats := make(map[uint64]uint64)
505+
db.onCloseDiscardCapture = capturedDiscardStats
502506

503507
sz := 128 << 10 // 5 entries per value log file.
504508
v := make([]byte, sz)
@@ -522,14 +526,11 @@ func TestPersistLFDiscardStats(t *testing.T) {
522526
require.NoError(t, err)
523527
}
524528

525-
time.Sleep(2 * time.Second) // wait for compaction to complete
529+
// Wait for invocation of updateDiscardStats at least once -- timeout after 60 seconds.
530+
waitForMessage(tChan, updateDiscardStatsMsg, 1, 60, t)
526531

527-
persistedMap := make(map[uint64]uint64)
528532
db.vlog.discardStats.Lock()
529533
require.True(t, db.vlog.discardStats.Len() > 1, "some discardStats should be generated")
530-
db.vlog.discardStats.Iterate(func(fid, val uint64) {
531-
persistedMap[fid] = val
532-
})
533534

534535
db.vlog.discardStats.Unlock()
535536
require.NoError(t, db.Close())
@@ -539,13 +540,15 @@ func TestPersistLFDiscardStats(t *testing.T) {
539540
db, err = Open(opt)
540541
require.NoError(t, err)
541542
defer db.Close()
542-
time.Sleep(1 * time.Second) // Wait for discardStats to be populated by populateDiscardStats().
543+
waitForMessage(tChan, endVLogInitMsg, 1, 60, t)
543544
db.vlog.discardStats.Lock()
544545
statsMap := make(map[uint64]uint64)
545546
db.vlog.discardStats.Iterate(func(fid, val uint64) {
546547
statsMap[fid] = val
547548
})
548-
require.True(t, reflect.DeepEqual(persistedMap, statsMap), "Discard maps are not equal")
549+
require.Truef(t, reflect.DeepEqual(capturedDiscardStats, statsMap),
550+
"Discard maps are not equal. On Close: %+v, After Reopen: %+v",
551+
capturedDiscardStats, statsMap)
549552
db.vlog.discardStats.Unlock()
550553
}
551554

0 commit comments

Comments
 (0)