Skip to content

Commit cac2474

Browse files
committed
fix
Signed-off-by: Shikugawa <[email protected]>
1 parent b634b08 commit cac2474

File tree

1 file changed

+29
-11
lines changed

1 file changed

+29
-11
lines changed

src/dispatcher.rs

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -399,26 +399,44 @@ impl Dispatcher {
399399
}
400400
}
401401

402-
fn on_grpc_call_response(&self, token_id: u32, status_code: u32, response_size: usize) {
403-
let context_id = self
404-
.grpc_callouts
405-
.borrow_mut()
406-
.remove(&token_id)
407-
.expect("invalid token_id");
402+
fn on_grpc_receive(&self, token_id: u32, response_size: usize) {
403+
if let Some(context_id) = self.grpc_callouts.borrow_mut().remove(&token_id) {
404+
if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
405+
self.active_id.set(context_id);
406+
hostcalls::set_effective_context(context_id).unwrap();
407+
http_stream.on_grpc_call_response(token_id, 0, response_size);
408+
} else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
409+
self.active_id.set(context_id);
410+
hostcalls::set_effective_context(context_id).unwrap();
411+
stream.on_grpc_call_response(token_id, 0, response_size);
412+
} else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
413+
self.active_id.set(context_id);
414+
hostcalls::set_effective_context(context_id).unwrap();
415+
root.on_grpc_call_response(token_id, 0, response_size);
416+
}
417+
} else {
418+
// TODO(shikugawa): Try to check grpc stream tokens here
419+
}
420+
}
408421

422+
fn on_grpc_close(&self, token_id: u32, status_code: u32) {
423+
if let Some(context_id) = self.grpc_callouts.borrow_mut().remove(&token_id) {
409424
if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
410425
self.active_id.set(context_id);
411426
hostcalls::set_effective_context(context_id).unwrap();
412-
http_stream.on_grpc_call_response(token_id, status_code, response_size);
427+
http_stream.on_grpc_call_response(token_id, status_code, 0);
413428
} else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
414429
self.active_id.set(context_id);
415430
hostcalls::set_effective_context(context_id).unwrap();
416-
stream.on_grpc_call_response(token_id, status_code, response_size);
431+
stream.on_grpc_call_response(token_id, status_code, 0);
417432
} else if let Some(root) = self.roots.borrow_mut().get_mut(&context_id) {
418433
self.active_id.set(context_id);
419434
hostcalls::set_effective_context(context_id).unwrap();
420-
root.on_grpc_call_response(token_id, status_code, response_size);
435+
root.on_grpc_call_response(token_id, status_code, 0);
421436
}
437+
} else {
438+
// TODO(shikugawa): Try to check grpc stream tokens here
439+
}
422440
}
423441
}
424442

@@ -551,10 +569,10 @@ pub extern "C" fn proxy_on_http_call_response(
551569

552570
#[no_mangle]
553571
pub extern "C" fn proxy_on_grpc_receive(_context_id: u32, token_id: u32, response_size: usize) {
554-
DISPATCHER.with(|dispatcher| dispatcher.on_grpc_call_response(token_id, 0, response_size))
572+
DISPATCHER.with(|dispatcher| dispatcher.on_grpc_receive(token_id, response_size))
555573
}
556574

557575
#[no_mangle]
558576
pub extern "C" fn proxy_on_grpc_close(_context_id: u32, token_id: u32, status_code: u32) {
559-
DISPATCHER.with(|dispatcher| dispatcher.on_grpc_call_response(token_id, status_code, 0))
577+
DISPATCHER.with(|dispatcher| dispatcher.on_grpc_close(token_id, status_code))
560578
}

0 commit comments

Comments
 (0)