Skip to content

RUST-521 Implement naive streaming and resume token caching for change streams #531

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 35 additions & 3 deletions src/change_stream/event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
//! Contains the types related to a `ChangeStream` event.
use crate::coll::Namespace;
use bson::{Bson, Document};
use std::convert::TryInto;

use crate::{
coll::Namespace,
cursor::CursorSpecification,
error::Result,
options::ChangeStreamOptions,
};

use bson::{Bson, Document, RawBson, RawDocument, RawDocumentBuf};
use serde::{Deserialize, Serialize};

/// An opaque token used for resuming an interrupted
Expand All @@ -15,7 +23,31 @@ use serde::{Deserialize, Serialize};
/// [here](https://docs.mongodb.com/manual/changeStreams/#change-stream-resume-token) for more
/// information on resume tokens.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ResumeToken(pub(crate) Bson);
pub struct ResumeToken(pub(crate) RawBson);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using RawBson here means the token can be essentially handled as a (nearly) uninterpreted byte blob.


impl ResumeToken {
pub(crate) fn initial(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we update this so that both values don't always need to be created? e.g.

match spec.post_batch_resume_token {
    Some(token) if spec.initial_buffer.is_empty() => token,
    None => // token from options
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's much nicer, thank you!

options: &Option<ChangeStreamOptions>,
spec: &CursorSpecification,
) -> Option<ResumeToken> {
// Token from options passed to `watch`
let options_token = options
.as_ref()
.and_then(|o| o.start_after.as_ref().or_else(|| o.resume_after.as_ref()))
.cloned();
// Token from initial response from `aggregate`
let spec_token = if spec.initial_buffer.is_empty() {
spec.post_batch_resume_token.clone()
} else {
None
};
spec_token.or(options_token)
}

pub(crate) fn from_raw(doc: Option<RawDocumentBuf>) -> Option<ResumeToken> {
doc.map(|doc| ResumeToken(RawBson::Document(doc)))
}
}

/// A `ChangeStreamEvent` represents a
/// [change event](https://docs.mongodb.com/manual/reference/change-events/) in the associated change stream.
Expand Down
94 changes: 85 additions & 9 deletions src/change_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub(crate) mod options;
pub mod session;

use std::{
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
Expand All @@ -18,6 +19,7 @@ use crate::{
event::{ChangeStreamEvent, ResumeToken},
options::ChangeStreamOptions,
},
cursor::{stream_poll_next, BatchValue, CursorStream, NextInBatchFuture},
error::Result,
operation::AggregateTarget,
options::AggregateOptions,
Expand Down Expand Up @@ -83,14 +85,25 @@ where

/// The information associate with this change stream.
data: ChangeStreamData,

/// The cached resume token.
resume_token: Option<ResumeToken>,
}

impl<T> ChangeStream<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
pub(crate) fn new(cursor: Cursor<T>, data: ChangeStreamData) -> Self {
Self { cursor, data }
pub(crate) fn new(
cursor: Cursor<T>,
data: ChangeStreamData,
resume_token: Option<ResumeToken>,
) -> Self {
Self {
cursor,
data,
resume_token,
}
}

/// Returns the cached resume token that can be used to resume after the most recently returned
Expand All @@ -100,16 +113,48 @@ where
/// [here](https://docs.mongodb.com/manual/changeStreams/#change-stream-resume-token) for more
/// information on change stream resume tokens.
pub fn resume_token(&self) -> Option<&ResumeToken> {
todo!()
self.resume_token.as_ref()
}

/// Update the type streamed values will be parsed as.
pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> ChangeStream<D> {
ChangeStream {
cursor: self.cursor.with_type(),
data: self.data,
resume_token: self.resume_token,
}
}

/// Retrieve the next result from the change stream, if any.
///
/// Where calling `Stream::next` will internally loop until a change document is received,
/// this will make at most one request and return `None` if the returned document batch is
/// empty. This method should be used when storing the resume token in order to ensure the
/// most up to date token is received, e.g.
///
/// ```ignore
/// # use mongodb::{Client, error::Result};
/// # async fn func() -> Result<()> {
/// # let client = Client::with_uri_str("mongodb://example.com").await?;
/// # let coll = client.database("foo").collection("bar");
/// let mut change_stream = coll.watch(None, None).await?;
/// let mut resume_token = None;
/// loop {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this makes me think we'll need to include an is_alive method or something like that to allow this loop to terminate in the event the stream is closed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good thought, done.

/// if let Some(event) = change_stream.next_if_any() {
/// // process event
/// }
/// resume_token = change_stream.resume_token().cloned();
/// }
/// #
/// # Ok(())
/// # }
/// ```
pub async fn next_if_any<'a>(&'a mut self) -> Result<Option<T>> {
Ok(match NextInBatchFuture::new(self).await? {
BatchValue::Some { doc, .. } => Some(bson::from_slice(doc.as_bytes())?),
BatchValue::Empty | BatchValue::Exhausted => None,
})
}
}

