Skip to content

Commit b553a52

Browse files
committed
Fix StartTime bug with memory store and only one message
Fixes #6032 by ensuring that the memory store doesn't return the first sequence incorrectly when there are no matching messages. Signed-off-by: Neil Twigg <[email protected]>
1 parent cbc7116 commit b553a52

File tree

2 files changed

+45
-1
lines changed

2 files changed

+45
-1
lines changed

server/jetstream_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20069,3 +20069,47 @@ func TestJetStreamDirectGetUpToTime(t *testing.T) {
2006920069
checkResponses(t, fifth.Time, "message 4")
2007020070
})
2007120071
}
20072+
20073+
func TestJetStreamDirectGetStartTimeSingleMsg(t *testing.T) {
20074+
for _, storage := range []nats.StorageType{nats.FileStorage, nats.MemoryStorage} {
20075+
t.Run(storage.String(), func(t *testing.T) {
20076+
s := RunBasicJetStreamServer(t)
20077+
defer s.Shutdown()
20078+
20079+
nc, js := jsClientConnect(t, s)
20080+
defer nc.Close()
20081+
20082+
_, err := js.AddStream(&nats.StreamConfig{
20083+
Name: "TEST",
20084+
Subjects: []string{"foo"},
20085+
AllowDirect: true,
20086+
Storage: storage,
20087+
})
20088+
require_NoError(t, err)
20089+
20090+
sendStreamMsg(t, nc, "foo", "message")
20091+
20092+
sendRequest := func(mreq *JSApiMsgGetRequest) *nats.Subscription {
20093+
t.Helper()
20094+
req, err := json.Marshal(mreq)
20095+
require_NoError(t, err)
20096+
reply := nats.NewInbox()
20097+
sub, err := nc.SubscribeSync(reply)
20098+
require_NoError(t, err)
20099+
require_NoError(t, nc.PublishRequest("$JS.API.DIRECT.GET.TEST", reply, req))
20100+
return sub
20101+
}
20102+
20103+
first, err := js.GetMsg("TEST", 1)
20104+
require_NoError(t, err)
20105+
20106+
future := first.Time.Add(10 * time.Second)
20107+
sub := sendRequest(&JSApiMsgGetRequest{StartTime: &future, NextFor: "foo", Batch: 1})
20108+
defer sub.Unsubscribe()
20109+
20110+
msg, err := sub.NextMsg(25 * time.Millisecond)
20111+
require_NoError(t, err)
20112+
require_Equal(t, msg.Header.Get("Status"), "404")
20113+
})
20114+
}
20115+
}

server/memstore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
369369
}
370370
}
371371
if lmsg == nil {
372-
return ms.state.FirstSeq
372+
return ms.state.LastSeq + 1
373373
}
374374

375375
last := lmsg.ts

0 commit comments

Comments
 (0)