Skip to content

Commit 5770690

Browse files
authored
*: support querying channelz (#550)
This is a basic support for [channelz][1] that allows querying the information using APIs. Full support is difficult because no library supports parse JSON to protobuf in Rust. [1]: https://github.com/grpc/proposal/blob/master/A14-channelz.md Signed-off-by: Jay Lee <[email protected]>
1 parent 1a13827 commit 5770690

File tree

3 files changed

+177
-0
lines changed

3 files changed

+177
-0
lines changed

src/channelz.rs

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
2+
3+
//! Channelz provides channel level debug information. In short, There are four types of
4+
//! top level entities: channel, subchannel, socket and server. All entities are
5+
//! identified by an positive unique integer, which is allocated in order. For more
6+
//! explanation, see https://github.com/grpc/proposal/blob/master/A14-channelz.md.
7+
//!
8+
//! A full support requires a service that allow remote querying. But for now it's
9+
//! too complicated to add full support. Because gRPC C core exposes the information
10+
//! using JSON format, and there is no protobuf library that supports parsing json
11+
//! format in Rust. So this module only provides safe APIs to access the informations.
12+
13+
use std::ffi::CStr;
14+
use std::{cmp, str};
15+
16+
macro_rules! visit {
17+
($ptr:expr, $visitor:ident) => {{
18+
let s_ptr = $ptr;
19+
let res;
20+
if !s_ptr.is_null() {
21+
let c_s = CStr::from_ptr(s_ptr);
22+
// It's json string, so it must be utf8 compatible.
23+
let s = str::from_utf8_unchecked(c_s.to_bytes());
24+
res = $visitor(s);
25+
grpcio_sys::gpr_free(s_ptr as _);
26+
} else {
27+
res = $visitor("");
28+
}
29+
res
30+
}};
31+
}
32+
33+
/// Gets all root channels (i.e. channels the application has directly created). This
34+
/// does not include subchannels nor non-top level channels.
35+
pub fn get_top_channels<V, R>(start_channel_id: u64, visitor: V) -> R
36+
where
37+
V: FnOnce(&str) -> R,
38+
{
39+
unsafe {
40+
visit!(
41+
grpcio_sys::grpc_channelz_get_top_channels(start_channel_id as _),
42+
visitor
43+
)
44+
}
45+
}
46+
47+
/// Gets all servers that exist in the process.
48+
pub fn get_servers<V, R>(start_server_id: u64, visitor: V) -> R
49+
where
50+
V: FnOnce(&str) -> R,
51+
{
52+
unsafe {
53+
visit!(
54+
grpcio_sys::grpc_channelz_get_servers(start_server_id as _),
55+
visitor
56+
)
57+
}
58+
}
59+
60+
/// Returns a single Server, or else an empty string.
61+
pub fn get_server<V, R>(server_id: u64, visitor: V) -> R
62+
where
63+
V: FnOnce(&str) -> R,
64+
{
65+
unsafe {
66+
visit!(
67+
grpcio_sys::grpc_channelz_get_server(server_id as _),
68+
visitor
69+
)
70+
}
71+
}
72+
73+
/// Gets all server sockets that exist in the server.
74+
pub fn get_server_sockets<V, R>(
75+
server_id: u64,
76+
start_socket_id: u64,
77+
max_results: usize,
78+
visitor: V,
79+
) -> R
80+
where
81+
V: FnOnce(&str) -> R,
82+
{
83+
let max_results = cmp::min(isize::MAX as usize, max_results) as isize;
84+
unsafe {
85+
visit!(
86+
grpcio_sys::grpc_channelz_get_server_sockets(
87+
server_id as _,
88+
start_socket_id as _,
89+
max_results
90+
),
91+
visitor
92+
)
93+
}
94+
}
95+
96+
/// Returns a single Channel, or else an empty string.
97+
pub fn get_channel<V, R>(channel_id: u64, visitor: V) -> R
98+
where
99+
V: FnOnce(&str) -> R,
100+
{
101+
unsafe {
102+
visit!(
103+
grpcio_sys::grpc_channelz_get_channel(channel_id as _),
104+
visitor
105+
)
106+
}
107+
}
108+
109+
/// Returns a single Subchannel, or else an empty string.
110+
pub fn get_subchannel<V, R>(subchannel_id: u64, visitor: V) -> R
111+
where
112+
V: FnOnce(&str) -> R,
113+
{
114+
unsafe {
115+
visit!(
116+
grpcio_sys::grpc_channelz_get_subchannel(subchannel_id as _),
117+
visitor
118+
)
119+
}
120+
}
121+
122+
/// Returns a single Socket, or else an empty string.
123+
pub fn get_socket<V, R>(socket_id: u64, visitor: V) -> R
124+
where
125+
V: FnOnce(&str) -> R,
126+
{
127+
unsafe {
128+
visit!(
129+
grpcio_sys::grpc_channelz_get_socket(socket_id as _),
130+
visitor
131+
)
132+
}
133+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ mod auth_context;
3030
mod buf;
3131
mod call;
3232
mod channel;
33+
pub mod channelz;
3334
mod client;
3435
mod codec;
3536
mod cq;

tests-and-examples/tests/cases/misc.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,3 +367,46 @@ fn test_connectivity() {
367367
ch.wait_for_connected(Duration::from_secs(3)).await;
368368
});
369369
}
370+
371+
/// Tests channelz related API works as expected.
372+
#[test]
373+
fn test_channelz() {
374+
let env = Arc::new(Environment::new(2));
375+
let service = create_greeter(PeerService);
376+
let mut server = ServerBuilder::new(env.clone())
377+
.register_service(service)
378+
.bind("127.0.0.1", 0)
379+
.build()
380+
.unwrap();
381+
server.start();
382+
let port = server.bind_addrs().next().unwrap().1;
383+
let mut res = None;
384+
channelz::get_servers(0, |s| {
385+
res = Some(s.to_string());
386+
});
387+
// There should be at least one server.
388+
assert!(
389+
res.as_ref().map_or(false, |s| s.contains("serverId")),
390+
"{:?}",
391+
res
392+
);
393+
res = None;
394+
channelz::get_server(0, |s| {
395+
res = Some(s.to_string());
396+
});
397+
// 0 will never be used as id.
398+
assert_eq!(res, Some(String::new()));
399+
400+
res = None;
401+
let ch = ChannelBuilder::new(env.clone()).connect(&format!("127.0.0.1:{}", port));
402+
assert!(block_on(ch.wait_for_connected(Duration::from_secs(3))));
403+
channelz::get_top_channels(0, |s| {
404+
res = Some(s.to_string());
405+
});
406+
// There should be at least one channel.
407+
assert!(
408+
res.as_ref().map_or(false, |s| s.contains("channelId")),
409+
"{:?}",
410+
res
411+
);
412+
}

0 commit comments

Comments
 (0)