#[derive(Debug)]
Expand All @@ -125,9 +170,6 @@ pub(crate) struct ChangeStreamData {
/// an automatic resume.
target: AggregateTarget,

/// The cached resume token.
resume_token: Option<ResumeToken>,

/// The options provided to the initial `$changeStream` stage.
options: Option<ChangeStreamOptions>,

Expand All @@ -151,21 +193,55 @@ impl ChangeStreamData {
pipeline,
client,
target,
resume_token: None,
options,
resume_attempted: false,
document_returned: false,
}
}
}

fn get_resume_token(
batch_value: &BatchValue,
batch_token: Option<&ResumeToken>,
) -> Result<Option<ResumeToken>> {
Ok(match batch_value {
BatchValue::Some { doc, is_last } => {
if *is_last && batch_token.is_some() {
batch_token.cloned()
} else {
doc.get("_id")?.map(|val| ResumeToken(val.to_raw_bson()))
}
}
BatchValue::Empty => batch_token.cloned(),
_ => None,
})
}

impl<T> CursorStream for ChangeStream<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
let out = self.cursor.poll_next_in_batch(cx);
match &out {
Poll::Ready(Ok(bv)) => {
if let Some(token) = get_resume_token(bv, self.cursor.post_batch_resume_token())? {
self.resume_token = Some(token);
}
}
_ => {}
}
out
}
}

