Skip to content

Commit e375872

Browse files
Cherry-picks for 2.11.4-RC.3 (#6913)
Includes the following: - #6912 - #6911 Signed-off-by: Neil Twigg <[email protected]>
2 parents 718136e + f941c3b commit e375872

File tree

4 files changed

+128
-3
lines changed

4 files changed

+128
-3
lines changed

server/filestore.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,9 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
516516
// Also make sure we get rid of old idx and fss files on return.
517517
// Do this in separate go routine vs inline and at end of processing.
518518
defer func() {
519-
go fs.cleanupOldMeta()
519+
if fs != nil {
520+
go fs.cleanupOldMeta()
521+
}
520522
}()
521523

522524
// Lock while we do enforcements and removals.

server/jetstream_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19997,3 +19997,119 @@ func TestJetStreamPurgeExSeqInInteriorDeleteGap(t *testing.T) {
1999719997
})
1999819998
}
1999919999
}
20000+
20001+
func TestJetStreamDirectGetUpToTime(t *testing.T) {
20002+
s := RunBasicJetStreamServer(t)
20003+
defer s.Shutdown()
20004+
20005+
nc, js := jsClientConnect(t, s)
20006+
defer nc.Close()
20007+
20008+
_, err := js.AddStream(&nats.StreamConfig{
20009+
Name: "TEST",
20010+
Subjects: []string{"foo"},
20011+
AllowDirect: true,
20012+
Storage: nats.FileStorage,
20013+
})
20014+
require_NoError(t, err)
20015+
20016+
for i := range 10 {
20017+
sendStreamMsg(t, nc, "foo", fmt.Sprintf("message %d", i+1))
20018+
}
20019+
20020+
sendRequest := func(mreq *JSApiMsgGetRequest) *nats.Subscription {
20021+
t.Helper()
20022+
req, err := json.Marshal(mreq)
20023+
require_NoError(t, err)
20024+
reply := nats.NewInbox()
20025+
sub, err := nc.SubscribeSync(reply)
20026+
require_NoError(t, err)
20027+
require_NoError(t, nc.PublishRequest("$JS.API.DIRECT.GET.TEST", reply, req))
20028+
return sub
20029+
}
20030+
20031+
checkResponses := func(t *testing.T, upToTime time.Time, expected ...string) {
20032+
t.Helper()
20033+
sub := sendRequest(&JSApiMsgGetRequest{MultiLastFor: []string{"foo"}, UpToTime: &upToTime})
20034+
defer sub.Unsubscribe()
20035+
for _, expect := range expected {
20036+
msg, err := sub.NextMsg(25 * time.Millisecond)
20037+
require_NoError(t, err)
20038+
require_Equal(t, msg.Header.Get(JSSubject), "foo")
20039+
require_Equal(t, bytesToString(msg.Data), expect)
20040+
}
20041+
// By this time we're either at the end of our expected and looking
20042+
// for an EOB marker (204) or we're not finding anything (404).
20043+
msg, err := sub.NextMsg(25 * time.Millisecond)
20044+
require_NoError(t, err)
20045+
if len(expected) == 0 {
20046+
require_Equal(t, msg.Header.Get("Status"), "404")
20047+
} else {
20048+
require_Equal(t, msg.Header.Get("Status"), "204")
20049+
}
20050+
}
20051+
20052+
t.Run("DistantPast", func(t *testing.T) {
20053+
checkResponses(t, time.Time{})
20054+
})
20055+
20056+
t.Run("DistantFuture", func(t *testing.T) {
20057+
checkResponses(t, time.Unix(0, math.MaxInt64), "message 10")
20058+
})
20059+
20060+
t.Run("BeforeFirstSeq", func(t *testing.T) {
20061+
first, err := js.GetMsg("TEST", 1)
20062+
require_NoError(t, err)
20063+
checkResponses(t, first.Time)
20064+
})
20065+
20066+
t.Run("BeforeFifthSeq", func(t *testing.T) {
20067+
fifth, err := js.GetMsg("TEST", 5)
20068+
require_NoError(t, err)
20069+
checkResponses(t, fifth.Time, "message 4")
20070+
})
20071+
}
20072+
20073+
func TestJetStreamDirectGetStartTimeSingleMsg(t *testing.T) {
20074+
for _, storage := range []nats.StorageType{nats.FileStorage, nats.MemoryStorage} {
20075+
t.Run(storage.String(), func(t *testing.T) {
20076+
s := RunBasicJetStreamServer(t)
20077+
defer s.Shutdown()
20078+
20079+
nc, js := jsClientConnect(t, s)
20080+
defer nc.Close()
20081+
20082+
_, err := js.AddStream(&nats.StreamConfig{
20083+
Name: "TEST",
20084+
Subjects: []string{"foo"},
20085+
AllowDirect: true,
20086+
Storage: storage,
20087+
})
20088+
require_NoError(t, err)
20089+
20090+
sendStreamMsg(t, nc, "foo", "message")
20091+
20092+
sendRequest := func(mreq *JSApiMsgGetRequest) *nats.Subscription {
20093+
t.Helper()
20094+
req, err := json.Marshal(mreq)
20095+
require_NoError(t, err)
20096+
reply := nats.NewInbox()
20097+
sub, err := nc.SubscribeSync(reply)
20098+
require_NoError(t, err)
20099+
require_NoError(t, nc.PublishRequest("$JS.API.DIRECT.GET.TEST", reply, req))
20100+
return sub
20101+
}
20102+
20103+
first, err := js.GetMsg("TEST", 1)
20104+
require_NoError(t, err)
20105+
20106+
future := first.Time.Add(10 * time.Second)
20107+
sub := sendRequest(&JSApiMsgGetRequest{StartTime: &future, NextFor: "foo", Batch: 1})
20108+
defer sub.Unsubscribe()
20109+
20110+
msg, err := sub.NextMsg(25 * time.Millisecond)
20111+
require_NoError(t, err)
20112+
require_Equal(t, msg.Header.Get("Status"), "404")
20113+
})
20114+
}
20115+
}

server/memstore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 {
369369
}
370370
}
371371
if lmsg == nil {
372-
return ms.state.FirstSeq
372+
return ms.state.LastSeq + 1
373373
}
374374

375375
last := lmsg.ts

server/stream.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4542,8 +4542,15 @@ func (mset *stream) getDirectMulti(req *JSApiMsgGetRequest, reply string) {
45424542
// If we have UpToTime set get the proper sequence.
45434543
if req.UpToTime != nil {
45444544
upToSeq = store.GetSeqFromTime((*req.UpToTime).UTC())
4545+
// Avoid selecting a first sequence that will take us to before the stream first
4546+
// sequence, otherwise we can return messages after the supplied UpToTime.
4547+
if upToSeq <= mset.state().FirstSeq {
4548+
hdr := []byte("NATS/1.0 404 No Results\r\n\r\n")
4549+
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
4550+
return
4551+
}
45454552
// We need to back off one since this is used to determine start sequence normally,
4546-
// were as here we want it to be the ceiling.
4553+
// whereas here we want it to be the ceiling.
45474554
upToSeq--
45484555
}
45494556
// If not set, set to the last sequence and remember that for EOB.

0 commit comments

Comments
 (0)