-
Notifications
You must be signed in to change notification settings - Fork 178
RUST-522 Implement resume functionality for change streams #547
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RUST-522 Implement resume functionality for change streams #547
Conversation
@@ -227,11 +220,47 @@ where | |||
T: DeserializeOwned + Unpin + Send + Sync, | |||
{ | |||
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is essentially the manual state machine version of SessionChangeStream::next_if_any
. Hopefully at some point Rust gets reasonable first-class syntax for streams.
I considered trying to write this by wrapping an async function in a stream adaptor for clarity, but that ended up needing to be self-referential.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good! Great work with handling this, I know it has been a lot more complicated than originally anticipated. I have a few minor questions and one concern about handling implicit sessions. I agree with the approach to cutting out the Stream
+ SessionChangeStream
combo for now as we discussed in triage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, awesome job with this!
b8500be
to
637e33a
Compare
RUST-522
This lets change streams resume when encountering appropriate errors.
Unfortunately, I couldn't find a way to implement
SessionChangeStream::values
without resorting to unsafe code to dodge around lifetime restrictions; for now, I've removed that interface and filed RUST-1139 to track implementing it in the future. SinceSessionChangeStream
exposes bothnext
andnext_if_any
, iteration is still possible and reasonably convenient; the main loss is the API consistency with cursors and the convenient combinators available forStream
.