Skip to content

Commit aa12149

Browse files
committed
update test to test different buffer strategies
Signed-off-by: Xintao <[email protected]>
1 parent e5e4804 commit aa12149

File tree

2 files changed

+39
-17
lines changed

2 files changed

+39
-17
lines changed

tests-and-examples/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ prost-codec = ["prost", "bytes", "grpcio/prost-codec", "grpcio-proto/prost-codec
1313
[dependencies]
1414
grpcio-sys = { path = "../grpc-sys", version = "0.6.0" }
1515
libc = "0.2"
16-
futures = "0.3"
16+
futures = { version = "0.3", features = ["thread-pool"] }
1717
protobuf = { version = "2.0", optional = true }
1818
prost = { version = "0.6", optional = true }
1919
bytes = { version = "0.5", optional = true }

tests-and-examples/tests/cases/stream.rs

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
use std::sync::Arc;
44

5-
use futures::executor::block_on;
5+
use futures::executor::ThreadPoolBuilder;
6+
use futures::join;
67
use futures::prelude::*;
78
use futures::sink::SinkExt;
89
use grpcio::{
@@ -11,6 +12,8 @@ use grpcio::{
1112
};
1213
use grpcio_proto::example::route_guide::*;
1314

15+
const MESSAGE_NUM: i32 = 3000;
16+
1417
#[derive(Clone)]
1518
struct RouteGuideService {}
1619

@@ -38,11 +41,16 @@ impl RouteGuide for RouteGuideService {
3841
);
3942
current_num += 1;
4043
summary.point_count += 1;
44+
// Send a reply message after receiving a limited number of messages, which
45+
// can be used to test the correctness under different buffer strategies.
46+
if current_num >= MESSAGE_NUM {
47+
break;
48+
}
4149
}
4250
resp.success(summary).await?;
4351
Ok(())
4452
}
45-
.map_err(|_: grpcio::Error| panic!("server got error"))
53+
.map_err(|e: grpcio::Error| panic!("server got error: {:?}", e))
4654
.map(|_| ());
4755
ctx.spawn(f)
4856
}
@@ -71,11 +79,13 @@ fn test_client_send_all() {
7179
let ch = ChannelBuilder::new(env).connect(&format!("127.0.0.1:{}", port));
7280
let client = RouteGuideClient::new(ch);
7381

82+
let pool = ThreadPoolBuilder::new().pool_size(2).create().unwrap();
83+
7484
let exec_test_f = async move {
75-
// test for send all
85+
// Test for send all disable batch
7686
let (mut sink, receiver) = client.record_route().unwrap();
7787
let mut send_data = vec![];
78-
for i in 0..3000 {
88+
for i in 0..MESSAGE_NUM {
7989
let mut p = Point::default();
8090
p.set_longitude(i);
8191
send_data.push(p);
@@ -84,13 +94,13 @@ fn test_client_send_all() {
8494
sink.send_all(&mut send_stream.map(move |item| Ok((item, WriteFlags::default()))))
8595
.await
8696
.unwrap();
87-
sink.close().await.unwrap();
8897
let summary = receiver.await.unwrap();
89-
assert_eq!(summary.get_point_count(), 3000);
90-
// test for send all enable batch
98+
assert_eq!(summary.get_point_count(), MESSAGE_NUM);
99+
100+
// Test for send all enable batch
91101
let (mut sink, receiver) = client.record_route().unwrap();
92102
let mut send_data = vec![];
93-
for i in 0..3000 {
103+
for i in 0..MESSAGE_NUM {
94104
let mut p = Point::default();
95105
p.set_longitude(i);
96106
send_data.push(p);
@@ -100,13 +110,13 @@ fn test_client_send_all() {
100110
sink.send_all(&mut send_stream.map(move |item| Ok((item, WriteFlags::default()))))
101111
.await
102112
.unwrap();
103-
sink.close().await.unwrap();
104113
let summary = receiver.await.unwrap();
105-
assert_eq!(summary.get_point_count(), 3000);
106-
// test for send all and buffer hint is true
114+
assert_eq!(summary.get_point_count(), MESSAGE_NUM);
115+
116+
// Test for send all and all buffer hints are true
107117
let (mut sink, receiver) = client.record_route().unwrap();
108118
let mut send_data = vec![];
109-
for i in 0..3000 {
119+
for i in 0..MESSAGE_NUM {
110120
let mut p = Point::default();
111121
p.set_longitude(i);
112122
send_data.push(p);
@@ -118,9 +128,21 @@ fn test_client_send_all() {
118128
)
119129
.await
120130
.unwrap();
121-
sink.close().await.unwrap();
122-
let summary = receiver.await.unwrap();
123-
assert_eq!(summary.get_point_count(), 3000);
131+
// The following code is to test that when all msgs are set to be buffered, the msgs
132+
// should be stored in the buffer until `sink.close()` is called.
133+
let (tx, rx) = std::sync::mpsc::channel();
134+
let close_sink_task = async move {
135+
rx.recv_timeout(std::time::Duration::from_secs(1))
136+
.unwrap_err();
137+
sink.close().await.unwrap();
138+
rx.recv_timeout(std::time::Duration::from_secs(1)).unwrap();
139+
};
140+
let recv_msg_task = async move {
141+
let summary = receiver.await.unwrap();
142+
tx.send(()).unwrap();
143+
assert_eq!(summary.get_point_count(), MESSAGE_NUM);
144+
};
145+
join!(close_sink_task, recv_msg_task);
124146
};
125-
block_on(exec_test_f);
147+
pool.spawn_ok(exec_test_f);
126148
}

0 commit comments

Comments
 (0)