-
Notifications
You must be signed in to change notification settings - Fork 179
RUST-521 Implement naive streaming and resume token caching for change streams #531
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 24 commits
629774b
2557955
15d93c9
e4819a3
7bbc26c
f2bb095
49ecb29
18da865
9fb7e48
8211cfb
2ca6f9d
58684d5
0eb841e
98cba67
d69d3e6
62e0d02
67b420f
b5081ec
9cf8eb5
5d11cee
cdcb246
fb522fa
b3108f3
451424b
f7ef6e2
b45f321
225695b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,14 @@ | ||
//! Contains the types related to a `ChangeStream` event. | ||
use crate::coll::Namespace; | ||
use bson::{Bson, Document}; | ||
use std::convert::TryInto; | ||
|
||
use crate::{ | ||
coll::Namespace, | ||
cursor::CursorSpecification, | ||
error::Result, | ||
options::ChangeStreamOptions, | ||
}; | ||
|
||
use bson::{Bson, Document, RawBson, RawDocument, RawDocumentBuf}; | ||
use serde::{Deserialize, Serialize}; | ||
|
||
/// An opaque token used for resuming an interrupted | ||
|
@@ -15,7 +23,31 @@ use serde::{Deserialize, Serialize}; | |
/// [here](https://docs.mongodb.com/manual/changeStreams/#change-stream-resume-token) for more | ||
/// information on resume tokens. | ||
#[derive(Clone, Debug, Deserialize, Serialize)] | ||
pub struct ResumeToken(pub(crate) Bson); | ||
pub struct ResumeToken(pub(crate) RawBson); | ||
|
||
impl ResumeToken { | ||
pub(crate) fn initial( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: can we update this so that both values don't always need to be created? e.g.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's much nicer, thank you! |
||
options: &Option<ChangeStreamOptions>, | ||
spec: &CursorSpecification, | ||
) -> Option<ResumeToken> { | ||
// Token from options passed to `watch` | ||
let options_token = options | ||
.as_ref() | ||
.and_then(|o| o.start_after.as_ref().or_else(|| o.resume_after.as_ref())) | ||
.cloned(); | ||
// Token from initial response from `aggregate` | ||
let spec_token = if spec.initial_buffer.is_empty() { | ||
spec.post_batch_resume_token.clone() | ||
} else { | ||
None | ||
}; | ||
spec_token.or(options_token) | ||
} | ||
|
||
pub(crate) fn from_raw(doc: Option<RawDocumentBuf>) -> Option<ResumeToken> { | ||
doc.map(|doc| ResumeToken(RawBson::Document(doc))) | ||
} | ||
} | ||
|
||
/// A `ChangeStreamEvent` represents a | ||
/// [change event](https://docs.mongodb.com/manual/reference/change-events/) in the associated change stream. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ pub(crate) mod options; | |
pub mod session; | ||
|
||
use std::{ | ||
future::Future, | ||
marker::PhantomData, | ||
pin::Pin, | ||
task::{Context, Poll}, | ||
|
@@ -18,6 +19,7 @@ use crate::{ | |
event::{ChangeStreamEvent, ResumeToken}, | ||
options::ChangeStreamOptions, | ||
}, | ||
cursor::{stream_poll_next, BatchValue, CursorStream, NextInBatchFuture}, | ||
error::Result, | ||
operation::AggregateTarget, | ||
options::AggregateOptions, | ||
|
@@ -83,14 +85,25 @@ where | |
|
||
/// The information associate with this change stream. | ||
data: ChangeStreamData, | ||
|
||
/// The cached resume token. | ||
resume_token: Option<ResumeToken>, | ||
} | ||
|
||
impl<T> ChangeStream<T> | ||
where | ||
T: DeserializeOwned + Unpin + Send + Sync, | ||
{ | ||
pub(crate) fn new(cursor: Cursor<T>, data: ChangeStreamData) -> Self { | ||
Self { cursor, data } | ||
pub(crate) fn new( | ||
cursor: Cursor<T>, | ||
data: ChangeStreamData, | ||
resume_token: Option<ResumeToken>, | ||
) -> Self { | ||
Self { | ||
cursor, | ||
data, | ||
resume_token, | ||
} | ||
} | ||
|
||
/// Returns the cached resume token that can be used to resume after the most recently returned | ||
|
@@ -100,16 +113,48 @@ where | |
/// [here](https://docs.mongodb.com/manual/changeStreams/#change-stream-resume-token) for more | ||
/// information on change stream resume tokens. | ||
pub fn resume_token(&self) -> Option<&ResumeToken> { | ||
todo!() | ||
self.resume_token.as_ref() | ||
} | ||
|
||
/// Update the type streamed values will be parsed as. | ||
pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> ChangeStream<D> { | ||
ChangeStream { | ||
cursor: self.cursor.with_type(), | ||
data: self.data, | ||
resume_token: self.resume_token, | ||
} | ||
} | ||
|
||
/// Retrieve the next result from the change stream, if any. | ||
/// | ||
/// Where calling `Stream::next` will internally loop until a change document is received, | ||
/// this will make at most one request and return `None` if the returned document batch is | ||
/// empty. This method should be used when storing the resume token in order to ensure the | ||
/// most up to date token is received, e.g. | ||
/// | ||
/// ```ignore | ||
/// # use mongodb::{Client, error::Result}; | ||
/// # async fn func() -> Result<()> { | ||
/// # let client = Client::with_uri_str("mongodb://example.com").await?; | ||
/// # let coll = client.database("foo").collection("bar"); | ||
/// let mut change_stream = coll.watch(None, None).await?; | ||
/// let mut resume_token = None; | ||
/// loop { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this makes me think we'll need to include an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, good thought, done. |
||
/// if let Some(event) = change_stream.next_if_any() { | ||
/// // process event | ||
/// } | ||
/// resume_token = change_stream.resume_token().cloned(); | ||
/// } | ||
/// # | ||
/// # Ok(()) | ||
/// # } | ||
/// ``` | ||
pub async fn next_if_any<'a>(&'a mut self) -> Result<Option<T>> { | ||
Ok(match NextInBatchFuture::new(self).await? { | ||
BatchValue::Some { doc, .. } => Some(bson::from_slice(doc.as_bytes())?), | ||
BatchValue::Empty | BatchValue::Exhausted => None, | ||
}) | ||
} | ||
} | ||
|
||
#[derive(Debug)] | ||
|
@@ -125,9 +170,6 @@ pub(crate) struct ChangeStreamData { | |
/// an automatic resume. | ||
target: AggregateTarget, | ||
|
||
/// The cached resume token. | ||
resume_token: Option<ResumeToken>, | ||
|
||
/// The options provided to the initial `$changeStream` stage. | ||
options: Option<ChangeStreamOptions>, | ||
|
||
|
@@ -151,21 +193,55 @@ impl ChangeStreamData { | |
pipeline, | ||
client, | ||
target, | ||
resume_token: None, | ||
options, | ||
resume_attempted: false, | ||
document_returned: false, | ||
} | ||
} | ||
} | ||
|
||
fn get_resume_token( | ||
batch_value: &BatchValue, | ||
batch_token: Option<&ResumeToken>, | ||
) -> Result<Option<ResumeToken>> { | ||
Ok(match batch_value { | ||
BatchValue::Some { doc, is_last } => { | ||
if *is_last && batch_token.is_some() { | ||
batch_token.cloned() | ||
} else { | ||
doc.get("_id")?.map(|val| ResumeToken(val.to_raw_bson())) | ||
} | ||
} | ||
BatchValue::Empty => batch_token.cloned(), | ||
_ => None, | ||
}) | ||
} | ||
|
||
impl<T> CursorStream for ChangeStream<T> | ||
where | ||
T: DeserializeOwned + Unpin + Send + Sync, | ||
{ | ||
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> { | ||
let out = self.cursor.poll_next_in_batch(cx); | ||
match &out { | ||
Poll::Ready(Ok(bv)) => { | ||
if let Some(token) = get_resume_token(bv, self.cursor.post_batch_resume_token())? { | ||
self.resume_token = Some(token); | ||
} | ||
} | ||
_ => {} | ||
} | ||
out | ||
} | ||
} | ||
|
||
impl<T> Stream for ChangeStream<T> | ||
where | ||
T: DeserializeOwned + Unpin + Send + Sync, | ||
{ | ||
type Item = Result<T>; | ||
|
||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { | ||
todo!() | ||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
stream_poll_next(Pin::into_inner(self), cx) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using
RawBson
here means the token can be essentially handled as a (nearly) uninterpreted byte blob.