Skip to content

fix: limit the amount of pending-accept reset streams #668

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ pub struct Builder {
/// Maximum number of locally reset streams to keep at a time.
reset_stream_max: usize,

/// Maximum number of remotely reset streams to allow in the pending
/// accept queue.
pending_accept_reset_stream_max: usize,

/// Initial `Settings` frame to send as part of the handshake.
settings: Settings,

Expand Down Expand Up @@ -634,6 +638,7 @@ impl Builder {
max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
initial_target_connection_window_size: None,
initial_max_send_streams: usize::MAX,
settings: Default::default(),
Expand Down Expand Up @@ -966,6 +971,49 @@ impl Builder {
self
}

/// Sets the maximum number of pending-accept remotely-reset streams.
///
/// Streams that have been received by the peer, but not accepted by the
/// user, can also receive a RST_STREAM. This is a legitimate pattern: one
/// could send a request and then shortly after, realize it is not needed,
/// sending a CANCEL.
///
/// However, since those streams are now "closed", they don't count towards
/// the max concurrent streams. So, they will sit in the accept queue,
/// using memory.
///
/// When the number of remotely-reset streams sitting in the pending-accept
/// queue reaches this maximum value, a connection error with the code of
/// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
/// `Future`.
///
/// The default value is currently 20, but could change.
///
/// # Examples
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::client::*;
/// # use bytes::Bytes;
/// #
/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), h2::Error>
/// # {
/// // `client_fut` is a future representing the completion of the HTTP/2
/// // handshake.
/// let client_fut = Builder::new()
///     .max_pending_accept_reset_streams(100)
///     .handshake(my_io);
/// # client_fut.await
/// # }
/// #
/// # pub fn main() {}
/// ```
pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
    self.pending_accept_reset_stream_max = max;
    self
}

/// Sets the maximum send buffer size per stream.
///
/// Once a stream has buffered up to (or over) the maximum, the stream's
Expand Down Expand Up @@ -1209,6 +1257,7 @@ where
max_send_buffer_size: builder.max_send_buffer_size,
reset_stream_duration: builder.reset_stream_duration,
reset_stream_max: builder.reset_stream_max,
remote_reset_stream_max: builder.pending_accept_reset_stream_max,
settings: builder.settings.clone(),
},
);
Expand Down
7 changes: 7 additions & 0 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub(crate) struct Config {
pub max_send_buffer_size: usize,
pub reset_stream_duration: Duration,
pub reset_stream_max: usize,
pub remote_reset_stream_max: usize,
pub settings: frame::Settings,
}

Expand Down Expand Up @@ -118,6 +119,7 @@ where
.unwrap_or(false),
local_reset_duration: config.reset_stream_duration,
local_reset_max: config.reset_stream_max,
remote_reset_max: config.remote_reset_stream_max,
remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
remote_max_initiated: config
.settings
Expand Down Expand Up @@ -172,6 +174,11 @@ where
self.inner.streams.max_recv_streams()
}

/// Returns the number of streams currently held by the underlying
/// stream store (delegates to `Streams::num_wired_streams`).
///
/// Only compiled with the `unstable` feature; intended for tests and
/// diagnostics, and may change or disappear at any time.
#[cfg(feature = "unstable")]
pub fn num_wired_streams(&self) -> usize {
    self.inner.streams.num_wired_streams()
}

/// Returns `Ready` when the connection is ready to receive a frame.
///
/// Returns `Error` as this may raise errors that are caused by delayed
Expand Down
1 change: 1 addition & 0 deletions src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub type WindowSize = u32;

// Constants
pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1;
pub const DEFAULT_REMOTE_RESET_STREAM_MAX: usize = 20;
pub const DEFAULT_RESET_STREAM_MAX: usize = 10;
pub const DEFAULT_RESET_STREAM_SECS: u64 = 30;
pub const DEFAULT_MAX_SEND_BUFFER_SIZE: usize = 1024 * 400;
51 changes: 43 additions & 8 deletions src/proto/streams/counts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@ pub(super) struct Counts {
num_recv_streams: usize,

/// Maximum number of pending locally reset streams
max_reset_streams: usize,
max_local_reset_streams: usize,

/// Current number of pending locally reset streams
num_reset_streams: usize,
num_local_reset_streams: usize,

/// Max number of "pending accept" streams that were remotely reset
max_remote_reset_streams: usize,

/// Current number of "pending accept" streams that were remotely reset
num_remote_reset_streams: usize,
}

