-
Notifications
You must be signed in to change notification settings - Fork 10
Implement ingress client #42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
cdee0c1
0854d8e
ace42a6
4c9fe9f
52c83f3
ca36456
7f6c852
993547e
a91c42a
e186ae1
7dbaeb5
2b6bb35
84cfb1b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,168 @@ | ||
use std::time::Duration; | ||
|
||
use reqwest::{header::HeaderMap, Url}; | ||
use serde::{de::DeserializeOwned, Deserialize, Serialize}; | ||
|
||
use super::{ | ||
request::{IngressRequestOptions, SendResponse, SendStatus}, | ||
result::{IngressResultOptions, ResultOp, ResultTarget}, | ||
}; | ||
use crate::{context::RequestTarget, errors::TerminalError}; | ||
|
||
const IDEMPOTENCY_KEY_HEADER: &str = "Idempotency-Key"; | ||
|
||
#[derive(Deserialize)] | ||
#[serde(rename_all = "camelCase")] | ||
struct SendResponseSchema { | ||
invocation_id: String, | ||
status: SendStatusSchema, | ||
} | ||
|
||
#[derive(Deserialize)] | ||
enum SendStatusSchema { | ||
Accepted, | ||
PreviouslyAccepted, | ||
} | ||
|
||
impl From<SendStatusSchema> for SendStatus { | ||
fn from(value: SendStatusSchema) -> Self { | ||
match value { | ||
SendStatusSchema::Accepted => SendStatus::Accepted, | ||
SendStatusSchema::PreviouslyAccepted => SendStatus::PreviouslyAccepted, | ||
} | ||
} | ||
} | ||
|
||
#[derive(Deserialize)] | ||
struct TerminalErrorSchema { | ||
code: Option<u16>, | ||
message: String, | ||
} | ||
|
||
pub(super) struct IngressInternal { | ||
pub(super) client: reqwest::Client, | ||
pub(super) url: Url, | ||
pub(super) headers: HeaderMap, | ||
} | ||
|
||
impl IngressInternal { | ||
pub(super) async fn call<Req: Serialize, Res: DeserializeOwned>( | ||
&self, | ||
target: RequestTarget, | ||
req: Req, | ||
opts: IngressRequestOptions, | ||
) -> Result<Result<Res, TerminalError>, reqwest::Error> { | ||
patrickariel marked this conversation as resolved.
Show resolved
Hide resolved
patrickariel marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let mut headers = self.headers.clone(); | ||
if let Some(key) = opts.idempotency_key { | ||
headers.append(IDEMPOTENCY_KEY_HEADER, key); | ||
} | ||
|
||
let url = format!("{}/{target}", self.url.as_str().trim_end_matches("/")); | ||
|
||
let mut builder = self.client.post(url).headers(headers).json(&req); | ||
|
||
if let Some(timeout) = opts.timeout { | ||
builder = builder.timeout(timeout); | ||
} | ||
|
||
let res = builder.send().await?; | ||
|
||
if let Err(e) = res.error_for_status_ref() { | ||
let status = res.status().as_u16(); | ||
if let Ok(e) = res.json::<TerminalErrorSchema>().await { | ||
Ok(Err(TerminalError::new_with_code( | ||
e.code.unwrap_or(status), | ||
e.message, | ||
))) | ||
} else { | ||
Err(e) | ||
} | ||
} else { | ||
Ok(Ok(res.json::<Res>().await?)) | ||
} | ||
} | ||
|
||
pub(super) async fn send<Req: Serialize>( | ||
&self, | ||
target: RequestTarget, | ||
req: Req, | ||
opts: IngressRequestOptions, | ||
delay: Option<Duration>, | ||
) -> Result<Result<SendResponse, TerminalError>, reqwest::Error> { | ||
let mut headers = self.headers.clone(); | ||
let attachable = if let Some(key) = opts.idempotency_key { | ||
headers.append(IDEMPOTENCY_KEY_HEADER, key); | ||
true | ||
} else { | ||
false | ||
}; | ||
|
||
let url = if let Some(delay) = delay { | ||
format!( | ||
"{}/{target}/send?delay={}ms", | ||
self.url.as_str().trim_end_matches("/"), | ||
delay.as_millis() | ||
) | ||
} else { | ||
format!("{}/{target}/send", self.url.as_str().trim_end_matches("/")) | ||
}; | ||
|
||
let mut builder = self.client.post(url).headers(headers).json(&req); | ||
|
||
if let Some(timeout) = opts.timeout { | ||
builder = builder.timeout(timeout); | ||
} | ||
|
||
let res = builder.send().await?; | ||
|
||
if let Err(e) = res.error_for_status_ref() { | ||
let status = res.status().as_u16(); | ||
if let Ok(e) = res.json::<TerminalErrorSchema>().await { | ||
Ok(Err(TerminalError::new_with_code( | ||
e.code.unwrap_or(status), | ||
e.message, | ||
))) | ||
} else { | ||
Err(e) | ||
} | ||
} else { | ||
let res = res.json::<SendResponseSchema>().await?; | ||
Ok(Ok(SendResponse { | ||
invocation_id: res.invocation_id, | ||
status: res.status.into(), | ||
attachable, | ||
})) | ||
} | ||
} | ||
|
||
pub(super) async fn result<Res: DeserializeOwned>( | ||
&self, | ||
target: ResultTarget, | ||
op: ResultOp, | ||
opts: IngressResultOptions, | ||
) -> Result<Result<Res, TerminalError>, reqwest::Error> { | ||
let url = format!("{}/{target}/{op}", self.url.as_str().trim_end_matches("/")); | ||
|
||
let mut builder = self.client.get(url).headers(self.headers.clone()); | ||
|
||
if let Some(timeout) = opts.timeout { | ||
builder = builder.timeout(timeout); | ||
} | ||
|
||
let res = builder.send().await?; | ||
|
||
if let Err(e) = res.error_for_status_ref() { | ||
let status = res.status().as_u16(); | ||
if let Ok(e) = res.json::<TerminalErrorSchema>().await { | ||
Ok(Err(TerminalError::new_with_code( | ||
e.code.unwrap_or(status), | ||
e.message, | ||
))) | ||
} else { | ||
Err(e) | ||
} | ||
} else { | ||
Ok(Ok(res.json::<Res>().await?)) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
use reqwest::{header::HeaderMap, Url}; | ||
|
||
use self::{ | ||
internal::IngressInternal, | ||
request::IngressRequest, | ||
result::{IngressResult, ResultTarget}, | ||
}; | ||
use crate::context::RequestTarget; | ||
|
||
pub mod internal; | ||
pub mod request; | ||
pub mod result; | ||
|
||
/// A client for invoking handlers via the ingress. | ||
pub struct IngressClient { | ||
inner: IngressInternal, | ||
} | ||
|
||
impl IngressClient { | ||
/// Create a new [`IngressClient`]. | ||
pub fn new(url: Url) -> Self { | ||
Self { | ||
inner: IngressInternal { | ||
client: reqwest::Client::new(), | ||
url, | ||
headers: Default::default(), | ||
}, | ||
} | ||
} | ||
|
||
/// Create a new [`IngressClient`] with custom headers. | ||
pub fn new_with_headers(url: Url, headers: HeaderMap) -> Self { | ||
Self { | ||
inner: IngressInternal { | ||
client: reqwest::Client::new(), | ||
url, | ||
headers, | ||
}, | ||
} | ||
} | ||
|
||
/// Create a new [`IngressRequest`]. | ||
pub fn request<Req, Res>(&self, target: RequestTarget, req: Req) -> IngressRequest<Req, Res> { | ||
IngressRequest::new(&self.inner, target, req) | ||
} | ||
|
||
/// Create a new [`IngressResult`]. | ||
pub fn result<Res>(&self, target: ResultTarget) -> IngressResult<Res> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the naming result should be replaced by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem here is that the method returns a type that the user will have to either call The actual usage will look something like this (see my first post for more examples): let res = client
.service_result::<MyServiceResult>()
.my_handler("lorem_ipsum") // <- this is the idempotency key
.attach() // or .output()
.await?;
let res = client
.workflow_result::<MyWorkflowResult>("Me")
.attach() // or .output()
.await?; The term "result" was taken from this part of the docs:
Do you have any suggestions for a better term? Some ideas: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ahhhh got it, i think then the name should be |
||
IngressResult::new(&self.inner, target) | ||
} | ||
|
||
pub fn service_ingress<'a, I>(&'a self) -> I | ||
where | ||
I: IntoServiceIngress<'a>, | ||
{ | ||
I::create_ingress(self) | ||
} | ||
|
||
pub fn object_ingress<'a, I>(&'a self, key: impl Into<String>) -> I | ||
where | ||
I: IntoObjectIngress<'a>, | ||
{ | ||
I::create_ingress(self, key.into()) | ||
} | ||
|
||
pub fn workflow_ingress<'a, I>(&'a self, id: impl Into<String>) -> I | ||
where | ||
I: IntoWorkflowIngress<'a>, | ||
{ | ||
I::create_ingress(self, id.into()) | ||
} | ||
|
||
pub fn invocation_result<'a, Res>( | ||
&'a self, | ||
invocation_id: impl Into<String>, | ||
) -> IngressResult<'a, Res> { | ||
self.result(ResultTarget::invocation(invocation_id)) | ||
} | ||
|
||
pub fn service_result<'a, R>(&'a self) -> R | ||
where | ||
R: IntoServiceResult<'a>, | ||
{ | ||
R::create_result(self) | ||
} | ||
|
||
pub fn object_result<'a, R>(&'a self, key: impl Into<String>) -> R | ||
where | ||
R: IntoObjectResult<'a>, | ||
{ | ||
R::create_result(self, key.into()) | ||
} | ||
|
||
pub fn workflow_result<'a, R>(&'a self, id: impl Into<String>) -> R | ||
where | ||
R: IntoWorkflowResult<'a>, | ||
{ | ||
R::create_result(self, id.into()) | ||
} | ||
} | ||
|
||
/// Trait used by codegen to use the service ingress. | ||
pub trait IntoServiceIngress<'a>: Sized { | ||
fn create_ingress(client: &'a IngressClient) -> Self; | ||
} | ||
|
||
/// Trait used by codegen to use the object ingress. | ||
pub trait IntoObjectIngress<'a>: Sized { | ||
fn create_ingress(client: &'a IngressClient, key: String) -> Self; | ||
} | ||
|
||
/// Trait used by codegen to use the workflow ingress. | ||
pub trait IntoWorkflowIngress<'a>: Sized { | ||
fn create_ingress(client: &'a IngressClient, id: String) -> Self; | ||
} | ||
|
||
/// Trait used by codegen to retrieve the service result. | ||
pub trait IntoServiceResult<'a>: Sized { | ||
fn create_result(client: &'a IngressClient) -> Self; | ||
} | ||
|
||
/// Trait used by codegen to retrieve the object result. | ||
pub trait IntoObjectResult<'a>: Sized { | ||
fn create_result(client: &'a IngressClient, key: String) -> Self; | ||
} | ||
|
||
/// Trait used by codegen to retrieve the workflow result. | ||
pub trait IntoWorkflowResult<'a>: Sized { | ||
fn create_result(client: &'a IngressClient, id: String) -> Self; | ||
} |
Uh oh!
There was an error while loading. Please reload this page.