RUST-521 Implement naive streaming and resume token caching for change streams #531

Merged: 27 commits from the RUST-521/change-stream-stream branch into mongodb:master on Dec 15, 2021

Conversation

abr-egn (Contributor) commented on Dec 2, 2021:

RUST-521

The Stream impl just forwards to the underlying cursor; this will need to be updated for RUST-522 to resume on error, but that seemed like a big enough chunk of work to be worth doing separately.

The token-tracking machinery is included in the base Cursor behavior but only exposed via ChangeStream; the other option here would have been introducing a new type parallel to GenericCursor and wrapping that directly in ChangeStream. That didn't seem like a good way to go since it would either involve a lot of duplication of things like the buffering and stream impl, or adding another nested doll to encapsulate the shared logic (i.e. something like Cursor -> CursorCommon -> GenericCursor and ChangeStream -> CursorCommon -> GenericCursorWithToken).
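
For illustration, a minimal sketch of that forwarding shape, using stand-in types rather than the driver's actual ChangeStream and cursor definitions:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_core::Stream;

// Stand-in for ChangeStream: it wraps a cursor (and, per this PR, the cached
// resume token) and its Stream impl simply delegates to the cursor.
struct ChangeStreamSketch<C> {
    cursor: C,
    // the cached resume token would live here alongside the wrapped cursor
}

impl<C: Stream + Unpin> Stream for ChangeStreamSketch<C> {
    type Item = C::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Naive streaming: just forward to the underlying cursor.
        // Resume-on-error (RUST-522) would wrap this call with retry logic.
        Pin::new(&mut self.get_mut().cursor).poll_next(cx)
    }
}
```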

@@ -15,7 +23,31 @@ use serde::{Deserialize, Serialize};
/// [here](https://docs.mongodb.com/manual/changeStreams/#change-stream-resume-token) for more
/// information on resume tokens.
#[derive(Clone, Debug, Deserialize, Serialize)]
-pub struct ResumeToken(pub(crate) Bson);
+pub struct ResumeToken(pub(crate) RawBson);
abr-egn (author):

Using RawBson here means the token can be handled as an essentially uninterpreted byte blob.

}
}

pub(crate) async fn execute_watch<T>(
abr-egn (author):

Watch needs its own version of these utility functions so the resume token from the initial aggregate command can be preserved; conveniently, this tidied up the call sites quite a bit.

Contributor:

If we move resume token tracking to the ChangeStream per my comment below, we can reuse execute_cursor_operation in the body of this function, ditto for the session one.

abr-egn (author):

Unfortunately we need the CursorSpecification in order to get the initial resume token, and that's not preserved or exposed by Cursor.
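
A stand-alone sketch of that ordering constraint, with illustrative stand-in types (not the driver's CursorSpecification or Cursor):

```rust
// The token has to be read off the specification *before* cursor construction
// consumes it, because the cursor does not keep the spec around.
struct SpecSketch {
    initial_buffer: Vec<String>,
    post_batch_resume_token: Option<String>,
}

struct CursorSketch {
    buffer: Vec<String>, // the spec itself is gone once the cursor exists
}

fn execute_watch_sketch(spec: SpecSketch) -> (CursorSketch, Option<String>) {
    let initial_token = spec.post_batch_resume_token;
    let cursor = CursorSketch { buffer: spec.initial_buffer };
    // The change stream gets both the cursor and the preserved initial token.
    (cursor, initial_token)
}
```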

@@ -37,8 +38,10 @@ where
client: Client,
info: CursorInformation,
buffer: VecDeque<RawDocumentBuf>,
+post_batch_resume_token: Option<ResumeToken>,
abr-egn (author):

Two tokens need to be tracked: the one returned by the most recent getMore call, and the one that will be returned to users via resume_token(). The two only coincide at the end of a batch.
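
As a stand-alone model of that rule (String stands in for the real ResumeToken/RawBson), the user-facing token only catches up to the post-batch token once the current batch has been drained:

```rust
use std::collections::VecDeque;

struct TokenState {
    buffer: VecDeque<String>,                // documents remaining in the current batch
    post_batch_resume_token: Option<String>, // token from the most recent getMore
}

impl TokenState {
    /// What resume_token() should report after yielding a document whose _id is `doc_id`.
    fn user_facing_token(&self, doc_id: &str) -> Option<String> {
        if self.buffer.is_empty() && self.post_batch_resume_token.is_some() {
            // End of batch: the two tokens coincide.
            self.post_batch_resume_token.clone()
        } else {
            // Mid-batch: the user-facing token is the _id of the document just returned.
            Some(doc_id.to_owned())
        }
    }
}
```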

abr-egn marked this pull request as ready for review on December 2, 2021 19:23

if self.buffer.is_empty() && self.post_batch_resume_token.is_some() {
    self.post_batch_resume_token.clone()
} else {
    doc.get("_id")?.map(|val| ResumeToken(val.to_raw_bson()))
Contributor:

This could potentially be an expensive lookup if the user is querying large documents with _id projected out. I think it would probably be better to track these at the ChangeStream level so that only users of ChangeStream (which will be a small %) have to pay this cost. The post_batch_resume_token will still need to be tracked at the cursor level, though.

Another thing to note: the spec requires that we provide a way for users to receive every resume token that gets cached. Currently, this cursor implementation will keep looping until the cursor closes or a document is received, so the tokens that get cached in between empty batches (from post_batch_resume_token) aren't ever made available to the user. Most drivers get around this by providing a tryNext() method which returns null if there aren't any documents available yet, and I think we could do something similar here (and expose it via a separate method on ChangeStream). We'd need to refactor the implementation of this method into a separate function so we could reuse it for both the Stream implementation and tryNext, but otherwise I don't think that should require too much new code. One issue is that we can't name the method try_next, as this would clash with the methods provided by TryStreamExt. Maybe something like next_in_batch? I can't really think of a good name, to be honest.

Implementing a tryNext equivalent on GenericCursor should make the tracking of the resume token at the ChangeStream level much easier too, since I don't think you'd need to introduce any new intermediate types.
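
A tiny synchronous model of the visibility problem described above (everything here is illustrative; String stands in for documents and resume tokens):

```rust
struct ToyChangeStream {
    batches: Vec<(Vec<String>, Option<String>)>, // (documents, post-batch token)
    cached_token: Option<String>,
}

impl ToyChangeStream {
    /// tryNext-style: at a batch boundary, cache the token and return None so
    /// the caller gets a chance to read it via resume_token().
    fn next_in_batch(&mut self) -> Option<String> {
        let (docs, token) = self.batches.first_mut()?;
        if docs.is_empty() {
            self.cached_token = token.take();
            self.batches.remove(0);
            None
        } else {
            Some(docs.remove(0))
        }
    }

    /// Stream-style next: keeps draining batches until a document appears, so
    /// the caller never observes the tokens cached at empty batches in between.
    fn next(&mut self) -> Option<String> {
        while !self.batches.is_empty() {
            if let Some(doc) = self.next_in_batch() {
                return Some(doc);
            }
        }
        None
    }

    fn resume_token(&self) -> Option<&String> {
        self.cached_token.as_ref()
    }
}
```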

abr-egn (author):

Updated! I also realized that I hadn't implemented any of this for the session types, so added that in, and refactored things a bit to be able to share the bulk of the implementation.

abr-egn force-pushed the RUST-521/change-stream-stream branch from 31189e5 to 67b420f on December 13, 2021 18:06
}
}

pub(crate) struct NextInBatchFuture<'a, T>(&'a mut T);
abr-egn (author):

It seems awkward to need a one-off struct to go from the desugared fn() -> Poll<T> form back to .await, but I guess this is the way until poll_fn stabilizes.
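
(std::future::poll_fn has since been stabilized, in Rust 1.64.) For reference, the idiom in stand-alone toy form: a one-off struct holding a mutable borrow whose Future impl just calls the poll-style method, which is roughly what the comment describes NextInBatchFuture doing:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct Counter(u32);

impl Counter {
    // The desugared, poll-style form of "produce the next value".
    fn poll_next_value(&mut self, _cx: &mut Context<'_>) -> Poll<u32> {
        self.0 += 1;
        Poll::Ready(self.0)
    }
}

// One-off future wrapping a mutable borrow, analogous to NextInBatchFuture<'a, T>.
struct NextValueFuture<'a>(&'a mut Counter);

impl<'a> Future for NextValueFuture<'a> {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        self.0.poll_next_value(cx)
    }
}

async fn next_value(counter: &mut Counter) -> u32 {
    // Back in .await-land without poll_fn.
    NextValueFuture(counter).await
}
```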

info: self.info,
pinned_connection: self.pinned_connection,
_phantom: Default::default(),
}
}
}

impl<P, T> Stream for GenericCursor<P, T>
pub(crate) trait CursorStream {
abr-egn (author):

This trait allows the implementation of poll_next to be shared via the stream_poll_next fn; open to suggestions for a more descriptive name.
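
Roughly, the sharing pattern looks like this stand-alone sketch; the names mirror the comment (CursorStream, stream_poll_next), but the bodies and the BatchValue variants are assumptions rather than the driver's code:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_core::Stream;

// Assumed shape of the internal batch result.
enum BatchValue<T> {
    Some(T),   // a document from the current batch
    Empty,     // batch boundary (a resume token may have been cached)
    Exhausted, // the cursor is closed
}

trait CursorStream {
    type Doc;
    fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<BatchValue<Self::Doc>>;
}

// The shared body of Stream::poll_next: keep polling past batch boundaries
// until a document arrives or the cursor is exhausted.
fn stream_poll_next<C: CursorStream>(this: &mut C, cx: &mut Context<'_>) -> Poll<Option<C::Doc>> {
    loop {
        match this.poll_next_in_batch(cx) {
            Poll::Ready(BatchValue::Some(doc)) => return Poll::Ready(Some(doc)),
            Poll::Ready(BatchValue::Empty) => continue,
            Poll::Ready(BatchValue::Exhausted) => return Poll::Ready(None),
            Poll::Pending => return Poll::Pending,
        }
    }
}

// Each cursor type (e.g. Cursor and SessionCursor in the driver) then gets a
// one-line Stream impl; a unit struct stands in for them here.
struct CursorSketch;

impl CursorStream for CursorSketch {
    type Doc = i32;
    fn poll_next_in_batch(&mut self, _cx: &mut Context<'_>) -> Poll<BatchValue<i32>> {
        Poll::Ready(BatchValue::Exhausted)
    }
}

impl Stream for CursorSketch {
    type Item = i32;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<i32>> {
        stream_poll_next(self.get_mut(), cx)
    }
}
```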

impl<P, T> Stream for GenericCursor<P, T>
pub(crate) trait CursorStream {
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>>;
abr-egn (author):

I used the next_in_batch suggestion for internal methods but named the external one next_if_any, since the batching behavior is not the main point of the method from a user's perspective.

isabelatkinson (Contributor) left a comment:

just one small suggestion! adding my LGTM now so as not to block this while I'm gone

pub struct ResumeToken(pub(crate) RawBson);

impl ResumeToken {
pub(crate) fn initial(
Contributor:

nit: can we update this so that both values don't always need to be created? e.g.

match spec.post_batch_resume_token {
    Some(token) if spec.initial_buffer.is_empty() => token,
    _ => // token from options
}

abr-egn (author):

That's much nicer, thank you!

patrickfreed (Contributor) left a comment:

looks good! just have one suggestion

/// # let coll = client.database("foo").collection("bar");
/// let mut change_stream = coll.watch(None, None).await?;
/// let mut resume_token = None;
/// loop {
Contributor:

this makes me think we'll need to include an is_alive method or something like that to allow this loop to terminate in the event the stream is closed.

abr-egn (author):

Ah, good thought, done.
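
With both changes in, a hedged sketch of how the doc example above might read end to end (based on the methods discussed in this PR; the merged example may differ in its details):

```rust
use mongodb::{bson::Document, error::Result, Client};

async fn tail_changes(client: &Client) -> Result<()> {
    let coll = client.database("foo").collection::<Document>("bar");
    let mut change_stream = coll.watch(None, None).await?;
    let mut resume_token = None;
    while change_stream.is_alive() {
        if let Some(event) = change_stream.next_if_any().await? {
            // process the event
            let _ = event;
        }
        // Cache the latest token, including ones surfaced at empty batches.
        resume_token = change_stream.resume_token();
    }
    // resume_token can be persisted here and used to resume the stream later.
    let _ = resume_token;
    Ok(())
}
```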

patrickfreed (Contributor) left a comment:

LGTM!

abr-egn merged commit 7209033 into mongodb:master on Dec 15, 2021