diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 52b197f0803..8bb9b227ddd 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -19997,3 +19997,119 @@ func TestJetStreamPurgeExSeqInInteriorDeleteGap(t *testing.T) { }) } } + +func TestJetStreamDirectGetUpToTime(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + AllowDirect: true, + Storage: nats.FileStorage, + }) + require_NoError(t, err) + + for i := range 10 { + sendStreamMsg(t, nc, "foo", fmt.Sprintf("message %d", i+1)) + } + + sendRequest := func(mreq *JSApiMsgGetRequest) *nats.Subscription { + t.Helper() + req, err := json.Marshal(mreq) + require_NoError(t, err) + reply := nats.NewInbox() + sub, err := nc.SubscribeSync(reply) + require_NoError(t, err) + require_NoError(t, nc.PublishRequest("$JS.API.DIRECT.GET.TEST", reply, req)) + return sub + } + + checkResponses := func(t *testing.T, upToTime time.Time, expected ...string) { + t.Helper() + sub := sendRequest(&JSApiMsgGetRequest{MultiLastFor: []string{"foo"}, UpToTime: &upToTime}) + defer sub.Unsubscribe() + for _, expect := range expected { + msg, err := sub.NextMsg(25 * time.Millisecond) + require_NoError(t, err) + require_Equal(t, msg.Header.Get(JSSubject), "foo") + require_Equal(t, bytesToString(msg.Data), expect) + } + // By this time we're either at the end of our expected and looking + // for an EOB marker (204) or we're not finding anything (404). + msg, err := sub.NextMsg(25 * time.Millisecond) + require_NoError(t, err) + if len(expected) == 0 { + require_Equal(t, msg.Header.Get("Status"), "404") + } else { + require_Equal(t, msg.Header.Get("Status"), "204") + } + } + + t.Run("DistantPast", func(t *testing.T) { + checkResponses(t, time.Time{}) + }) + + t.Run("DistantFuture", func(t *testing.T) { + checkResponses(t, time.Unix(0, math.MaxInt64), "message 10") + }) + + t.Run("BeforeFirstSeq", func(t *testing.T) { + first, err := js.GetMsg("TEST", 1) + require_NoError(t, err) + checkResponses(t, first.Time) + }) + + t.Run("BeforeFifthSeq", func(t *testing.T) { + fifth, err := js.GetMsg("TEST", 5) + require_NoError(t, err) + checkResponses(t, fifth.Time, "message 4") + }) +} + +func TestJetStreamDirectGetStartTimeSingleMsg(t *testing.T) { + for _, storage := range []nats.StorageType{nats.FileStorage, nats.MemoryStorage} { + t.Run(storage.String(), func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + AllowDirect: true, + Storage: storage, + }) + require_NoError(t, err) + + sendStreamMsg(t, nc, "foo", "message") + + sendRequest := func(mreq *JSApiMsgGetRequest) *nats.Subscription { + t.Helper() + req, err := json.Marshal(mreq) + require_NoError(t, err) + reply := nats.NewInbox() + sub, err := nc.SubscribeSync(reply) + require_NoError(t, err) + require_NoError(t, nc.PublishRequest("$JS.API.DIRECT.GET.TEST", reply, req)) + return sub + } + + first, err := js.GetMsg("TEST", 1) + require_NoError(t, err) + + future := first.Time.Add(10 * time.Second) + sub := sendRequest(&JSApiMsgGetRequest{StartTime: &future, NextFor: "foo", Batch: 1}) + defer sub.Unsubscribe() + + msg, err := sub.NextMsg(25 * time.Millisecond) + require_NoError(t, err) + require_Equal(t, msg.Header.Get("Status"), "404") + }) + } +} diff --git a/server/memstore.go b/server/memstore.go index 79ffbfdef5c..4da9adff467 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -369,7 +369,7 @@ func (ms *memStore) GetSeqFromTime(t time.Time) uint64 { } } if lmsg == nil { - return ms.state.FirstSeq + return ms.state.LastSeq + 1 } last := lmsg.ts diff --git a/server/stream.go b/server/stream.go index 65afec733f8..8e8029e4ace 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4542,8 +4542,15 @@ func (mset *stream) getDirectMulti(req *JSApiMsgGetRequest, reply string) { // If we have UpToTime set get the proper sequence. if req.UpToTime != nil { upToSeq = store.GetSeqFromTime((*req.UpToTime).UTC()) + // Avoid selecting a first sequence that will take us to before the stream first + // sequence, otherwise we can return messages after the supplied UpToTime. + if upToSeq <= mset.state().FirstSeq { + hdr := []byte("NATS/1.0 404 No Results\r\n\r\n") + mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0)) + return + } // We need to back off one since this is used to determine start sequence normally, - // were as here we want it to be the ceiling. + // whereas here we want it to be the ceiling. upToSeq-- } // If not set, set to the last sequence and remember that for EOB.