Skip to content

metadata: avoid change metadata ref #566

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 0 additions & 20 deletions grpc-sys/grpc_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -283,33 +283,13 @@ grpcwrap_request_call_context_destroy(grpcwrap_request_call_context* ctx) {
GPR_EXPORT void GPR_CALLTYPE grpcwrap_batch_context_take_recv_initial_metadata(
grpcwrap_batch_context* ctx, grpc_metadata_array* res) {
grpcwrap_metadata_array_move(res, &(ctx->recv_initial_metadata));

/* According to the documentation for struct grpc_op in grpc_types.h,
* ownership of keys and values for
* metadata stays with the call object. This means we have ref each of the
* keys and values here. */
size_t i;
for (i = 0; i < res->count; i++) {
grpc_slice_ref(res->metadata[i].key);
grpc_slice_ref(res->metadata[i].value);
}
}

GPR_EXPORT void GPR_CALLTYPE
grpcwrap_batch_context_take_recv_status_on_client_trailing_metadata(
grpcwrap_batch_context* ctx, grpc_metadata_array* res) {
grpcwrap_metadata_array_move(res,
&(ctx->recv_status_on_client.trailing_metadata));

/* According to the documentation for struct grpc_op in grpc_types.h,
* ownership of keys and values for
* metadata stays with the call object. This means we have ref each of the
* keys and values here. */
size_t i;
for (i = 0; i < res->count; i++) {
grpc_slice_ref(res->metadata[i].key);
grpc_slice_ref(res->metadata[i].value);
}
}

GPR_EXPORT const char* GPR_CALLTYPE
Expand Down
2 changes: 1 addition & 1 deletion proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ protobuf = "2"
lazy_static = { version = "1.3", optional = true }

[build-dependencies]
protobuf-build = { version = ">=0.12", default-features = false }
protobuf-build = { version = ">=0.13", default-features = false }
walkdir = "2.2"
2 changes: 1 addition & 1 deletion src/buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const INLINED_SIZE: usize = mem::size_of::<libc::size_t>() + mem::size_of::<*mut
/// A convenient rust wrapper for the type `grpc_slice`.
///
/// It's expected that the slice should be initialized.
#[repr(C)]
#[repr(transparent)]
pub struct GrpcSlice(grpc_slice);

impl GrpcSlice {
Expand Down
35 changes: 20 additions & 15 deletions src/call/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::call::{check_run, Call, MessageReader, Method};
use crate::channel::Channel;
use crate::codec::{DeserializeFn, SerializeFn};
use crate::error::{Error, Result};
use crate::metadata::{Metadata, MetadataBuilder};
use crate::metadata::{Metadata, UnownedMetadata};
use crate::task::{BatchFuture, BatchType};

/// Update the flag bit in res.
Expand Down Expand Up @@ -225,8 +225,8 @@ pub struct ClientUnaryReceiver<T> {
resp_de: DeserializeFn<T>,
finished: bool,
message: Option<T>,
initial_metadata: Metadata,
trailing_metadata: Metadata,
initial_metadata: UnownedMetadata,
trailing_metadata: UnownedMetadata,
}

impl<T> ClientUnaryReceiver<T> {
Expand All @@ -237,8 +237,8 @@ impl<T> ClientUnaryReceiver<T> {
resp_de,
finished: false,
message: None,
initial_metadata: MetadataBuilder::new().build(),
trailing_metadata: MetadataBuilder::new().build(),
initial_metadata: UnownedMetadata::empty(),
trailing_metadata: UnownedMetadata::empty(),
}
}

Expand Down Expand Up @@ -274,12 +274,14 @@ impl<T> ClientUnaryReceiver<T> {
/// Get the initial metadata.
pub async fn headers(&mut self) -> Result<&Metadata> {
self.wait_for_batch_future().await?;
Ok(&self.initial_metadata)
// Because we have a reference to call, so it's safe to read.
Ok(unsafe { self.initial_metadata.assume_valid() })
}

pub async fn trailers(&mut self) -> Result<&Metadata> {
self.wait_for_batch_future().await?;
Ok(&self.trailing_metadata)
// Because we have a reference to call, so it's safe to read.
Ok(unsafe { self.trailing_metadata.assume_valid() })
}

pub fn receive_sync(&mut self) -> Result<(Metadata, T, Metadata)> {
Expand Down Expand Up @@ -325,8 +327,8 @@ pub struct ClientCStreamReceiver<T> {
resp_de: DeserializeFn<T>,
finished: bool,
message: Option<T>,
initial_metadata: Metadata,
trailing_metadata: Metadata,
initial_metadata: UnownedMetadata,
trailing_metadata: UnownedMetadata,
}

impl<T> ClientCStreamReceiver<T> {
Expand All @@ -337,8 +339,8 @@ impl<T> ClientCStreamReceiver<T> {
resp_de,
finished: false,
message: None,
initial_metadata: MetadataBuilder::new().build(),
trailing_metadata: MetadataBuilder::new().build(),
initial_metadata: UnownedMetadata::empty(),
trailing_metadata: UnownedMetadata::empty(),
}
}

Expand Down Expand Up @@ -378,12 +380,14 @@ impl<T> ClientCStreamReceiver<T> {
/// Get the initial metadata.
pub async fn headers(&mut self) -> Result<&Metadata> {
self.wait_for_batch_future().await?;
Ok(&self.initial_metadata)
// We still have a reference in share call.
Ok(unsafe { self.initial_metadata.assume_valid() })
}

pub async fn trailers(&mut self) -> Result<&Metadata> {
self.wait_for_batch_future().await?;
Ok(&self.trailing_metadata)
// We still have a reference in share call.
Ok(unsafe { self.trailing_metadata.assume_valid() })
}
}

Expand Down Expand Up @@ -550,7 +554,7 @@ struct ResponseStreamImpl<H, T> {
read_done: bool,
finished: bool,
resp_de: DeserializeFn<T>,
headers_f: FutureOrValue<BatchFuture, Metadata>,
headers_f: FutureOrValue<BatchFuture, UnownedMetadata>,
}

impl<H: ShareCallHolder + Unpin, T> ResponseStreamImpl<H, T> {
Expand Down Expand Up @@ -623,7 +627,8 @@ impl<H: ShareCallHolder + Unpin, T> ResponseStreamImpl<H, T> {
self.headers_f = FutureOrValue::Value(Pin::new(f).await?.initial_metadata);
}
match &self.headers_f {
FutureOrValue::Value(v) => Ok(v),
// We still have reference to call.
FutureOrValue::Value(v) => Ok(unsafe { v.assume_valid() }),
_ => unreachable!(),
}
}
Expand Down
9 changes: 5 additions & 4 deletions src/call/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::task::{Context, Poll};
use std::{ptr, slice};

use crate::grpc_sys::{self, grpc_call, grpc_call_error, grpcwrap_batch_context};
use crate::metadata::UnownedMetadata;
use crate::{cq::CompletionQueue, Metadata, MetadataBuilder};
use futures_util::ready;
use libc::c_void;
Expand Down Expand Up @@ -291,8 +292,8 @@ impl BatchContext {
///
/// If initial metadata is not fetched or the method has been called, empty metadata will be
/// returned.
pub fn take_initial_metadata(&mut self) -> Metadata {
let mut res = MetadataBuilder::with_capacity(0).build();
pub fn take_initial_metadata(&mut self) -> UnownedMetadata {
let mut res = UnownedMetadata::empty();
unsafe {
grpcio_sys::grpcwrap_batch_context_take_recv_initial_metadata(
self.ctx,
Expand All @@ -306,8 +307,8 @@ impl BatchContext {
///
/// If trailing metadata is not fetched or the method has been called, empty metadata will be
/// returned.
pub fn take_trailing_metadata(&mut self) -> Metadata {
let mut res = MetadataBuilder::with_capacity(0).build();
pub fn take_trailing_metadata(&mut self) -> UnownedMetadata {
let mut res = UnownedMetadata::empty();
unsafe {
grpc_sys::grpcwrap_batch_context_take_recv_status_on_client_trailing_metadata(
self.ctx,
Expand Down
38 changes: 33 additions & 5 deletions src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl MetadataBuilder {
///
/// Metadata value can be ascii string or bytes. They are distinguish by the
/// key suffix, key of bytes value should have suffix '-bin'.
#[repr(C)]
#[repr(transparent)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seem that all struct with single field using repr(C) can be replaced with repr(transparent).

pub struct Metadata(grpc_metadata_array);

impl Metadata {
Expand Down Expand Up @@ -234,10 +234,6 @@ impl Metadata {
}
&[]
}

pub(crate) fn as_mut_ptr(&mut self) -> *mut grpc_metadata_array {
&mut self.0 as _
}
}

impl fmt::Debug for Metadata {
Expand Down Expand Up @@ -273,6 +269,38 @@ impl Drop for Metadata {
unsafe impl Send for Metadata {}
unsafe impl Sync for Metadata {}

/// A special metadata that only for receiving metadata from remote.
///
/// gRPC C Core manages metadata internally, it's unsafe to read them unless
/// call is not destroyed.
#[repr(transparent)]
pub struct UnownedMetadata(grpc_metadata_array);

impl UnownedMetadata {
#[inline]
pub fn empty() -> UnownedMetadata {
unsafe { mem::transmute(Metadata::with_capacity(0)) }
}
#[inline]
pub unsafe fn assume_valid(&self) -> &Metadata {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe convert_to_owned better

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a convertion. It's more like a as_ref/deref. The returned value is not owned, it just reuses the struct for accessing APIs. That's why a reference is returned.

mem::transmute(self)
}

pub fn as_mut_ptr(&mut self) -> *mut grpc_metadata_array {
&mut self.0 as _
}
}

impl Drop for UnownedMetadata {
#[inline]
fn drop(&mut self) {
unsafe { grpcio_sys::grpcwrap_metadata_array_destroy_metadata_only(&mut self.0) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why free here because I don't see any pointer init

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}

unsafe impl Send for UnownedMetadata {}
unsafe impl Sync for UnownedMetadata {}

/// Immutable metadata iterator
///
/// This struct is created by the iter method on `Metadata`.
Expand Down
2 changes: 1 addition & 1 deletion src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::error::{Error, Result};
use crate::server::RequestCallContext;

pub(crate) use self::executor::{Executor, Kicker, UnfinishedWork};
pub use self::promise::BatchResult;
pub(crate) use self::promise::BatchResult;
pub use self::promise::BatchType;

/// A handle that is used to notify future that the task finishes.
Expand Down
14 changes: 7 additions & 7 deletions src/task/promise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;
use super::Inner;
use crate::call::{BatchContext, MessageReader, RpcStatusCode};
use crate::error::Error;
use crate::{Metadata, MetadataBuilder};
use crate::metadata::UnownedMetadata;

/// Batch job type.
#[derive(PartialEq, Debug)]
Expand All @@ -22,25 +22,25 @@ pub enum BatchType {
/// A promise result which stores a message reader with bundled metadata.
pub struct BatchResult {
pub message_reader: Option<MessageReader>,
pub initial_metadata: Metadata,
pub trailing_metadata: Metadata,
pub initial_metadata: UnownedMetadata,
pub trailing_metadata: UnownedMetadata,
}

impl BatchResult {
pub fn new(
message_reader: Option<MessageReader>,
initial_metadata: Option<Metadata>,
trailing_metadata: Option<Metadata>,
initial_metadata: Option<UnownedMetadata>,
trailing_metadata: Option<UnownedMetadata>,
) -> BatchResult {
let initial_metadata = if let Some(m) = initial_metadata {
m
} else {
MetadataBuilder::new().build()
UnownedMetadata::empty()
};
let trailing_metadata = if let Some(m) = trailing_metadata {
m
} else {
MetadataBuilder::new().build()
UnownedMetadata::empty()
};
BatchResult {
message_reader,
Expand Down