Skip to content

Commit f77508a

Browse files
committed
add client stream benchmark
Signed-off-by: Xintao <[email protected]>
1 parent 182dff0 commit f77508a

File tree

2 files changed

+54
-4
lines changed

2 files changed

+54
-4
lines changed

benchmark/src/bench.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
77
use std::sync::Arc;
88

99
use futures::prelude::*;
10+
use futures::sink::SinkExt;
1011
use grpc::{
1112
self, ClientStreamingSink, DuplexSink, MessageReader, Method, MethodType, RequestStream,
1213
RpcContext, RpcStatus, RpcStatusCode, ServerStreamingSink, ServiceBuilder, UnarySink,
@@ -56,12 +57,21 @@ impl BenchmarkService for Benchmark {
5657
fn streaming_from_client(
5758
&mut self,
5859
ctx: RpcContext,
59-
_: RequestStream<SimpleRequest>,
60+
mut stream: RequestStream<SimpleRequest>,
6061
sink: ClientStreamingSink<SimpleResponse>,
6162
) {
62-
let f = sink.fail(RpcStatus::new(RpcStatusCode::UNIMPLEMENTED, None));
63+
let f = async move {
64+
let mut resp: Option<SimpleResponse> = None;
65+
while let Some(req) = stream.try_next().await? {
66+
if resp.is_none() {
67+
resp = Some(gen_resp(&req));
68+
}
69+
}
70+
sink.success(resp.unwrap()).await?;
71+
Ok(())
72+
};
6373
let keep_running = self.keep_running.clone();
64-
spawn!(ctx, keep_running, "reporting unimplemented method", f)
74+
spawn!(ctx, keep_running, "streaming from client", f)
6575
}
6676

6777
fn streaming_from_server(

benchmark/src/client.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ struct RequestExecutor<B> {
182182
ctx: ExecutorContext<B>,
183183
client: Arc<BenchmarkServiceClient>,
184184
req: SimpleRequest,
185+
cfg: ClientConfig,
185186
}
186187

187188
impl<B: Backoff + Send + 'static> RequestExecutor<B> {
@@ -190,6 +191,7 @@ impl<B: Backoff + Send + 'static> RequestExecutor<B> {
190191
ctx,
191192
client: Arc::new(BenchmarkServiceClient::new(channel)),
192193
req: gen_req(cfg),
194+
cfg: cfg.clone(),
193195
}
194196
}
195197

@@ -254,6 +256,38 @@ impl<B: Backoff + Send + 'static> RequestExecutor<B> {
254256
};
255257
spawn!(client, keep_running, "streaming ping pong", f);
256258
}
259+
260+
fn execute_stream_from_client(mut self) {
261+
let client = self.client.clone();
262+
let keep_running = self.ctx.keep_running.clone();
263+
264+
let mut send_data = vec![];
265+
for _ in 0..self.cfg.get_messages_per_stream() {
266+
send_data.push(self.req.clone());
267+
}
268+
let f = async move {
269+
loop {
270+
let (mut sender, receiver) = self.client.streaming_from_client().unwrap();
271+
let latency_timer = Instant::now();
272+
let send_stream = futures::stream::iter(send_data.clone());
273+
sender
274+
.send_all(&mut send_stream.map(move |item| Ok((item, WriteFlags::default()))))
275+
.await?;
276+
sender.close().await?;
277+
receiver.await?;
278+
self.ctx.observe_latency(latency_timer.elapsed());
279+
let mut time = self.ctx.backoff_async();
280+
if let Some(t) = &mut time {
281+
t.await;
282+
}
283+
if !self.ctx.keep_running() {
284+
break;
285+
}
286+
}
287+
Ok(())
288+
};
289+
spawn!(client, keep_running, "streaming from client", f);
290+
}
257291
}
258292

259293
fn execute<B: Backoff + Send + 'static>(
@@ -272,7 +306,7 @@ fn execute<B: Backoff + Send + 'static>(
272306
ClientType::ASYNC_CLIENT => match cfg.get_rpc_type() {
273307
RpcType::UNARY => {
274308
if cfg.get_payload_config().has_bytebuf_params() {
275-
panic!("only streaming is supported for generic service.");
309+
panic!("only ping pong streaming is supported for generic service.");
276310
}
277311
RequestExecutor::new(ctx, ch, cfg).execute_unary_async()
278312
}
@@ -283,6 +317,12 @@ fn execute<B: Backoff + Send + 'static>(
283317
RequestExecutor::new(ctx, ch, cfg).execute_stream_ping_pong()
284318
}
285319
}
320+
RpcType::STREAMING_FROM_CLIENT => {
321+
if cfg.get_payload_config().has_bytebuf_params() {
322+
panic!("only ping pong streaming is supported for generic service.");
323+
}
324+
RequestExecutor::new(ctx, ch, cfg).execute_stream_from_client()
325+
}
286326
_ => unimplemented!(),
287327
},
288328
_ => unimplemented!(),

0 commit comments

Comments
 (0)