Skip to content

Commit e08abbe

Browse files
Iwasaki Yudaijpbetz
Iwasaki Yudai
authored andcommitted
mvcc: restore unsynced watchers
In case syncWatchersLoop() starts before Restore() is called, watchers already added by that moment are moved to s.synced by the loop. However, there is a broken logic that moves watchers from s.synced to s.uncyned without setting keyWatchers of the watcherGroup. Eventually syncWatchers() fails to pickup those watchers from s.unsynced and no events are sent to the watchers, because newWatcherBatch() called in the function uses wg.watcherSetByKey() internally that requires a proper keyWatchers value.
1 parent bdc3ed1 commit e08abbe

File tree

2 files changed

+38
-29
lines changed

2 files changed

+38
-29
lines changed

mvcc/watchable_store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
188188
}
189189

190190
for wa := range s.synced.watchers {
191-
s.unsynced.watchers.add(wa)
191+
s.unsynced.add(wa)
192192
}
193193
s.synced = newWatcherGroup()
194194
return nil

mvcc/watchable_store_test.go

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -295,36 +295,45 @@ func TestWatchFutureRev(t *testing.T) {
295295
}
296296

297297
func TestWatchRestore(t *testing.T) {
298-
b, tmpPath := backend.NewDefaultTmpBackend()
299-
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
300-
defer cleanup(s, b, tmpPath)
301-
302-
testKey := []byte("foo")
303-
testValue := []byte("bar")
304-
rev := s.Put(testKey, testValue, lease.NoLease)
305-
306-
newBackend, newPath := backend.NewDefaultTmpBackend()
307-
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
308-
defer cleanup(newStore, newBackend, newPath)
309-
310-
w := newStore.NewWatchStream()
311-
w.Watch(testKey, nil, rev-1)
312-
313-
newStore.Restore(b)
314-
select {
315-
case resp := <-w.Chan():
316-
if resp.Revision != rev {
317-
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
318-
}
319-
if len(resp.Events) != 1 {
320-
t.Fatalf("failed to get events from the response")
321-
}
322-
if resp.Events[0].Kv.ModRevision != rev {
323-
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
298+
test := func(delay time.Duration) func(t *testing.T) {
299+
return func(t *testing.T) {
300+
b, tmpPath := backend.NewDefaultTmpBackend()
301+
s := newWatchableStore(b, &lease.FakeLessor{}, nil)
302+
defer cleanup(s, b, tmpPath)
303+
304+
testKey := []byte("foo")
305+
testValue := []byte("bar")
306+
rev := s.Put(testKey, testValue, lease.NoLease)
307+
308+
newBackend, newPath := backend.NewDefaultTmpBackend()
309+
newStore := newWatchableStore(newBackend, &lease.FakeLessor{}, nil)
310+
defer cleanup(newStore, newBackend, newPath)
311+
312+
w := newStore.NewWatchStream()
313+
w.Watch(0, testKey, nil, rev-1)
314+
315+
time.Sleep(delay)
316+
317+
newStore.Restore(b)
318+
select {
319+
case resp := <-w.Chan():
320+
if resp.Revision != rev {
321+
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
322+
}
323+
if len(resp.Events) != 1 {
324+
t.Fatalf("failed to get events from the response")
325+
}
326+
if resp.Events[0].Kv.ModRevision != rev {
327+
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
328+
}
329+
case <-time.After(time.Second):
330+
t.Fatal("failed to receive event in 1 second.")
331+
}
324332
}
325-
case <-time.After(time.Second):
326-
t.Fatal("failed to receive event in 1 second.")
327333
}
334+
335+
t.Run("Normal", test(0))
336+
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
328337
}
329338

330339
// TestWatchBatchUnsynced tests batching on unsynced watchers

0 commit comments

Comments
 (0)