Skip to content

Protobuf 3 support #615

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 47 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
5d3707a
Protobuf v3: Add v3 generated files
lb034582341 May 10, 2023
18a9c68
Protobuf v3: Add protobufv3-codec feature.
lb034582341 May 10, 2023
4022818
Protobuf v3: Actual code upgrade to support v3.
lb034582341 May 10, 2023
72f1aa6
Changes from mgeisler's review
lb034582341 May 26, 2023
bab6585
Fix: mgeisler's suggestion
lb034582341 May 30, 2023
ee70846
BusyJay's comments (use public fields for proto2)
lb034582341 Jul 10, 2023
0565ebe
BusyJay's change, notably merge v2/v3 test cases
lb034582341 Jul 10, 2023
8fd3908
Merge examples' protobuf 2 and 3 version
lb034582341 Jul 12, 2023
c641d9f
Remove unnecessary returns
lb034582341 Jul 12, 2023
22bf0ff
Run cargo fmt
lb034582341 Jul 12, 2023
3b3bc12
BusyJay's comments
lb034582341 Jul 12, 2023
36ffab2
Protobufv3 exported as protobuf to simplify code
lb034582341 Jul 12, 2023
7f81f00
Fix protobuf import in tests-and-examples
lb034582341 Jul 12, 2023
ac2c5a2
Interop: merge (delete leftover files)
lb034582341 Jul 12, 2023
d171d2e
Benchmark: merge protobuf 2 and 3 versions
lb034582341 Jul 12, 2023
6d5b382
Shorter syntax for helper function
lb034582341 Jul 12, 2023
3500b6f
Lint to fix clippy's CI errors
lb034582341 Jul 12, 2023
62ee644
Fix erroneous imports (to fix CI)
lb034582341 Jul 16, 2023
e457014
Add protobufv3 features to CI pipeline
lb034582341 Jul 16, 2023
0e3a27d
Fix test
lb034582341 Jul 16, 2023
75c4c7c
Update grpc-sys/grpc submodule
lb034582341 Jul 19, 2023
245568e
Revert erroneous change
lb034582341 Jul 19, 2023
efcbdd9
Revert changed formatting on .yml file.
lb034582341 Jul 19, 2023
1763e96
Triggering CI
lb034582341 Aug 1, 2023
0fb160d
Triggering CI 2
lb034582341 Aug 1, 2023
40c10a8
Protobuf3: enable it in compiler/
lb034582341 Aug 2, 2023
8be9ef5
Nit: change assert
lb034582341 Aug 2, 2023
22e7c0e
Enable boringssl for benchmark, health, interop
lb034582341 Aug 2, 2023
610032f
Remove mapping to RemoteStopped (this was to fix a type error, now un…
lb034582341 Aug 2, 2023
f39ab1a
grpc-sys: update to 1.56.2 (#624)
BusyJay Aug 1, 2023
2867cd2
xtask: Add code to generate pbv3 files
lb034582341 Aug 3, 2023
648b815
Remove Enum redefinitions in health/ (now done by xtask)
lb034582341 Aug 3, 2023
8b1c827
Add slightly-changed auto-generated files
lb034582341 Aug 3, 2023
4766fc4
Merge branch 'tikv:master' into master
lb034582341 Aug 3, 2023
cdf84aa
Fix wrong default in compiler
lb034582341 Aug 4, 2023
cbe53e8
Xtask v3: remove protoc line
lb034582341 Aug 5, 2023
4be3d35
Regenerate v3 pb
lb034582341 Aug 5, 2023
564d9b0
health: remove boringssl default
lb034582341 Aug 7, 2023
0fba43d
Compiler: remove protobufv3 feature
lb034582341 Aug 7, 2023
74c500f
compiler: remove protobuf-codegen and anyhow (unused)
lb034582341 Aug 8, 2023
136c8a7
compiler: revert all changes to src/codegen.rs
lb034582341 Aug 8, 2023
e428de3
Cargo.toml: add space
lb034582341 Aug 8, 2023
1fc7b19
health: remove dead code
lb034582341 Aug 8, 2023
a79276f
health: assert
lb034582341 Aug 8, 2023
549c841
health: timeout is an error
lb034582341 Aug 8, 2023
61abf10
health: missing negation
lb034582341 Aug 8, 2023
0f0577d
Nits
lb034582341 Aug 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ jobs:
- run: cargo xtask bindgen
- run: cargo build --no-default-features
- run: cargo build --no-default-features --features protobuf-codec
- run: cargo build --no-default-features --features protobufv3-codec
- run: cargo build --no-default-features --features prost-codec
- run: cd proto && cargo build --no-default-features --features prost-codec
- run: cargo build
Expand Down Expand Up @@ -80,6 +81,7 @@ jobs:
- run: cargo xtask submodule
- run: cargo build --no-default-features
- run: cargo build --no-default-features --features protobuf-codec
- run: cargo build --no-default-features --features protobufv3-codec
- run: cargo build --no-default-features --features prost-codec
- run: cargo build
- run: cargo test --all
Expand All @@ -98,6 +100,7 @@ jobs:
- run: cargo xtask bindgen
- run: cargo build --no-default-features
- run: cargo build --no-default-features --features "protobuf-codec"
- run: cargo build --no-default-features --features "protobufv3-codec"
- run: cargo build --no-default-features --features "prost-codec"
- run: cargo build
- run: cargo test --all
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ libc = "0.2"
futures-executor = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std", "sink"] }
protobuf = { version = "2.0", optional = true }
protobufv3 = { package = "protobuf", version = "3.2", optional = true }
prost = { version = "0.11", optional = true }
bytes = { version = "1.0", optional = true }
log = "0.4"
Expand All @@ -44,6 +45,7 @@ exclude = ["xtask"]
default = ["protobuf-codec", "boringssl"]
_secure = []
protobuf-codec = ["protobuf"]
protobufv3-codec = ["protobufv3"]
prost-codec = ["prost", "bytes"]
nightly = []
boringssl = ["grpcio-sys/boringssl", "_secure"]
Expand Down
7 changes: 5 additions & 2 deletions benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ publish = false

[features]
default = ["protobuf-codec"]
protobuf-codec = ["grpcio/protobuf-codec", "grpcio-proto/protobuf-codec"]
protobuf-codec = ["grpcio/protobuf-codec", "grpcio-proto/protobuf-codec", "dep:protobuf"]
protobufv3-codec = ["grpcio/protobufv3-codec", "grpcio-proto/protobufv3-codec", "dep:protobufv3"]
prost-codec = ["grpcio/prost-codec", "grpcio-proto/prost-codec"]

[dependencies]
grpcio = { path = ".." }
grpcio = { path = "..", default-features = false, features = ["boringssl"] }
grpcio-proto = { path = "../proto", default-features = false }
futures-channel = "0.3"
futures-executor = "0.3"
Expand All @@ -28,6 +29,8 @@ slog-async = "2.1"
slog-stdlog = "4.0"
slog-scope = "4.0"
slog-term = "2.2"
protobuf = { version = "2", optional = true }
protobufv3 = { package = "protobuf", version = "3.2", optional = true }

[[bin]]
name = "qps_worker"
Expand Down
9 changes: 5 additions & 4 deletions benchmark/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ use grpc_proto::util;
use grpcio::GrpcSlice;

fn gen_resp(req: &SimpleRequest) -> SimpleResponse {
let payload = util::new_payload(req.get_response_size() as usize);
let mut resp = SimpleResponse::default();
resp.set_payload(payload);
resp
let payload = util::new_payload(req.response_size as usize);
SimpleResponse {
payload: Some(payload).into(),
..SimpleResponse::default()
}
}

#[derive(Clone)]
Expand Down
142 changes: 107 additions & 35 deletions benchmark/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ use futures_util::{
use grpcio::{
CallOption, Channel, ChannelBuilder, Client as GrpcClient, EnvBuilder, Environment, WriteFlags,
};
use grpcio_proto::testing::control::SecurityParams;
use grpcio_proto::testing::control::{ClientConfig, ClientType, RpcType};
use grpcio_proto::testing::messages::SimpleRequest;
use grpcio_proto::testing::payloads::PayloadConfig;
use grpcio_proto::testing::services_grpc::BenchmarkServiceClient;
use grpcio_proto::testing::stats::ClientStats;
use grpcio_proto::util as proto_util;
Expand All @@ -27,14 +29,23 @@ use rand_xorshift::XorShiftRng;
use crate::bench;
use crate::util::{self, CpuRecorder, Histogram};

#[cfg(feature = "protobuf-codec")]
fn gen_req(cfg: &ClientConfig) -> SimpleRequest {
let mut req = SimpleRequest::default();
let payload_config = cfg.get_payload_config();
let simple_params = payload_config.get_simple_params();
req.set_payload(proto_util::new_payload(
simple_params.get_req_size() as usize
));
req.set_response_size(simple_params.get_resp_size());
req.payload = Some(proto_util::new_payload(simple_params.req_size as usize)).into();
req.response_size = simple_params.resp_size;
req
}

#[cfg(feature = "protobufv3-codec")]
fn gen_req(cfg: &ClientConfig) -> SimpleRequest {
let mut req = SimpleRequest::default();
let payload_config = &cfg.payload_config;
let simple_params = payload_config.simple_params();
req.payload = Some(proto_util::new_payload(simple_params.req_size as usize)).into();
req.response_size = simple_params.resp_size;
req
}

Expand Down Expand Up @@ -138,7 +149,11 @@ struct GenericExecutor<B> {

impl<B: Backoff + Send + 'static> GenericExecutor<B> {
fn new(ctx: ExecutorContext<B>, channel: Channel, cfg: &ClientConfig) -> GenericExecutor<B> {
#[cfg(feature = "protobuf-codec")]
let cap = cfg.get_payload_config().get_bytebuf_params().get_req_size();
#[cfg(feature = "protobufv3-codec")]
let cap = cfg.payload_config.bytebuf_params().req_size;

let req = vec![0; cap as usize];
GenericExecutor {
ctx,
Expand Down Expand Up @@ -297,6 +312,23 @@ impl<B: Backoff + Send + 'static> RequestExecutor<B> {
}
}

#[cfg(feature = "protobuf-codec")]
fn get_payload_cfg(cfg: &ClientConfig) -> &PayloadConfig {
cfg.get_payload_config()
}
#[cfg(feature = "protobufv3-codec")]
fn get_payload_cfg(cfg: &ClientConfig) -> &PayloadConfig {
&cfg.payload_config
}
#[cfg(feature = "protobuf-codec")]
fn get_rpc_type(cfg: &ClientConfig) -> RpcType {
cfg.get_rpc_type()
}
#[cfg(feature = "protobufv3-codec")]
fn get_rpc_type(cfg: &ClientConfig) -> RpcType {
cfg.rpc_type.enum_value().unwrap()
}

fn execute<B: Backoff + Send + 'static>(
ctx: ExecutorContext<B>,
ch: Channel,
Expand All @@ -305,27 +337,27 @@ fn execute<B: Backoff + Send + 'static>(
) {
match client_type {
ClientType::SYNC_CLIENT => {
if cfg.get_payload_config().has_bytebuf_params() {
if get_payload_cfg(cfg).has_bytebuf_params() {
panic!("only async_client is supported for generic service.");
}
RequestExecutor::new(ctx, ch, cfg).execute_unary()
}
ClientType::ASYNC_CLIENT => match cfg.get_rpc_type() {
ClientType::ASYNC_CLIENT => match get_rpc_type(cfg) {
RpcType::UNARY => {
if cfg.get_payload_config().has_bytebuf_params() {
if get_payload_cfg(cfg).has_bytebuf_params() {
panic!("only ping pong streaming is supported for generic service.");
}
RequestExecutor::new(ctx, ch, cfg).execute_unary_async()
}
RpcType::STREAMING => {
if cfg.get_payload_config().has_bytebuf_params() {
if get_payload_cfg(cfg).has_bytebuf_params() {
GenericExecutor::new(ctx, ch, cfg).execute_stream()
} else {
RequestExecutor::new(ctx, ch, cfg).execute_stream_ping_pong()
}
}
RpcType::STREAMING_FROM_CLIENT => {
if cfg.get_payload_config().has_bytebuf_params() {
if get_payload_cfg(cfg).has_bytebuf_params() {
panic!("only ping pong streaming is supported for generic service.");
}
RequestExecutor::new(ctx, ch, cfg).execute_stream_from_client()
Expand All @@ -344,68 +376,108 @@ pub struct Client {
running_reqs: Option<Vec<Receiver<()>>>,
}

#[cfg(feature = "protobuf-codec")]
fn get_security_params(cfg: &ClientConfig) -> Option<&SecurityParams> {
match cfg.has_security_params() {
true => Some(cfg.get_security_params()),
false => None,
}
}
#[cfg(feature = "protobufv3-codec")]
fn get_security_params(cfg: &ClientConfig) -> Option<&SecurityParams> {
cfg.security_params.0.as_deref()
}

impl Client {
pub fn new(cfg: &ClientConfig) -> Client {
let mut builder = EnvBuilder::new();
let thd_cnt = cfg.get_async_client_threads() as usize;
let thd_cnt = cfg.async_client_threads as usize;
if thd_cnt != 0 {
builder = builder.cq_count(thd_cnt);
}
let env = Arc::new(builder.build());
if cfg.get_core_limit() > 0 {
if cfg.core_limit > 0 {
error!("client config core limit is set but ignored");
}

let ch_env = env.clone();
let channels = (0..cfg.get_client_channels())
.zip(cfg.get_server_targets().iter().cycle())
let channels = (0..cfg.client_channels)
.zip(cfg.server_targets.iter().cycle())
.map(|(_, addr)| {
let mut builder = ChannelBuilder::new(ch_env.clone());
for arg in cfg.get_channel_args() {
let key = CString::new(arg.get_name()).unwrap();
for arg in &cfg.channel_args {
let key = CString::new(arg.name.clone()).unwrap();
#[cfg(feature = "protobuf-codec")]
if arg.has_str_value() {
builder =
builder.raw_cfg_string(key, CString::new(arg.get_str_value()).unwrap());
} else if arg.has_int_value() {
builder = builder.raw_cfg_int(key, arg.get_int_value());
}
#[cfg(feature = "protobufv3-codec")]
if arg.has_str_value() {
builder =
builder.raw_cfg_string(key, CString::new(arg.str_value()).unwrap());
} else if arg.has_int_value() {
builder = builder.raw_cfg_int(key, arg.int_value());
}
}
// Check https://github.com/grpc/grpc/issues/31465.
builder = builder.enable_retry(false);
if cfg.has_security_params() {
let params = cfg.get_security_params();
if !params.get_server_host_override().is_empty() {
builder = builder
.override_ssl_target(params.get_server_host_override().to_owned());
if let Some(params) = get_security_params(cfg) {
if !params.server_host_override.is_empty() {
builder =
builder.override_ssl_target(params.server_host_override.to_owned());
}
builder =
builder.set_credentials(proto_util::create_test_channel_credentials());
}
builder.connect(addr)
});

let client_type = cfg.get_client_type();
let load_params = cfg.get_load_params();
let client_channels = cfg.get_client_channels() as usize;
let outstanding_rpcs_per_channel = cfg.get_outstanding_rpcs_per_channel() as usize;

let recorder = CpuRecorder::new();
#[cfg(feature = "protobuf-codec")]
let client_type = cfg.client_type;
#[cfg(feature = "protobuf-codec")]
let his_param = cfg.get_histogram_params();
#[cfg(feature = "protobuf-codec")]
let his = Arc::new(Mutex::new(Histogram::new(
his_param.get_resolution(),
his_param.get_max_possible(),
)));
#[cfg(feature = "protobuf-codec")]
let has_poisson = cfg.get_load_params().has_poisson();

#[cfg(feature = "protobufv3-codec")]
let client_type = cfg.client_type.enum_value().unwrap();
#[cfg(feature = "protobufv3-codec")]
let his_param = &cfg.histogram_params;
#[cfg(feature = "protobufv3-codec")]
let his = Arc::new(Mutex::new(Histogram::new(
his_param.resolution,
his_param.max_possible,
)));
#[cfg(feature = "protobufv3-codec")]
let has_poisson = cfg.load_params.has_poisson();

let client_channels = cfg.client_channels as usize;
let outstanding_rpcs_per_channel = cfg.outstanding_rpcs_per_channel as usize;

let recorder = CpuRecorder::new();
let keep_running = Arc::new(AtomicBool::new(true));
let mut running_reqs = Vec::with_capacity(client_channels * outstanding_rpcs_per_channel);

for ch in channels {
for _ in 0..cfg.get_outstanding_rpcs_per_channel() {
for _ in 0..cfg.outstanding_rpcs_per_channel {
let his = his.clone();
let ch = ch.clone();
let rx = if load_params.has_poisson() {
let lambda = load_params.get_poisson().get_offered_load()
/ client_channels as f64
/ outstanding_rpcs_per_channel as f64;
let rx = if has_poisson {
#[cfg(feature = "protobuf-codec")]
let offered_load = cfg.get_load_params().get_poisson().get_offered_load();
#[cfg(feature = "protobufv3-codec")]
let offered_load = cfg.load_params.poisson().offered_load;

let lambda =
offered_load / client_channels as f64 / outstanding_rpcs_per_channel as f64;
let poisson = Poisson::new(lambda);
let (ctx, rx) = ExecutorContext::new(his, keep_running.clone(), poisson);
execute(ctx, ch, client_type, cfg);
Expand All @@ -432,13 +504,13 @@ impl Client {
let mut stats = ClientStats::default();

let sample = self.recorder.cpu_time(reset);
stats.set_time_elapsed(sample.real_time);
stats.set_time_user(sample.user_time);
stats.set_time_system(sample.sys_time);
stats.time_elapsed = sample.real_time;
stats.time_user = sample.user_time;
stats.time_system = sample.sys_time;

{
let mut his = self.histogram.lock().unwrap();
stats.set_latencies(his.report(reset));
stats.latencies = Some(his.report(reset)).into();
}

stats
Expand Down
Loading