RUST-521 Implement naive streaming and resume token caching for change streams #531
@@ -15,7 +23,31 @@ use serde::{Deserialize, Serialize};
 /// [here](https://docs.mongodb.com/manual/changeStreams/#change-stream-resume-token) for more
 /// information on resume tokens.
 #[derive(Clone, Debug, Deserialize, Serialize)]
-pub struct ResumeToken(pub(crate) Bson);
+pub struct ResumeToken(pub(crate) RawBson);
Using `RawBson` here means the token can be handled as a (nearly) uninterpreted byte blob.
    }
}

pub(crate) async fn execute_watch<T>(
Watch needs its own version of these utility functions so the resume token from the initial aggregate
command can be preserved; conveniently, this tidied up the call sites quite a bit.
If we move resume token tracking to the `ChangeStream` per my comment below, we can reuse `execute_cursor_operation` in the body of this function; ditto for the session one.
Unfortunately we need the `CursorSpecification` in order to get the initial resume token, and that's not preserved or exposed by `Cursor`.
@@ -37,8 +38,10 @@ where
    client: Client,
    info: CursorInformation,
    buffer: VecDeque<RawDocumentBuf>,
    post_batch_resume_token: Option<ResumeToken>,
Two tokens need to be tracked: the one returned by the most recent `getMore` call, and the one that will be returned to users via `resume_token()`. The two are only the same at the end of a batch.
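A minimal sketch of the two-token bookkeeping described above, not the driver's actual code. `Token`, `Event`, and `TokenTracker` are hypothetical stand-ins (a string replaces the real `ResumeToken`), and the sketch assumes a new batch is only requested once the buffer has been drained.

```rust
use std::collections::VecDeque;

/// Stand-in for the driver's `ResumeToken`; a string keeps the sketch self-contained.
#[derive(Clone, Debug, PartialEq)]
pub struct Token(pub String);

/// Hypothetical change event: only its `_id` (the per-document token) is modeled.
pub struct Event {
    pub id: Token,
}

pub struct TokenTracker {
    buffer: VecDeque<Event>,
    /// Token from the most recent server reply (initial aggregate or getMore).
    post_batch_resume_token: Option<Token>,
    /// Token that a `resume_token()` accessor would hand back to the user.
    resume_token: Option<Token>,
}

impl TokenTracker {
    pub fn new() -> Self {
        TokenTracker {
            buffer: VecDeque::new(),
            post_batch_resume_token: None,
            resume_token: None,
        }
    }

    /// Record one server reply: a batch of events plus its post-batch token.
    pub fn receive_batch(&mut self, events: Vec<Event>, post_batch: Option<Token>) {
        self.buffer.extend(events);
        self.post_batch_resume_token = post_batch;
        // An empty batch still advances the user-visible token.
        if self.buffer.is_empty() {
            if let Some(token) = &self.post_batch_resume_token {
                self.resume_token = Some(token.clone());
            }
        }
    }

    /// Pop one event and update the user-visible token.
    pub fn next(&mut self) -> Option<Event> {
        let event = self.buffer.pop_front()?;
        self.resume_token = if self.buffer.is_empty() && self.post_batch_resume_token.is_some() {
            // End of batch: the post-batch token takes over.
            self.post_batch_resume_token.clone()
        } else {
            // Mid-batch: the event's own `_id` is the token.
            Some(event.id.clone())
        };
        Some(event)
    }

    pub fn resume_token(&self) -> Option<&Token> {
        self.resume_token.as_ref()
    }
}
```

With a two-event batch, the user-visible token follows each event's `_id` until the last event is consumed, at which point it switches to the post-batch token.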
src/cursor/common.rs
Outdated
if self.buffer.is_empty() && self.post_batch_resume_token.is_some() {
    self.post_batch_resume_token.clone()
} else {
    doc.get("_id")?.map(|val| ResumeToken(val.to_raw_bson()))
This could potentially be an expensive lookup if the user is querying large documents with `_id` projected out. I think it would probably be better to track these at the `ChangeStream` level so that only users of `ChangeStream` (which will be a small percentage) have to pay this cost. The `post_batch_resume_token` will still need to be tracked at the cursor level, though.
Another thing to note: the spec requires that we provide a way for users to receive every resume token that gets cached. Currently, this cursor implementation will keep looping until the cursor closes or a document is received, so the tokens that get cached in between empty batches (from `post_batch_resume_token`) aren't ever made available to the user. Most drivers get around this by providing a `tryNext()` method which returns null if there aren't any documents available yet, and I think we could do something similar here (exposed via a separate method on `ChangeStream`). We'd need to refactor the implementation of this method into a separate function so we could reuse it for both the `Stream` implementation and `tryNext`, but otherwise I don't think the implementation of that should require too much new code. One issue is that we can't name the method `try_next`, as this would clash with the methods provided by `TryStreamExt`. Maybe something like `next_in_batch`? I can't really think of a good name, to be honest.
Implementing a `tryNext` equivalent on `GenericCursor` should make the tracking of the resume token at the `ChangeStream` level much easier too, since I don't think you'd need to introduce any new intermediate types.
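To illustrate the `tryNext`-style split suggested above, here is a synchronous sketch under stated assumptions: `SketchCursor` is hypothetical, the "server" is a queue of pre-scripted batches, and the variants of `BatchValue` are guessed from the name that appears later in the diff. The point is only that a single non-looping step can back both the looping `next` and a `tryNext` equivalent.

```rust
use std::collections::VecDeque;

/// Outcome of one non-looping step (variants are assumed, not taken from the PR).
#[derive(Debug, PartialEq)]
pub enum BatchValue<T> {
    /// An item was available.
    Some(T),
    /// The server replied, but with an empty batch.
    Empty,
    /// The cursor is closed; nothing more will arrive.
    Exhausted,
}

pub struct SketchCursor<T> {
    buffer: VecDeque<T>,
    /// Batches the mock server has yet to return; an empty Vec models an empty getMore reply.
    pending: VecDeque<Vec<T>>,
}

impl<T> SketchCursor<T> {
    pub fn new(batches: Vec<Vec<T>>) -> Self {
        SketchCursor {
            buffer: VecDeque::new(),
            pending: batches.into_iter().collect(),
        }
    }

    /// One step: yield a buffered item, or perform at most one fetch and report the result.
    pub fn next_in_batch(&mut self) -> BatchValue<T> {
        if let Some(item) = self.buffer.pop_front() {
            return BatchValue::Some(item);
        }
        match self.pending.pop_front() {
            None => BatchValue::Exhausted,
            Some(batch) => {
                self.buffer.extend(batch);
                match self.buffer.pop_front() {
                    Some(item) => BatchValue::Some(item),
                    None => BatchValue::Empty,
                }
            }
        }
    }

    /// Stream-style `next`: loops over empty batches until an item arrives or the cursor ends.
    pub fn next(&mut self) -> Option<T> {
        loop {
            match self.next_in_batch() {
                BatchValue::Some(item) => return Some(item),
                BatchValue::Empty => continue,
                BatchValue::Exhausted => return None,
            }
        }
    }
}
```

A caller using `next_in_batch` regains control after every empty batch, which is the moment a freshly cached post-batch token could be read; `next` hides those moments by design.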
Updated! I also realized that I hadn't implemented any of this for the session types, so added that in, and refactored things a bit to be able to share the bulk of the implementation.
31189e5 to 67b420f
    }
}

pub(crate) struct NextInBatchFuture<'a, T>(&'a mut T);
It seems awkward to need a one-off struct to go from the desugared `fn() -> Poll<T>` form back to `.await`, but I guess this is the way until `poll_fn` stabilizes.
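For readers unfamiliar with the pattern, a rough sketch of such a one-off adapter using only the standard library. `PollNext`, `NextFuture`, `Countdown`, and `block_on` are hypothetical names for this illustration; the PR's `NextInBatchFuture` wraps the driver's cursor types instead.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical poll-style trait, mirroring the `fn(&mut self, cx) -> Poll<T>` shape.
pub trait PollNext {
    type Item;
    fn poll_next_item(&mut self, cx: &mut Context<'_>) -> Poll<Self::Item>;
}

/// One-off adapter: borrows the source and forwards `poll`, so callers can `.await` it.
pub struct NextFuture<'a, S>(pub &'a mut S);

impl<'a, S: PollNext> Future for NextFuture<'a, S> {
    type Output = S::Item;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The wrapper only holds a `&mut S`, so it is `Unpin` and can be accessed directly.
        self.0.poll_next_item(cx)
    }
}

/// Toy source: pending for the first `polls_left` polls, then ready with `value`.
pub struct Countdown {
    pub polls_left: u32,
    pub value: u32,
}

impl PollNext for Countdown {
    type Item = u32;

    fn poll_next_item(&mut self, cx: &mut Context<'_>) -> Poll<u32> {
        if self.polls_left == 0 {
            Poll::Ready(self.value)
        } else {
            self.polls_left -= 1;
            // Ask to be polled again right away.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, only for exercising the sketch.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

On toolchains where `std::future::poll_fn` is available (stable since Rust 1.64), the same adaptation can be written as a closure, `std::future::poll_fn(|cx| src.poll_next_item(cx))`, without a dedicated struct.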
            info: self.info,
            pinned_connection: self.pinned_connection,
            _phantom: Default::default(),
        }
    }
}

impl<P, T> Stream for GenericCursor<P, T>
pub(crate) trait CursorStream {
This trait allows the implementation of `poll_next` to be shared via the `stream_poll_next` fn; open to suggestions for a more descriptive name.
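A sketch of how a trait like this can let several cursor types share one `poll_next` body. Only the names `CursorStream`, `poll_next_in_batch`, `stream_poll_next`, and `BatchValue` come from the PR; the variants, the `String` document and error types, and the `Scripted` toy cursor are assumptions made to keep the example free of external crates.

```rust
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Assumed shape of one polling step; the real type carries raw documents.
pub enum BatchValue {
    Some(String),
    Empty,
    Exhausted,
}

/// Error type stand-in so the sketch needs no external crates.
pub type Result<T> = std::result::Result<T, String>;

pub trait CursorStream {
    fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>>;
}

/// Shared body for a `Stream::poll_next` impl: skip empty batches, end on exhaustion.
pub fn stream_poll_next<S: CursorStream>(
    this: &mut S,
    cx: &mut Context<'_>,
) -> Poll<Option<Result<String>>> {
    loop {
        match this.poll_next_in_batch(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
            Poll::Ready(Ok(BatchValue::Some(doc))) => return Poll::Ready(Some(Ok(doc))),
            Poll::Ready(Ok(BatchValue::Empty)) => continue,
            Poll::Ready(Ok(BatchValue::Exhausted)) => return Poll::Ready(None),
        }
    }
}

/// Toy cursor replaying scripted steps; a session cursor could implement the same trait.
pub struct Scripted(pub Vec<BatchValue>);

impl CursorStream for Scripted {
    fn poll_next_in_batch(&mut self, _cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
        if self.0.is_empty() {
            Poll::Ready(Ok(BatchValue::Exhausted))
        } else {
            Poll::Ready(Ok(self.0.remove(0)))
        }
    }
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drive `stream_poll_next` once with a no-op waker (enough for the always-ready toy cursor).
pub fn poll_once<S: CursorStream>(cursor: &mut S) -> Poll<Option<Result<String>>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    stream_poll_next(cursor, &mut cx)
}
```

Each concrete cursor then implements only `poll_next_in_batch`, and its `Stream::poll_next` can delegate to the shared function.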
            info: self.info,
            pinned_connection: self.pinned_connection,
            _phantom: Default::default(),
        }
    }
}

impl<P, T> Stream for GenericCursor<P, T>
pub(crate) trait CursorStream {
    fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>>;
I used the `next_in_batch` suggestion for internal methods but named the external method `next_if_any`, since the batching behavior is not the main point of the method from a user's perspective.
just one small suggestion! adding my LGTM now so as not to block this while I'm gone
pub struct ResumeToken(pub(crate) RawBson);

impl ResumeToken {
    pub(crate) fn initial(
nit: can we update this so that both values don't always need to be created? e.g.
    match spec.post_batch_resume_token {
        Some(token) if spec.initial_buffer.is_empty() => token,
        // catch-all so the match is exhaustive: no token, or a non-empty initial batch
        _ => // token from options
    }
That's much nicer, thank you!
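A compilable sketch of the suggested shape, using hypothetical stand-in types. `Token`, `Spec`, and `Options` are not the driver's types, and the fallback order (`start_after` before `resume_after`) is an assumption about what "token from options" means here.

```rust
/// Stand-in for the driver's `ResumeToken`.
#[derive(Clone, Debug, PartialEq)]
pub struct Token(pub String);

/// Hypothetical slice of the cursor specification returned by the initial aggregate.
pub struct Spec {
    pub initial_buffer: Vec<String>,
    pub post_batch_resume_token: Option<Token>,
}

/// Hypothetical subset of the change stream options.
#[derive(Default)]
pub struct Options {
    pub start_after: Option<Token>,
    pub resume_after: Option<Token>,
}

/// Pick the initial token without eagerly computing both candidates.
pub fn initial_token(options: Option<&Options>, spec: &Spec) -> Option<Token> {
    match &spec.post_batch_resume_token {
        // The server's token applies only when the first batch came back empty.
        Some(token) if spec.initial_buffer.is_empty() => Some(token.clone()),
        // Otherwise fall back to whatever the user supplied, evaluated lazily.
        _ => options.and_then(|o| o.start_after.clone().or_else(|| o.resume_after.clone())),
    }
}
```

The catch-all arm covers both a missing post-batch token and a non-empty initial batch, so the options are only inspected when they are actually needed.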
looks good! just have one suggestion
src/change_stream/mod.rs
Outdated
/// # let coll = client.database("foo").collection("bar");
/// let mut change_stream = coll.watch(None, None).await?;
/// let mut resume_token = None;
/// loop {
this makes me think we'll need to include an `is_alive` method or something like that to allow this loop to terminate in the event the stream is closed.
Ah, good thought, done.
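To show why the liveness check matters for the doc example's loop, here is a synchronous mock. `MockChangeStream` and `drain` are hypothetical, the real methods are async, and the integer tokens are invented purely so the example can be checked.

```rust
use std::collections::VecDeque;

/// Mock stream: each scripted entry models one poll, `Some(event)` or `None` for "nothing yet".
pub struct MockChangeStream {
    polls: VecDeque<Option<String>>,
    resume_token: Option<u32>,
    next_token: u32,
}

impl MockChangeStream {
    pub fn new(polls: Vec<Option<String>>) -> Self {
        MockChangeStream {
            polls: polls.into_iter().collect(),
            resume_token: None,
            next_token: 0,
        }
    }

    /// The mock counts as closed once its script runs out.
    pub fn is_alive(&self) -> bool {
        !self.polls.is_empty()
    }

    /// Return an event if one is ready; the cached token advances on every poll.
    pub fn next_if_any(&mut self) -> Option<String> {
        let polled = self.polls.pop_front().flatten();
        self.next_token += 1;
        self.resume_token = Some(self.next_token);
        polled
    }

    pub fn resume_token(&self) -> Option<u32> {
        self.resume_token
    }
}

/// Poll until the stream closes, collecting events and remembering the latest token.
pub fn drain(stream: &mut MockChangeStream) -> (Vec<String>, Option<u32>) {
    let mut events = Vec::new();
    let mut resume_token = None;
    // Without the `is_alive` check this would spin forever on a closed stream.
    while stream.is_alive() {
        if let Some(event) = stream.next_if_any() {
            events.push(event);
        }
        resume_token = stream.resume_token();
    }
    (events, resume_token)
}
```

The token is refreshed after every poll, including polls that yield no event, which is what lets a caller observe tokens cached between empty batches.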
LGTM!
RUST-521

The `Stream` impl just forwards to the underlying cursor; this will need to be updated for RUST-522 to resume on error, but that seemed like a big enough chunk of work to be worth doing separately.

The token-tracking machinery is included in the base `Cursor` behavior but only exposed via `ChangeStream`; the other option here would have been introducing a new type parallel to `GenericCursor` and wrapping that directly in `ChangeStream`. That didn't seem like a good way to go since it would either involve a lot of duplication of things like the buffering and stream impl, or adding another nested doll to encapsulate the shared logic (i.e. something like `Cursor` -> `CursorCommon` -> `GenericCursor` and `ChangeStream` -> `CursorCommon` -> `GenericCursorWithToken`).