Skip to content

Commit 8ea3bf2

Browse files
authored
Pick Add blocking callback to EnvBuilder (#474) (#475)
Signed-off-by: Xintao <[email protected]>
1 parent 5ccfa62 commit 8ea3bf2

File tree

5 files changed

+50
-12
lines changed

5 files changed

+50
-12
lines changed

grpc-sys/build.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -381,9 +381,9 @@ fn main() {
381381
};
382382

383383
if get_env("CARGO_CFG_TARGET_OS").map_or(false, |s| s == "windows") {
384-
// At lease win7
385-
cc.define("_WIN32_WINNT", Some("0x0700"));
386-
bind_config = bind_config.clang_arg("-D _WIN32_WINNT=0x0700");
384+
// At lease vista
385+
cc.define("_WIN32_WINNT", Some("0x600"));
386+
bind_config = bind_config.clang_arg("-D _WIN32_WINNT=0x600");
387387
}
388388

389389
if get_env("GRPCIO_SYS_USE_PKG_CONFIG").map_or(false, |s| s == "1") {

proto/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,6 @@ protobuf = "2"
2727
lazy_static = { version = "1.3", optional = true }
2828

2929
[build-dependencies]
30-
protobuf-build = { version = "0.11", default-features = false }
30+
protobuf-build = { version = "=0.11.3", default-features = false }
3131
grpcio-compiler = { path = "../compiler", version = "0.5.0", default-features = false }
3232
walkdir = "2.2"

src/call/server.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ macro_rules! impl_unary_sink {
317317
$t {
318318
call: Some(call),
319319
write_flags: 0,
320-
ser: ser,
320+
ser,
321321
}
322322
}
323323

@@ -349,8 +349,8 @@ macro_rules! impl_unary_sink {
349349

350350
$rt {
351351
call: self.call.take().unwrap(),
352-
cq_f: cq_f,
353-
err: err,
352+
cq_f,
353+
err,
354354
}
355355
}
356356
}
@@ -415,7 +415,7 @@ macro_rules! impl_stream_sink {
415415
status: RpcStatus::ok(),
416416
flushed: false,
417417
closed: false,
418-
ser: ser,
418+
ser,
419419
}
420420
}
421421

@@ -439,8 +439,8 @@ macro_rules! impl_stream_sink {
439439

440440
$ft {
441441
call: self.call.take().unwrap(),
442-
fail_f: fail_f,
443-
err: err,
442+
fail_f,
443+
err,
444444
}
445445
}
446446
}

src/env.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ fn poll_queue(tx: mpsc::Sender<CompletionQueue>) {
3838
pub struct EnvBuilder {
3939
cq_count: usize,
4040
name_prefix: Option<String>,
41+
after_start: Option<Arc<dyn Fn() + Send + Sync>>,
42+
before_stop: Option<Arc<dyn Fn() + Send + Sync>>,
4143
}
4244

4345
impl EnvBuilder {
@@ -46,6 +48,8 @@ impl EnvBuilder {
4648
EnvBuilder {
4749
cq_count: unsafe { grpc_sys::gpr_cpu_num_cores() as usize },
4850
name_prefix: None,
51+
after_start: None,
52+
before_stop: None,
4953
}
5054
}
5155

@@ -67,6 +71,18 @@ impl EnvBuilder {
6771
self
6872
}
6973

74+
/// Execute function `f` after each thread is started but before it starts doing work.
75+
pub fn after_start<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder {
76+
self.after_start = Some(Arc::new(f));
77+
self
78+
}
79+
80+
/// Execute function `f` before each thread stops.
81+
pub fn before_stop<F: Fn() + Send + Sync + 'static>(mut self, f: F) -> EnvBuilder {
82+
self.before_stop = Some(Arc::new(f));
83+
self
84+
}
85+
7086
/// Finalize the [`EnvBuilder`], build the [`Environment`] and initialize the gRPC library.
7187
pub fn build(self) -> Environment {
7288
unsafe {
@@ -81,7 +97,19 @@ impl EnvBuilder {
8197
if let Some(ref prefix) = self.name_prefix {
8298
builder = builder.name(format!("{}-{}", prefix, i));
8399
}
84-
let handle = builder.spawn(move || poll_queue(tx_i)).unwrap();
100+
let after_start = self.after_start.clone();
101+
let before_stop = self.before_stop.clone();
102+
let handle = builder
103+
.spawn(move || {
104+
if let Some(f) = after_start {
105+
f();
106+
}
107+
poll_queue(tx_i);
108+
if let Some(f) = before_stop {
109+
f();
110+
}
111+
})
112+
.unwrap();
85113
handles.push(handle);
86114
}
87115
for _ in 0..self.cq_count {

tests-and-examples/tests/cases/misc.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,16 @@ impl Greeter for PeerService {
2525

2626
#[test]
2727
fn test_peer() {
28-
let env = Arc::new(EnvBuilder::new().build());
28+
let counter_add = Arc::new(AtomicI32::new(0));
29+
let counter_collect = counter_add.clone();
30+
let env = Arc::new(
31+
EnvBuilder::new()
32+
.cq_count(2)
33+
.after_start(move || {
34+
counter_add.fetch_add(1, Ordering::Relaxed);
35+
})
36+
.build(),
37+
);
2938
let service = create_greeter(PeerService);
3039
let mut server = ServerBuilder::new(env.clone())
3140
.register_service(service)
@@ -41,6 +50,7 @@ fn test_peer() {
4150
let resp = client.say_hello(&req).unwrap();
4251

4352
assert!(resp.get_message().contains("127.0.0.1"), "{:?}", resp);
53+
assert_eq!(counter_collect.load(Ordering::Relaxed), 2);
4454
}
4555

4656
#[derive(Clone)]

0 commit comments

Comments
 (0)