@@ -1338,7 +1338,7 @@ mod conn {
1338
1338
use bytes:: { Buf , Bytes } ;
1339
1339
use futures_channel:: { mpsc, oneshot} ;
1340
1340
use futures_util:: future:: { self , poll_fn, FutureExt , TryFutureExt } ;
1341
- use http_body_util:: { BodyExt , Empty , StreamBody } ;
1341
+ use http_body_util:: { BodyExt , Empty , Full , StreamBody } ;
1342
1342
use hyper:: rt:: Timer ;
1343
1343
use tokio:: io:: { AsyncReadExt as _, AsyncWriteExt as _} ;
1344
1344
use tokio:: net:: { TcpListener as TkTcpListener , TcpStream } ;
@@ -2126,6 +2126,62 @@ mod conn {
2126
2126
. expect ( "client should be open" ) ;
2127
2127
}
2128
2128
2129
+ #[ tokio:: test]
2130
+ async fn http2_responds_before_consuming_request_body ( ) {
2131
+ // Test that a early-response from server works correctly (request body wasn't fully consumed).
2132
+ // https://github.com/hyperium/hyper/issues/2872
2133
+ use hyper:: service:: service_fn;
2134
+
2135
+ let _ = pretty_env_logger:: try_init ( ) ;
2136
+
2137
+ let ( listener, addr) = setup_tk_test_server ( ) . await ;
2138
+
2139
+ // Spawn an HTTP2 server that responds before reading the whole request body.
2140
+ // It's normal case to decline the request due to headers or size of the body.
2141
+ tokio:: spawn ( async move {
2142
+ let sock = TokioIo :: new ( listener. accept ( ) . await . unwrap ( ) . 0 ) ;
2143
+ hyper:: server:: conn:: http2:: Builder :: new ( TokioExecutor )
2144
+ . timer ( TokioTimer )
2145
+ . serve_connection (
2146
+ sock,
2147
+ service_fn ( |_req| async move {
2148
+ Ok :: < _ , hyper:: Error > ( Response :: new ( Full :: new ( Bytes :: from (
2149
+ "No bread for you!" ,
2150
+ ) ) ) )
2151
+ } ) ,
2152
+ )
2153
+ . await
2154
+ . expect ( "serve_connection" ) ;
2155
+ } ) ;
2156
+
2157
+ let io = tcp_connect ( & addr) . await . expect ( "tcp connect" ) ;
2158
+ let ( mut client, conn) = conn:: http2:: Builder :: new ( TokioExecutor )
2159
+ . timer ( TokioTimer )
2160
+ . handshake ( io)
2161
+ . await
2162
+ . expect ( "http handshake" ) ;
2163
+
2164
+ tokio:: spawn ( async move {
2165
+ conn. await . expect ( "client conn shouldn't error" ) ;
2166
+ } ) ;
2167
+
2168
+ // Use a channel to keep request stream open
2169
+ let ( _tx, recv) = mpsc:: channel :: < Result < Frame < Bytes > , Box < dyn Error + Send + Sync > > > ( 0 ) ;
2170
+ let req = Request :: post ( "/a" ) . body ( StreamBody :: new ( recv) ) . unwrap ( ) ;
2171
+ let resp = client. send_request ( req) . await . expect ( "send_request" ) ;
2172
+ assert ! ( resp. status( ) . is_success( ) ) ;
2173
+
2174
+ let mut body = String :: new ( ) ;
2175
+ concat ( resp. into_body ( ) )
2176
+ . await
2177
+ . unwrap ( )
2178
+ . reader ( )
2179
+ . read_to_string ( & mut body)
2180
+ . unwrap ( ) ;
2181
+
2182
+ assert_eq ! ( & body, "No bread for you!" ) ;
2183
+ }
2184
+
2129
2185
#[ tokio:: test]
2130
2186
async fn h2_connect ( ) {
2131
2187
let ( listener, addr) = setup_tk_test_server ( ) . await ;
0 commit comments