Skip to content

Commit d4fcc2b

Browse files
authored
Merge branch 'master' into XT/sink-batchable
2 parents 9be396e + dba0582 commit d4fcc2b

File tree

2 files changed

+40
-2
lines changed

2 files changed

+40
-2
lines changed

src/env.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
3838
pub struct EnvBuilder {
3939
cq_count: usize,
4040
name_prefix: Option<String>,
41+
after_start: Option<Arc<dyn Fn() + Send + Sync>>,
42+
before_stop: Option<Arc<dyn Fn() + Send + Sync>>,
4143
}
4244

4345
impl EnvBuilder {
@@ -46,6 +48,8 @@ impl EnvBuilder {
4648
EnvBuilder {
4749
cq_count: unsafe { grpc_sys::gpr_cpu_num_cores() as usize },
4850
name_prefix: None,
51+
after_start: None,
52+
before_stop: None,
4953
}
5054
}
5155

@@ -67,6 +71,18 @@ impl EnvBuilder {
6771
self
6872
}
6973

74+
/// Execute function `f` after each thread is started but before it starts doing work.
75+
pub fn after_start<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder {
76+
self.after_start = Some(Arc::new(f));
77+
self
78+
}
79+
80+
/// Execute function `f` before each thread stops.
81+
pub fn before_stop<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder {
82+
self.before_stop = Some(Arc::new(f));
83+
self
84+
}
85+
7086
/// Finalize the [`EnvBuilder`], build the [`Environment`] and initialize the gRPC library.
7187
pub fn build(self) -> Environment {
7288
unsafe {
@@ -81,7 +97,19 @@ impl EnvBuilder {
8197
if let Some(ref prefix) = self.name_prefix {
8298
builder = builder.name(format!("{}-{}", prefix, i));
8399
}
84-
let handle = builder.spawn(move || poll_queue(tx_i)).unwrap();
100+
let after_start = self.after_start.clone();
101+
let before_stop = self.before_stop.clone();
102+
let handle = builder
103+
.spawn(move || {
104+
if let Some(f) = after_start {
105+
f();
106+
}
107+
poll_queue(tx_i);
108+
if let Some(f) = before_stop {
109+
f();
110+
}
111+
})
112+
.unwrap();
85113
handles.push(handle);
86114
}
87115
for _ in 0..self.cq_count {

tests-and-examples/tests/cases/misc.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,16 @@ impl Greeter for PeerService {
2727

2828
#[test]
2929
fn test_peer() {
30-
let env = Arc::new(EnvBuilder::new().build());
30+
let counter_add = Arc::new(AtomicI32::new(0));
31+
let counter_collect = counter_add.clone();
32+
let env = Arc::new(
33+
EnvBuilder::new()
34+
.cq_count(2)
35+
.after_start(move || {
36+
counter_add.fetch_add(1, Ordering::Relaxed);
37+
})
38+
.build(),
39+
);
3140
let service = create_greeter(PeerService);
3241
let mut server = ServerBuilder::new(env.clone())
3342
.register_service(service)
@@ -43,6 +52,7 @@ fn test_peer() {
4352
let resp = client.say_hello(&req).unwrap();
4453

4554
assert!(resp.get_message().contains("127.0.0.1"), "{:?}", resp);
55+
assert_eq!(counter_collect.load(Ordering::Relaxed), 2);
4656
}
4757

4858
#[derive(Clone)]

0 commit comments

Comments
 (0)