Skip to content

Discovery protocol v3 #62

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 111 additions & 7 deletions endpoint_manifest_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
"$schema": "https://json-schema.org/draft/2020-12/schema",
"type": "object",
"title": "Endpoint",
"description": "Restate endpoint manifest v1",
"description": "Restate endpoint manifest v3",
"properties": {
"protocolMode": {
"title": "ProtocolMode",
"enum": ["BIDI_STREAM", "REQUEST_RESPONSE"]
"enum": [
"BIDI_STREAM",
"REQUEST_RESPONSE"
]
},
"minProtocolVersion": {
"type": "integer",
Expand All @@ -31,9 +34,17 @@
"type": "string",
"pattern": "^([a-zA-Z]|_[a-zA-Z0-9])[a-zA-Z0-9._-]*$"
},
"documentation": {
"type": "string",
"description": "Documentation for this service definition. No format is enforced, but generally Markdown is assumed."
},
"ty": {
"title": "ServiceType",
"enum": ["VIRTUAL_OBJECT", "SERVICE", "WORKFLOW"]
"enum": [
"VIRTUAL_OBJECT",
"SERVICE",
"WORKFLOW"
]
},
"handlers": {
"type": "array",
Expand All @@ -45,9 +56,17 @@
"type": "string",
"pattern": "^([a-zA-Z]|_[a-zA-Z0-9])[a-zA-Z0-9_]*$"
},
"documentation": {
"type": "string",
"description": "Documentation for this handler definition. No format is enforced, but generally Markdown is assumed."
},
"ty": {
"title": "HandlerType",
"enum": ["WORKFLOW", "EXCLUSIVE", "SHARED"],
"enum": [
"WORKFLOW",
"EXCLUSIVE",
"SHARED"
],
"description": "If unspecified, defaults to EXCLUSIVE for Virtual Object or WORKFLOW for Workflows. This should be unset for Services."
},
"input": {
Expand Down Expand Up @@ -122,18 +141,103 @@
"setContentTypeIfEmpty": true
}
}
},
"inactivityTimeout": {
"type": "integer",
"minimum": 0,
"description": "Inactivity timeout duration, expressed in milliseconds."
},
"abortTimeout": {
"type": "integer",
"minimum": 0,
"description": "Abort timeout duration, expressed in milliseconds."
},
"journalRetention": {
"type": "integer",
"minimum": 0,
"description": "Journal retention duration, expressed in milliseconds."
},
"idempotencyRetention": {
"type": "integer",
"minimum": 0,
"description": "Idempotency retention duration, expressed in milliseconds. This is NOT VALID when HandlerType == WORKFLOW"
},
"workflowCompletionRetention": {
"type": "integer",
"minimum": 0,
"description": "Workflow completion retention duration, expressed in milliseconds. This is valid ONLY when HandlerType == WORKFLOW"
},
"enableLazyState": {
"type": "boolean",
"description": "If true, lazy state is enabled."
},
"ingressPrivate": {
"type": "boolean",
"description": "If true, the service cannot be invoked from the HTTP nor Kafka ingress."
},
"metadata": {
"type": "object",
"description": "Custom metadata of this handler definition. This metadata is shown on the Admin API when querying the service/handler definition.",
"additionalProperties": {
"type": "string"
}
}
},
"required": ["name"],
"required": [
"name"
],
"additionalProperties": false
}
},
"inactivityTimeout": {
"type": "integer",
"minimum": 0,
"description": "Inactivity timeout duration, expressed in milliseconds."
},
"abortTimeout": {
"type": "integer",
"minimum": 0,
"description": "Abort timeout duration, expressed in milliseconds."
},
"journalRetention": {
"type": "integer",
"minimum": 0,
"description": "Journal retention duration, expressed in milliseconds."
},
"idempotencyRetention": {
"type": "integer",
"minimum": 0,
"description": "Idempotency retention duration, expressed in milliseconds. When ServiceType == WORKFLOW, this option will be applied only to the shared handlers. See workflowCompletionRetention for more details."
},
"enableLazyState": {
"type": "boolean",
"description": "If true, lazy state is enabled."
},
"ingressPrivate": {
"type": "boolean",
"description": "If true, the service cannot be invoked from the HTTP nor Kafka ingress."
},
"metadata": {
"type": "object",
"description": "Custom metadata of this service definition. This metadata is shown on the Admin API when querying the service definition.",
"additionalProperties": {
"type": "string"
}
}
},
"required": ["name", "ty", "handlers"],
"required": [
"name",
"ty",
"handlers"
],
"additionalProperties": false
}
}
},
"required": ["minProtocolVersion", "maxProtocolVersion", "services"],
"required": [
"minProtocolVersion",
"maxProtocolVersion",
"services"
],
"additionalProperties": false
}
17 changes: 17 additions & 0 deletions macros/src/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,15 @@ impl<'a> ServiceGenerator<'a> {
input: #input_schema,
output: #output_schema,
ty: #handler_ty,
documentation: None,
metadata: Default::default(),
abort_timeout: None,
inactivity_timeout: None,
journal_retention: None,
idempotency_retention: None,
workflow_completion_retention: None,
enable_lazy_state: None,
ingress_private: None,
}
}
});
Expand All @@ -235,6 +244,14 @@ impl<'a> ServiceGenerator<'a> {
name: ::restate_sdk::discovery::ServiceName::try_from(#service_literal.to_string())
.expect("Service name valid"),
handlers: vec![#( #handlers ),*],
documentation: None,
metadata: Default::default(),
abort_timeout: None,
inactivity_timeout: None,
journal_retention: None,
idempotency_retention: None,
enable_lazy_state: None,
ingress_private: None,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[toolchain]
channel = "1.81.0"
channel = "1.82.0"
profile = "minimal"
components = ["rustfmt", "clippy"]
44 changes: 42 additions & 2 deletions src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ mod generated {
include!(concat!(env!("OUT_DIR"), "/endpoint_manifest.rs"));
}

pub use generated::*;

use crate::endpoint::{HandlerOptions, ServiceOptions};
use crate::serde::PayloadMetadata;
pub use generated::*;

impl InputPayload {
pub fn empty() -> Self {
Expand Down Expand Up @@ -48,3 +48,43 @@ impl OutputPayload {
}
}
}

impl Service {
pub(crate) fn apply_options(&mut self, options: ServiceOptions) {
// Apply service options
self.metadata.extend(options.metadata);
self.inactivity_timeout = options.inactivity_timeout.map(|d| d.as_millis() as u64);
self.abort_timeout = options.abort_timeout.map(|d| d.as_millis() as u64);
self.inactivity_timeout = options.inactivity_timeout.map(|d| d.as_millis() as u64);
self.idempotency_retention = options.idempotency_retention.map(|d| d.as_millis() as u64);
self.journal_retention = options.journal_retention.map(|d| d.as_millis() as u64);
self.enable_lazy_state = options.enable_lazy_state;
self.ingress_private = options.ingress_private;

// Apply handler specific options
for (handler_name, handler_options) in options.handler_options {
let handler = self
.handlers
.iter_mut()
.find(|h| handler_name == h.name.as_str())
.expect("Invalid handler name provided in the options");
handler.apply_options(handler_options);
}
}
}

impl Handler {
pub(crate) fn apply_options(&mut self, options: HandlerOptions) {
// Apply handler options
self.metadata.extend(options.metadata);
self.inactivity_timeout = options.inactivity_timeout.map(|d| d.as_millis() as u64);
self.abort_timeout = options.abort_timeout.map(|d| d.as_millis() as u64);
self.inactivity_timeout = options.inactivity_timeout.map(|d| d.as_millis() as u64);
self.idempotency_retention = options.idempotency_retention.map(|d| d.as_millis() as u64);
self.journal_retention = options.journal_retention.map(|d| d.as_millis() as u64);
self.workflow_completion_retention =
options.workflow_retention.map(|d| d.as_millis() as u64);
self.enable_lazy_state = options.enable_lazy_state;
self.ingress_private = options.ingress_private;
}
}
Loading
Loading