diff --git a/src/client/pool.rs b/src/client/pool.rs index 95521d21..6ea29e5d 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -2,8 +2,9 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::error::Error; -use std::fmt; +use std::fmt::{self, Debug}; use std::future::Future; +use std::hash::Hash; use std::marker::Unpin; use std::ops::{Deref, DerefMut}; use std::pin::Pin; @@ -23,9 +24,9 @@ use crate::common::{exec::Exec, ready}; // FIXME: allow() required due to `impl Trait` leaking types to this lint #[allow(missing_debug_implementations)] -pub(super) struct Pool { +pub(super) struct Pool { // If the pool is disabled, this is None. - inner: Option>>>, + inner: Option>>>, } // Before using a pooled connection, make sure the sender is not dead. @@ -42,6 +43,10 @@ pub(super) trait Poolable: Unpin + Send + Sized + 'static { fn can_share(&self) -> bool; } +pub trait Key: Eq + Hash + Clone + Debug + Unpin + Send + 'static {} + +impl Key for T where T: Eq + Hash + Clone + Debug + Unpin + Send + 'static {} + /// When checking out a pooled connection, it might be that the connection /// only supports a single reservation, or it might be usable for many. /// @@ -61,16 +66,16 @@ pub(super) enum Reservation { } /// Simple type alias in case the key type needs to be adjusted. -pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc; +// pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc; -struct PoolInner { +struct PoolInner { // A flag that a connection is being established, and the connection // should be shared. This prevents making multiple HTTP/2 connections // to the same host. - connecting: HashSet, + connecting: HashSet, // These are internal Conns sitting in the event loop in the KeepAlive // state, waiting to receive a new Request to send on the socket. - idle: HashMap>>, + idle: HashMap>>, max_idle_per_host: usize, // These are outstanding Checkouts that are waiting for a socket to be // able to send a Request one. This is used when "racing" for a new @@ -81,7 +86,7 @@ struct PoolInner { // this list is checked for any parked Checkouts, and tries to notify // them that the Conn could be used instead of waiting for a brand new // connection. - waiters: HashMap>>, + waiters: HashMap>>, // A oneshot channel is used to allow the interval to be notified when // the Pool completely drops. That way, the interval can cancel immediately. #[cfg(feature = "runtime")] @@ -107,8 +112,8 @@ impl Config { } } -impl Pool { - pub(super) fn new(config: Config, __exec: &Exec) -> Pool { +impl Pool { + pub(super) fn new(config: Config, __exec: &Exec) -> Pool { let inner = if config.is_enabled() { Some(Arc::new(Mutex::new(PoolInner { connecting: HashSet::new(), @@ -145,10 +150,10 @@ impl Pool { } } -impl Pool { +impl Pool { /// Returns a `Checkout` which is a future that resolves if an idle /// connection becomes available. - pub(super) fn checkout(&self, key: Key) -> Checkout { + pub(super) fn checkout(&self, key: K) -> Checkout { Checkout { key, pool: self.clone(), @@ -158,7 +163,7 @@ impl Pool { /// Ensure that there is only ever 1 connecting task for HTTP/2 /// connections. This does nothing for HTTP/1. - pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option> { + pub(super) fn connecting(&self, key: &K, ver: Ver) -> Option> { if ver == Ver::Http2 { if let Some(ref enabled) = self.inner { let mut inner = enabled.lock().unwrap(); @@ -185,7 +190,7 @@ impl Pool { } #[cfg(test)] - fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner> { + fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner> { self.inner.as_ref().expect("enabled").lock().expect("lock") } @@ -210,9 +215,9 @@ impl Pool { pub(super) fn pooled( &self, - #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting, + #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting, value: T, - ) -> Pooled { + ) -> Pooled { let (value, pool_ref) = if let Some(ref enabled) = self.inner { match value.reserve() { #[cfg(feature = "http2")] @@ -252,7 +257,7 @@ impl Pool { } } - fn reuse(&self, key: &Key, value: T) -> Pooled { + fn reuse(&self, key: &K, value: T) -> Pooled { debug!("reuse idle connection for {:?}", key); // TODO: unhack this // In Pool::pooled(), which is used for inserting brand new connections, @@ -279,12 +284,12 @@ impl Pool { } /// Pop off this list, looking for a usable connection that hasn't expired. -struct IdlePopper<'a, T> { - key: &'a Key, +struct IdlePopper<'a, T, K> { + key: &'a K, list: &'a mut Vec>, } -impl<'a, T: Poolable + 'a> IdlePopper<'a, T> { +impl<'a, T: Poolable + 'a, K: Debug> IdlePopper<'a, T, K> { fn pop(self, expiration: &Expiration) -> Option> { while let Some(entry) = self.list.pop() { // If the connection has been closed, or is older than our idle @@ -326,8 +331,8 @@ impl<'a, T: Poolable + 'a> IdlePopper<'a, T> { } } -impl PoolInner { - fn put(&mut self, key: Key, value: T, __pool_ref: &Arc>>) { +impl PoolInner { + fn put(&mut self, key: K, value: T, __pool_ref: &Arc>>) { if value.can_share() && self.idle.contains_key(&key) { trace!("put; existing idle HTTP/2 connection for {:?}", key); return; @@ -397,7 +402,7 @@ impl PoolInner { /// A `Connecting` task is complete. Not necessarily successfully, /// but the lock is going away, so clean up. - fn connected(&mut self, key: &Key) { + fn connected(&mut self, key: &K) { let existed = self.connecting.remove(key); debug_assert!(existed, "Connecting dropped, key not in pool.connecting"); // cancel any waiters. if there are any, it's because @@ -407,7 +412,7 @@ impl PoolInner { } #[cfg(feature = "runtime")] - fn spawn_idle_interval(&mut self, pool_ref: &Arc>>) { + fn spawn_idle_interval(&mut self, pool_ref: &Arc>>) { let (dur, rx) = { if self.idle_interval_ref.is_some() { return; @@ -432,12 +437,12 @@ impl PoolInner { } } -impl PoolInner { +impl PoolInner { /// Any `FutureResponse`s that were created will have made a `Checkout`, /// and possibly inserted into the pool that it is waiting for an idle /// connection. If a user ever dropped that future, we need to clean out /// those parked senders. - fn clean_waiters(&mut self, key: &Key) { + fn clean_waiters(&mut self, key: &K) { let mut remove_waiters = false; if let Some(waiters) = self.waiters.get_mut(key) { waiters.retain(|tx| !tx.is_canceled()); @@ -450,7 +455,7 @@ impl PoolInner { } #[cfg(feature = "runtime")] -impl PoolInner { +impl PoolInner { /// This should *only* be called by the IdleTask fn clear_expired(&mut self) { let dur = self.timeout.expect("interval assumes timeout"); @@ -481,8 +486,8 @@ impl PoolInner { } } -impl Clone for Pool { - fn clone(&self) -> Pool { +impl Clone for Pool { + fn clone(&self) -> Pool { Pool { inner: self.inner.clone(), } @@ -491,14 +496,14 @@ impl Clone for Pool { /// A wrapped poolable value that tries to reinsert to the Pool on Drop. // Note: The bounds `T: Poolable` is needed for the Drop impl. -pub(super) struct Pooled { +pub(super) struct Pooled { value: Option, is_reused: bool, - key: Key, - pool: WeakOpt>>, + key: K, + pool: WeakOpt>>, } -impl Pooled { +impl Pooled { pub(super) fn is_reused(&self) -> bool { self.is_reused } @@ -516,20 +521,20 @@ impl Pooled { } } -impl Deref for Pooled { +impl Deref for Pooled { type Target = T; fn deref(&self) -> &T { self.as_ref() } } -impl DerefMut for Pooled { +impl DerefMut for Pooled { fn deref_mut(&mut self) -> &mut T { self.as_mut() } } -impl Drop for Pooled { +impl Drop for Pooled { fn drop(&mut self) { if let Some(value) = self.value.take() { if !value.is_open() { @@ -551,7 +556,7 @@ impl Drop for Pooled { } } -impl fmt::Debug for Pooled { +impl fmt::Debug for Pooled { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Pooled").field("key", &self.key).finish() } @@ -564,9 +569,9 @@ struct Idle { // FIXME: allow() required due to `impl Trait` leaking types to this lint #[allow(missing_debug_implementations)] -pub(super) struct Checkout { - key: Key, - pool: Pool, +pub(super) struct Checkout { + key: K, + pool: Pool, waiter: Option>, } @@ -581,11 +586,11 @@ impl fmt::Display for CheckoutIsClosedError { } } -impl Checkout { +impl Checkout { fn poll_waiter( &mut self, cx: &mut task::Context<'_>, - ) -> Poll>>> { + ) -> Poll>>> { if let Some(mut rx) = self.waiter.take() { match Pin::new(&mut rx).poll(cx) { Poll::Ready(Ok(value)) => { @@ -608,7 +613,7 @@ impl Checkout { } } - fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option> { + fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option> { let entry = { let mut inner = self.pool.inner.as_ref()?.lock().unwrap(); let expiration = Expiration::new(inner.timeout); @@ -658,8 +663,8 @@ impl Checkout { } } -impl Future for Checkout { - type Output = crate::Result>; +impl Future for Checkout { + type Output = crate::Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { if let Some(pooled) = ready!(self.poll_waiter(cx)?) { @@ -678,7 +683,7 @@ impl Future for Checkout { } } -impl Drop for Checkout { +impl Drop for Checkout { fn drop(&mut self) { if self.waiter.take().is_some() { trace!("checkout dropped for {:?}", self.key); @@ -691,13 +696,13 @@ impl Drop for Checkout { // FIXME: allow() required due to `impl Trait` leaking types to this lint #[allow(missing_debug_implementations)] -pub(super) struct Connecting { - key: Key, - pool: WeakOpt>>, +pub(super) struct Connecting { + key: K, + pool: WeakOpt>>, } -impl Connecting { - pub(super) fn alpn_h2(self, pool: &Pool) -> Option { +impl Connecting { + pub(super) fn alpn_h2(self, pool: &Pool) -> Option { debug_assert!( self.pool.0.is_none(), "Connecting::alpn_h2 but already Http2" @@ -707,7 +712,7 @@ impl Connecting { } } -impl Drop for Connecting { +impl Drop for Connecting { fn drop(&mut self) { if let Some(pool) = self.pool.upgrade() { // No need to panic on drop, that could abort! @@ -736,10 +741,10 @@ impl Expiration { #[cfg(feature = "runtime")] pin_project_lite::pin_project! { - struct IdleTask { + struct IdleTask { #[pin] interval: Interval, - pool: WeakOpt>>, + pool: WeakOpt>>, // This allows the IdleTask to be notified as soon as the entire // Pool is fully dropped, and shutdown. This channel is never sent on, // but Err(Canceled) will be received when the Pool is dropped. @@ -749,7 +754,7 @@ pin_project_lite::pin_project! { } #[cfg(feature = "runtime")] -impl Future for IdleTask { +impl Future for IdleTask { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { @@ -794,7 +799,9 @@ impl WeakOpt { #[cfg(all(test, not(miri)))] mod tests { + use std::fmt::Debug; use std::future::Future; + use std::hash::Hash; use std::pin::Pin; use std::task::{self, Poll}; use std::time::Duration; @@ -802,6 +809,11 @@ mod tests { use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt}; use crate::common::exec::Exec; + #[derive(Clone, Debug, PartialEq, Eq, Hash)] + struct KeyImpl(http::uri::Scheme, http::uri::Authority); + + type KeyTuple = (http::uri::Scheme, http::uri::Authority); + /// Test unique reservations. #[derive(Debug, PartialEq, Eq)] struct Uniq(T); @@ -820,22 +832,22 @@ mod tests { } } - fn c(key: Key) -> Connecting { + fn c(key: K) -> Connecting { Connecting { key, pool: WeakOpt::none(), } } - fn host_key(s: &str) -> Key { - (http::uri::Scheme::HTTP, s.parse().expect("host key")) + fn host_key(s: &str) -> KeyImpl { + KeyImpl(http::uri::Scheme::HTTP, s.parse().expect("host key")) } - fn pool_no_timer() -> Pool { + fn pool_no_timer() -> Pool { pool_max_idle_no_timer(::std::usize::MAX) } - fn pool_max_idle_no_timer(max_idle: usize) -> Pool { + fn pool_max_idle_no_timer(max_idle: usize) -> Pool { let pool = Pool::new( super::Config { idle_timeout: Some(Duration::from_millis(100)), @@ -988,7 +1000,7 @@ mod tests { #[tokio::test] async fn test_pool_checkout_drop_cleans_up_waiters() { - let pool = pool_no_timer::>(); + let pool = pool_no_timer::, KeyImpl>(); let key = host_key("foo"); let mut checkout1 = pool.checkout(key.clone());