Skip to content

Commit b90daf3

Browse files
committed
Add batch switch
Signed-off-by: Xintao <[email protected]>
1 parent dfca98d commit b90daf3

File tree

3 files changed

+30
-8
lines changed

3 files changed

+30
-8
lines changed

src/call/client.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,10 @@ impl<Req> StreamingCallSink<Req> {
331331
}
332332
}
333333

334+
pub fn enable_batch(&mut self, flag: bool) {
335+
self.sink_base.enable_batch = flag;
336+
}
337+
334338
pub fn cancel(&mut self) {
335339
let call = self.call.lock();
336340
call.call.cancel()

src/call/mod.rs

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -635,7 +635,10 @@ struct SinkBase {
635635
// Write flags used to control the data to be sent in `buf`.
636636
buf_flags: Option<WriteFlags>,
637637
// Used to records whether there is a message in which `buffer_hint` is false.
638-
buf_buffer_hint: bool,
638+
last_buffer_hint: bool,
639+
// Flag to indicate if enable batch. This behavior will modify the `buffer_hint` to batch messages
640+
// as much as possible.
641+
enable_batch: bool,
639642
send_metadata: bool,
640643
}
641644

@@ -645,8 +648,9 @@ impl SinkBase {
645648
batch_f: None,
646649
buf: Vec::new(),
647650
buf_flags: None,
648-
buf_buffer_hint: true,
651+
last_buffer_hint: true,
649652
send_metadata,
653+
enable_batch: false,
650654
}
651655
}
652656

@@ -666,16 +670,23 @@ impl SinkBase {
666670
self.send_metadata = false;
667671
return Ok(());
668672
}
673+
669674
// If there is already a buffered message waiting to be sent, set `buffer_hint` to true to indicate
670675
// that this is not the last message.
671-
if self.buf_flags.is_some() {
672-
// `start_send` is supposed to be called after `poll_ready` returns ready.
673-
assert!(self.batch_f.is_none());
676+
if !self.buf.is_empty() {
674677
self.start_send_buffer_message(true, call)?;
675678
}
679+
676680
ser(t, &mut self.buf);
677-
self.buf_buffer_hint &= flags.get_buffer_hint();
681+
let hint = flags.get_buffer_hint();
682+
self.last_buffer_hint &= hint;
678683
self.buf_flags = Some(flags);
684+
685+
// If sink disable batch, start sending the message in buffer immediately.
686+
if !self.enable_batch {
687+
self.start_send_buffer_message(hint, call)?;
688+
}
689+
679690
Ok(())
680691
}
681692

@@ -700,8 +711,8 @@ impl SinkBase {
700711
if self.batch_f.is_some() {
701712
ready!(self.poll_ready(cx)?);
702713
}
703-
if self.buf_flags.is_some() {
704-
self.start_send_buffer_message(self.buf_buffer_hint, call)?;
714+
if !self.buf.is_empty() {
715+
self.start_send_buffer_message(self.last_buffer_hint, call)?;
705716
ready!(self.poll_ready(cx)?);
706717
}
707718
Poll::Ready(Ok(()))
@@ -713,6 +724,9 @@ impl SinkBase {
713724
buffer_hint: bool,
714725
call: &mut C,
715726
) -> Result<()> {
727+
// `start_send` is supposed to be called after `poll_ready` returns ready.
728+
assert!(self.batch_f.is_none());
729+
716730
let mut flags = self.buf_flags.clone().unwrap();
717731
flags = flags.buffer_hint(buffer_hint);
718732
let write_f = call.call(|c| {

src/call/server.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,10 @@ macro_rules! impl_stream_sink {
424424
}
425425
}
426426

427+
pub fn enable_batch(&mut self, flag: bool) {
428+
self.base.enable_batch = flag;
429+
}
430+
427431
pub fn set_status(&mut self, status: RpcStatus) {
428432
assert!(self.flush_f.is_none());
429433
self.status = status;

0 commit comments

Comments
 (0)