This project provides a Benthos configuration that acts as a dynamic orchestrator. It listens for control messages on a GCP Pub/Sub subscription and, based on each message, generates and launches a new, separate Benthos instance to perform a data processing task (e.g., Kafka to Pub/Sub).
This serves as a foundational pattern for scenarios where Benthos pipelines need to be created and managed dynamically using Benthos itself.
The generated job runs as a Benthos streams process, separate from the controller and in parallel with it. We call this second process the Benthos worker.
- Control Message Reception: The main orchestrator Benthos instance listens to a GCP Pub/Sub subscription (e.g., `benthos-control-sub`).
- Dynamic Configuration Generation:
  - Upon receiving a control message (JSON payload), it uses one of the Bloblang templates to construct a complete Benthos YAML configuration string.
  - This generated YAML defines a new Benthos pipeline that includes:
    - A source (e.g., Kafka) to consume messages.
    - A processor that applies a transformation based on the control message's `data` field.
    - A sink (e.g., GCP Pub/Sub) to publish processed messages.
  - The control message can also include a custom Bloblang mapping string in its `data` field, which is embedded into the generated pipeline for data transformation.
- Stream Launch:
  - The generated YAML is POST-ed to the streams HTTP server (see the orchestrator sketch after this list).
  - A Benthos stream is launched in the background within the streams Benthos server.
- Acknowledgement: Once the stream completes, the control message is acknowledged.
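As a minimal sketch (not the repo's exact `config.yaml`; the project ID, stream ID scheme, and streams-server port are assumptions), the orchestrator is shaped roughly like this:

```yaml
# Minimal orchestrator sketch; see config.yaml for the real configuration.
input:
  gcp_pubsub:
    project: my-gcp-project            # assumption: your GCP project ID
    subscription: benthos-control-sub  # the control subscription

pipeline:
  processors:
    # Render a complete worker config from the control message via a
    # Bloblang template loaded from file.
    - mapping: from "./blob_configs/dynamic_k2p_config.blobl"

output:
  # POST the generated config to the streams server (default port 4195
  # assumed); one fresh stream ID per control message.
  http_client:
    url: http://localhost:4195/streams/${! uuid_v4() }
    verb: POST
```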
- `config.yaml`: The configuration for the main orchestrator Benthos instance.
- `blob_configs/`: The Bloblang templates used to generate configurations for the dynamically launched Benthos instances.
- `observability.yaml`: The configuration used when starting the worker (streams) process.
- Benthos: Installed and accessible in your PATH (version 4.x recommended). You can use the standalone `benthos` binary or tools that bundle it, like `rpk`.
- GCP Project:
  - A Pub/Sub topic for control messages.
  - A Pub/Sub subscription for the orchestrator to listen to (e.g., `benthos-control-sub`).
  - Appropriate GCP credentials configured for Benthos (e.g., via the `GOOGLE_APPLICATION_CREDENTIALS` environment variable).
- Required resources for the requested job (e.g., Kafka topic, GCP Pub/Sub topic) must exist and be accessible by the dynamically launched Benthos instances.
- Ensure all prerequisites are met and environment variables are set.
- Place `config.yaml`, `blob_configs/`, and `observability.yaml` in the same directory.
- Navigate to that directory and run one of the following:
  - Using `rpk` (Redpanda's tool, which bundles Benthos): `rpk connect run config.yaml`
  - Or, using a standalone Benthos binary: `benthos -c config.yaml`
- Open a separate shell and start the worker (streams) process (a plausible shape for `observability.yaml` is sketched below): `rpk connect streams -o observability.yaml`
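The repo's `observability.yaml` configures that streams server; a plausible shape, offered as an assumption since the actual file may differ, is:

```yaml
# Plausible shape for observability.yaml; the actual file in this repo may differ.
http:
  address: 0.0.0.0:4195   # serves the streams REST API and /metrics
metrics:
  prometheus: {}
logger:
  level: INFO
  format: json
```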
Publish a JSON message to the GCP Pub/Sub topic that the orchestrator is subscribed to (e.g., `projects/<YourProject>/topics/your-control-topic`).
Example control message payload:

```json
{
  "kafka_topic": "source-events-main",
  "publish_project": "<YourGCPProjectID>",
  "publish_topic": "processed-events-destination",
  "consumer_name": "dynamic-consumer-group-main",
  "client_id": "my-dynamic-benthos-1",
  "benthos_meta_info": "ANY META DATA YOU WANT IN LOGS",
  "data": "{\"data\":{\"userId\":this.distinct_id.string(),\"key1\":\"val1\",\"key2\":\"val2\",\"metaInfo\":{\"abc\":\"23432\",\"id\":234}}}"
}
```
Output:

```json
{
  "data": {
    "metaInfo": {
      "abc": "23432",
      "id": 234
    },
    "key1": "val1",
    "key2": "val2",
    "userId": "841622982"
  }
}
```
The `data` field within your JSON control message is special. Its value must be a JSON string that itself represents the body of a Bloblang mapping. This mapping is applied by the dynamically launched Benthos instance to each message it processes from its source (e.g., Kafka).

The orchestrator's `dynamic_k2p_config.blobl` takes this string and effectively does:

```
root.pipeline.processors = root.pipeline.processors.append({ "mapping": "root = " + this.data })
```

(where `this.data` is the string you provide in the control message). A sketch of the resulting worker pipeline follows.
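Assuming a Kafka-to-Pub/Sub job and the example control message above, the generated worker config ends up shaped roughly like this (the broker address is an assumption; the other values come from the control message; this is a sketch, not the template's exact output):

```yaml
# Sketch of a generated worker pipeline; not the template's exact output.
input:
  kafka:
    addresses: [ "localhost:9092" ]               # assumption: your broker
    topics: [ "source-events-main" ]              # from kafka_topic
    consumer_group: dynamic-consumer-group-main   # from consumer_name
    client_id: my-dynamic-benthos-1               # from client_id

pipeline:
  processors:
    # The appended processor: literally "root = " + this.data.
    - mapping: |
        root = {"data":{"userId":this.distinct_id.string(),"key1":"val1","key2":"val2","metaInfo":{"abc":"23432","id":234}}}

output:
  gcp_pubsub:
    project: <YourGCPProjectID>                   # from publish_project
    topic: processed-events-destination           # from publish_topic
```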
How to create the `data` string:

- Define your target JSON structure: Decide what the output JSON from the dynamic Benthos instance should look like for each processed message.
- Embed Bloblang expressions for dynamic values: For parts of your target JSON that need to come from the source message (e.g., a field from a Kafka message), use Bloblang expressions:
  - `this.field_name`: Accesses a field from the source message.
  - `.string()`: Ensures the value is treated as a string.
  - `.number()`: Ensures the value is treated as a number.
  - `meta("kafka_key")`: Accesses Kafka message metadata.
- Format as a string literal: The entire structure, including Bloblang expressions, must be formatted as a single JSON string value for the `data` key.
Example:

Let's say your source Kafka messages look like:

```json
{
  "distinct_id": "841622982",
  "event_name": "page_view",
  "properties": {
    "page_url": "/home"
  }
}
```

And you want to transform it to:

```json
{
  "userId": "841622982",
  "key1": "val1",
  "key2": "val2",
  "metaInfo": {
    "abc": "23432",
    "id": 234
  }
}
```

The `data` field in your control message would look like:

```json
{
  "data": "{\"data\":{\"userId\":this.distinct_id.string(),\"key1\":\"val1\",\"key2\":\"val2\",\"metaInfo\":{\"abc\":\"23432\",\"id\":234}}}"
}
```
While the current subprocess-based approach is functional, a more robust, production-ready orchestrator should move to the Benthos Streams API. This transition provides several advantages, including better lifecycle management, observability, and resource efficiency.
Transition to Benthos Streams API:
Why this is the most impactful change:
The Benthos HTTP API (specifically the `/streams` endpoint) allows for creating, managing (reading status, updating), and deleting Benthos streams (pipelines) within a single, long-running Benthos process.
Key Benefits:
- Eliminates Subprocesses: No more `nohup benthos ... &`. This significantly simplifies process management and reduces system overhead.
- No Temporary Files: Configurations are sent directly via the API, removing the need for `/tmp/config_temp.yaml` and associated race condition concerns (even if currently mitigated by `max_outstanding_messages: 1`).
- Centralized Observability: Logs and metrics from all dynamically created streams can be managed and exposed by the main orchestrator Benthos instance, providing a unified view.
- Lifecycle Management: The Streams API provides endpoints to create streams, check their status (`GET /streams/{id}`, which shows uptime and activity, allowing inference of completion for finite inputs like `read_until`), update them (`PUT /streams/{id}`), and delete them (`DELETE /streams/{id}`) gracefully. This enables programmatic polling of stream status and robust lifecycle control; see the polling sketch after this list.
- Resource Efficiency: Running multiple stream configurations within one Benthos process is generally more resource-efficient than spawning many individual Benthos processes.
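For example, completion of a finite stream could be inferred by polling its status. A sketch of such a check from a Benthos pipeline (the port and the `stream_id` metadata key are assumptions):

```yaml
# Sketch: poll a dynamic stream's status via the Streams API.
pipeline:
  processors:
    - http_client:
        url: http://localhost:4195/streams/${! meta("stream_id") }  # assumes stream_id metadata was set upstream
        verb: GET
    # The status response includes fields such as "active" and "uptime".
    - mapping: root.finished = this.active == false
```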
How it would work:
- The orchestrator's main Benthos pipeline would change: instead of `command` processors to write a file and launch a subprocess, it would use an `http_client` processor (or `http` output).
- This `http_client` would make `POST /streams/{id}` requests to its own Benthos API endpoint (e.g., `http://localhost:4195/streams/{dynamic_stream_id}`), assuming the main orchestrator's HTTP server is enabled.
- The body of the HTTP `POST` request would be the dynamically generated Benthos configuration. Crucially, `dynamic_k2p_config.blobl` (or a similar Bloblang mapping used for this purpose) would need to be adjusted to output a Benthos configuration in JSON format, as the Streams API endpoint expects JSON, not YAML. A sketch of this flow follows.
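A minimal sketch of that revised orchestrator pipeline, assuming the HTTP server is enabled on the default port; `generate_worker_json.blobl` is a hypothetical, adapted template that emits the worker config as JSON and sets a `stream_id` metadata field:

```yaml
# Sketch of the proposed Streams API flow, not the current implementation.
pipeline:
  processors:
    # Hypothetical template: renders the worker config as a JSON document and
    # stashes the control message's client_id as stream_id metadata.
    - mapping: from "./blob_configs/generate_worker_json.blobl"
    - http_client:
        url: http://localhost:4195/streams/${! meta("stream_id") }
        verb: POST
        headers:
          Content-Type: application/json
```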
Contributions are welcome! Please open an issue to discuss any changes or new features before submitting a pull request. Focus areas include implementing the "Future Enhancements" listed above, especially the transition to the Benthos Streams API.
This project is licensed under the MIT License.