impl Counts {
Expand All @@ -36,8 +42,10 @@ impl Counts {
num_send_streams: 0,
max_recv_streams: config.remote_max_initiated.unwrap_or(usize::MAX),
num_recv_streams: 0,
max_reset_streams: config.local_reset_max,
num_reset_streams: 0,
max_local_reset_streams: config.local_reset_max,
num_local_reset_streams: 0,
max_remote_reset_streams: config.remote_reset_max,
num_remote_reset_streams: 0,
}
}

Expand Down Expand Up @@ -90,7 +98,7 @@ impl Counts {

/// Returns true if the number of pending reset streams can be incremented.
pub fn can_inc_num_reset_streams(&self) -> bool {
self.max_reset_streams > self.num_reset_streams
self.max_local_reset_streams > self.num_local_reset_streams
}

/// Increments the number of pending reset streams.
Expand All @@ -101,7 +109,34 @@ impl Counts {
pub fn inc_num_reset_streams(&mut self) {
assert!(self.can_inc_num_reset_streams());

self.num_reset_streams += 1;
self.num_local_reset_streams += 1;
}

/// Returns the configured maximum number of "pending accept" streams
/// the remote peer may reset before the connection is torn down.
pub(crate) fn max_remote_reset_streams(&self) -> usize {
    self.max_remote_reset_streams
}

/// Returns `true` when another remotely-reset "pending accept" stream
/// can be tracked without exceeding the configured maximum.
pub(crate) fn can_inc_num_remote_reset_streams(&self) -> bool {
    self.num_remote_reset_streams < self.max_remote_reset_streams
}

/// Increments the count of remotely-reset "pending accept" streams.
///
/// # Panics
///
/// Panics if the configured limit has already been reached; callers must
/// check `can_inc_num_remote_reset_streams()` before calling this (the
/// `assert!` enforces that invariant).
pub(crate) fn inc_num_remote_reset_streams(&mut self) {
    assert!(self.can_inc_num_remote_reset_streams());

    self.num_remote_reset_streams += 1;
}

/// Decrements the count of remotely-reset "pending accept" streams,
/// e.g. once such a stream is handed to the user.
///
/// # Panics
///
/// Panics if the count is already zero — that would mean a decrement
/// without a matching increment, i.e. a bookkeeping bug.
pub(crate) fn dec_num_remote_reset_streams(&mut self) {
    assert!(self.num_remote_reset_streams > 0);

    self.num_remote_reset_streams -= 1;
}

pub fn apply_remote_settings(&mut self, settings: &frame::Settings) {
Expand Down Expand Up @@ -194,8 +229,8 @@ impl Counts {
}

fn dec_num_reset_streams(&mut self) {
assert!(self.num_reset_streams > 0);
self.num_reset_streams -= 1;
assert!(self.num_local_reset_streams > 0);
self.num_local_reset_streams -= 1;
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/proto/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ pub struct Config {
/// Maximum number of locally reset streams to keep at a time
pub local_reset_max: usize,

/// Maximum number of remotely reset "pending accept" streams to keep at a
/// time. Going over this number results in a connection error.
pub remote_reset_max: usize,

/// Initial window size of remote initiated streams
pub remote_init_window_sz: WindowSize,

Expand Down
30 changes: 28 additions & 2 deletions src/proto/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,12 +741,39 @@ impl Recv {
}

/// Handle remote sending an explicit RST_STREAM.
pub fn recv_reset(&mut self, frame: frame::Reset, stream: &mut Stream) {
pub fn recv_reset(
    &mut self,
    frame: frame::Reset,
    stream: &mut Stream,
    counts: &mut Counts,
) -> Result<(), Error> {
    // Resetting a stream the user has not yet accepted is a legitimate
    // pattern (e.g. a CANCEL sent shortly after the request), but such
    // streams keep occupying accept-queue memory while no longer counting
    // as "concurrent" streams — so they are bounded by a separate limit.
    //
    // See https://github.com/hyperium/hyper/issues/2877
    if stream.is_pending_accept {
        if !counts.can_inc_num_remote_reset_streams() {
            tracing::warn!(
                "recv_reset; remotely-reset pending-accept streams reached limit ({:?})",
                counts.max_remote_reset_streams(),
            );
            return Err(Error::library_go_away(Reason::ENHANCE_YOUR_CALM));
        }
        counts.inc_num_remote_reset_streams();
    }

    // Transition the stream state for the reset, then wake any tasks
    // blocked on sending or receiving so they observe the closure.
    stream.state.recv_reset(frame, stream.is_pending_send);
    stream.notify_send();
    stream.notify_recv();

    Ok(())
}

/// Handle a connection-level error
Expand Down Expand Up @@ -1033,7 +1060,6 @@ impl Recv {
cx: &Context,
stream: &mut Stream,
) -> Poll<Option<Result<Bytes, proto::Error>>> {
// TODO: Return error when the stream is reset
match stream.pending_recv.pop_front(&mut self.buffer) {
Some(Event::Data(payload)) => Poll::Ready(Some(Ok(payload))),
Some(event) => {
Expand Down
7 changes: 7 additions & 0 deletions src/proto/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,13 @@ impl State {
}
}

pub fn is_remote_reset(&self) -> bool {
match self.inner {
Closed(Cause::Error(ref e)) => e.is_local(),
_ => false,
}
}

/// Returns true if the stream is already reset.
pub fn is_reset(&self) -> bool {
match self.inner {
Expand Down
8 changes: 7 additions & 1 deletion src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ where
// TODO: ideally, OpaqueStreamRefs::new would do this, but we're holding
// the lock, so it can't.
me.refs += 1;

// Pending-accepted remotely-reset streams are counted.
if stream.state.is_remote_reset() {
me.counts.dec_num_remote_reset_streams();
}

StreamRef {
opaque: OpaqueStreamRef::new(self.inner.clone(), stream),
send_buffer: self.send_buffer.clone(),
Expand Down Expand Up @@ -601,7 +607,7 @@ impl Inner {
let actions = &mut self.actions;

self.counts.transition(stream, |counts, stream| {
actions.recv.recv_reset(frame, stream);
actions.recv.recv_reset(frame, stream, counts)?;
actions.send.handle_error(send_buffer, stream, counts);
assert!(stream.state.is_closed());
Ok(())
Expand Down
56 changes: 56 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ pub struct Builder {
/// Maximum number of locally reset streams to keep at a time.
reset_stream_max: usize,

/// Maximum number of remotely reset streams to allow in the pending
/// accept queue.
pending_accept_reset_stream_max: usize,

/// Initial `Settings` frame to send as part of the handshake.
settings: Settings,

Expand Down Expand Up @@ -576,6 +580,13 @@ where
pub fn max_concurrent_recv_streams(&self) -> usize {
self.connection.max_recv_streams()
}

// Returns the number of streams tracked by the underlying connection.
// Hidden/unstable diagnostic — could disappear at any time.
#[doc(hidden)]
#[cfg(feature = "unstable")]
pub fn num_wired_streams(&self) -> usize {
    self.connection.num_wired_streams()
}
}

#[cfg(feature = "stream")]
Expand Down Expand Up @@ -635,6 +646,7 @@ impl Builder {
Builder {
reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
settings: Settings::default(),
initial_target_connection_window_size: None,
max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
Expand Down Expand Up @@ -875,6 +887,49 @@ impl Builder {
self
}

/// Sets the maximum number of pending-accept remotely-reset streams.
///
/// Streams that have been received by the peer, but not accepted by the
/// user, can also receive a RST_STREAM. This is a legitimate pattern: one
/// could send a request and then shortly after, realize it is not needed,
/// sending a CANCEL.
///
/// However, since those streams are now "closed", they don't count towards
/// the max concurrent streams. So, they will sit in the accept queue,
/// using memory.
///
/// When the number of remotely-reset streams sitting in the pending-accept
/// queue reaches this maximum value, a connection error with the code of
/// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
/// `Future`.
///
/// The default value is currently 20, but could change.
///
/// # Examples
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
/// # use h2::server::*;
/// #
/// # fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
/// # -> Handshake<T>
/// # {
/// // `server_fut` is a future representing the completion of the HTTP/2
/// // handshake.
/// let server_fut = Builder::new()
///     .max_pending_accept_reset_streams(100)
///     .handshake(my_io);
/// # server_fut
/// # }
/// #
/// # pub fn main() {}
/// ```
pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
    self.pending_accept_reset_stream_max = max;
    self
}

/// Sets the maximum send buffer size per stream.
///
/// Once a stream has buffered up to (or over) the maximum, the stream's
Expand Down Expand Up @@ -1305,6 +1360,7 @@ where
max_send_buffer_size: self.builder.max_send_buffer_size,
reset_stream_duration: self.builder.reset_stream_duration,
reset_stream_max: self.builder.reset_stream_max,
remote_reset_stream_max: self.builder.pending_accept_reset_stream_max,
settings: self.builder.settings.clone(),
},
);
Expand Down
4 changes: 4 additions & 0 deletions tests/h2-support/src/frames.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ impl Mock<frame::GoAway> {
self.reason(frame::Reason::FRAME_SIZE_ERROR)
}

/// Sets the mock GOAWAY frame's reason to `ENHANCE_YOUR_CALM`.
pub fn calm(self) -> Self {
    self.reason(frame::Reason::ENHANCE_YOUR_CALM)
}

pub fn no_error(self) -> Self {
self.reason(frame::Reason::NO_ERROR)
}
Expand Down
Loading