impl<T> Stream for ChangeStream<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
type Item = Result<T>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
todo!()
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
stream_poll_next(Pin::into_inner(self), cx)
}
}
105 changes: 96 additions & 9 deletions src/change_stream/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@ use std::{

use bson::Document;
use futures_core::Stream;
use futures_util::StreamExt;
use serde::de::DeserializeOwned;

use crate::{error::Result, ClientSession, SessionCursor, SessionCursorStream};
use crate::{
cursor::{BatchValue, CursorStream, NextInBatchFuture},
error::Result,
ClientSession,
SessionCursor,
SessionCursorStream,
};

use super::{event::ResumeToken, ChangeStreamData};
use super::{event::ResumeToken, get_resume_token, stream_poll_next, ChangeStreamData};

/// A [`SessionChangeStream`] is a change stream that was created with a [`ClientSession`] that must
/// be iterated using one. To iterate, use [`SessionChangeStream::next`] or retrieve a
Expand Down Expand Up @@ -45,14 +52,23 @@ where
{
cursor: SessionCursor<T>,
data: ChangeStreamData,
resume_token: Option<ResumeToken>,
}

impl<T> SessionChangeStream<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
pub(crate) fn new(cursor: SessionCursor<T>, data: ChangeStreamData) -> Self {
Self { cursor, data }
pub(crate) fn new(
cursor: SessionCursor<T>,
data: ChangeStreamData,
resume_token: Option<ResumeToken>,
) -> Self {
Self {
cursor,
data,
resume_token,
}
}

/// Returns the cached resume token that can be used to resume after the most recently returned
Expand All @@ -62,12 +78,16 @@ where
/// [here](https://docs.mongodb.com/manual/changeStreams/#change-stream-resume-token) for more
/// information on change stream resume tokens.
pub fn resume_token(&self) -> Option<&ResumeToken> {
todo!()
self.resume_token.as_ref()
}

/// Update the type streamed values will be parsed as.
pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> SessionChangeStream<D> {
todo!()
SessionChangeStream {
cursor: self.cursor.with_type(),
data: self.data,
resume_token: self.resume_token,
}
}

/// Retrieves a [`SessionCursorStream`] to iterate this change stream. The session provided must
Expand Down Expand Up @@ -117,7 +137,10 @@ where
&mut self,
session: &'session mut ClientSession,
) -> SessionChangeStreamValues<'_, 'session, T> {
todo!()
SessionChangeStreamValues {
stream: self.cursor.stream(session),
resume_token: &mut self.resume_token,
}
}

/// Retrieve the next result from the change stream.
Expand Down Expand Up @@ -145,7 +168,36 @@ where
/// # }
/// ```
pub async fn next(&mut self, session: &mut ClientSession) -> Option<Result<T>> {
todo!()
self.values(session).next().await
}

/// Retrieve the next result from the change stream, if any.
///
/// Where calling `next` will internally loop until a change document is received,
/// this will make at most one request and return `None` if the returned document batch is
/// empty. This method should be used when storing the resume token in order to ensure the
/// most up to date token is received, e.g.
///
/// ```ignore
/// # use mongodb::{Client, error::Result};
/// # async fn func() -> Result<()> {
/// # let client = Client::with_uri_str("mongodb://example.com").await?;
/// # let coll = client.database("foo").collection("bar");
/// # let mut session = client.start_session(None).await?;
/// let mut change_stream = coll.watch_with_session(None, None, &mut session).await?;
/// let mut resume_token = None;
/// loop {
/// if let Some(event) = change_stream.next_if_any(&mut session) {
/// // process event
/// }
/// resume_token = change_stream.resume_token().cloned();
/// }
/// #
/// # Ok(())
/// # }
/// ```
pub async fn next_if_any<'a>(&'a mut self, session: &mut ClientSession) -> Result<Option<T>> {
self.values(session).next_if_any().await
}
}

Expand All @@ -160,6 +212,41 @@ where
T: DeserializeOwned + Unpin + Send + Sync,
{
stream: SessionCursorStream<'cursor, 'session, T>,
resume_token: &'cursor mut Option<ResumeToken>,
}

impl<'cursor, 'session, T> SessionChangeStreamValues<'cursor, 'session, T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
pub fn resume_token(&self) -> Option<&ResumeToken> {
self.resume_token.as_ref()
}

pub async fn next_if_any<'a>(&'a mut self) -> Result<Option<T>> {
Ok(match NextInBatchFuture::new(self).await? {
BatchValue::Some { doc, .. } => Some(bson::from_slice(doc.as_bytes())?),
BatchValue::Empty | BatchValue::Exhausted => None,
})
}
}

impl<'cursor, 'session, T> CursorStream for SessionChangeStreamValues<'cursor, 'session, T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
let out = self.stream.poll_next_in_batch(cx);
match &out {
Poll::Ready(Ok(bv)) => {
if let Some(token) = get_resume_token(bv, self.stream.post_batch_resume_token())? {
*self.resume_token = Some(token);
}
}
_ => {}
}
out
}
}

impl<'cursor, 'session, T> Stream for SessionChangeStreamValues<'cursor, 'session, T>
Expand All @@ -169,6 +256,6 @@ where
type Item = Result<T>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!()
stream_poll_next(Pin::into_inner(self), cx)
}
}
Loading