@@ -22,6 +22,9 @@ use crate::error::{Error, Result};
22
22
use crate :: grpc_sys:: grpc_status_code:: * ;
23
23
use crate :: task:: { self , BatchFuture , BatchType , CallTag } ;
24
24
25
+ // By default buffers in `SinkBase` will be shrink to 4K size.
26
+ const BUF_SHRINK_SIZE : usize = 4 * 1024 ;
27
+
25
28
/// An gRPC status code structure.
26
29
/// This type contains constants for all gRPC status codes.
27
30
#[ derive( PartialEq , Eq , Clone , Copy ) ]
@@ -651,17 +654,24 @@ impl SinkBase {
651
654
flags : WriteFlags ,
652
655
ser : SerializeFn < T > ,
653
656
) -> Result < ( ) > {
657
+ // temporary fix: buffer hint with send meta will not send out any metadata.
658
+ // note: only the first message can enter this code block.
659
+ if self . send_metadata {
660
+ ser ( t, & mut self . buf ) ;
661
+ self . buf_flags = Some ( flags) ;
662
+ self . start_send_buffer_message ( false , call) ?;
663
+ self . send_metadata = false ;
664
+ return Ok ( ( ) ) ;
665
+ }
654
666
// If there is already a buffered message waiting to be sent, set `buffer_hint` to true to indicate
655
667
// that this is not the last message.
656
668
if self . buf_flags . is_some ( ) {
657
669
// `start_send` is supposed to be called after `poll_ready` returns ready.
658
670
assert ! ( self . batch_f. is_none( ) ) ;
659
671
self . start_send_buffer_message ( true , call) ?;
660
672
}
661
-
662
673
ser ( t, & mut self . buf ) ;
663
674
self . buf_flags = Some ( flags) ;
664
-
665
675
Ok ( ( ) )
666
676
}
667
677
@@ -700,22 +710,20 @@ impl SinkBase {
700
710
buffer_hint : bool ,
701
711
call : & mut C ,
702
712
) -> Result < ( ) > {
703
- let mut flags = self . buf_flags . clone ( ) . unwrap ( ) ;
704
- flags = flags. buffer_hint ( buffer_hint) ;
705
- if flags. get_buffer_hint ( ) && self . send_metadata {
706
- // temporary fix: buffer hint with send meta will not send out any metadata.
707
- flags = flags. buffer_hint ( false ) ;
708
- }
709
-
713
+ let flags = self . buf_flags . clone ( ) . unwrap ( ) ;
714
+ flags. buffer_hint ( buffer_hint) ;
710
715
let write_f = call. call ( |c| {
711
716
c. call
712
717
. start_send_message ( & self . buf , flags. flags , self . send_metadata )
713
718
} ) ?;
714
-
715
- self . send_metadata = false ;
716
719
self . batch_f = Some ( write_f) ;
717
- self . buf . clear ( ) ;
718
720
self . buf_flags . take ( ) ;
721
+ // NOTE: Content of `self.buf` is copied into grpc internal.
722
+ self . buf . clear ( ) ;
723
+ if self . buf . capacity ( ) > BUF_SHRINK_SIZE {
724
+ self . buf . truncate ( BUF_SHRINK_SIZE ) ;
725
+ self . buf . shrink_to_fit ( ) ;
726
+ }
719
727
Ok ( ( ) )
720
728
}
721
729
}
0 commit comments