Skip to content

in_http: add configurable health endpoint #10441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugins/in_http/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,11 @@ static struct flb_config_map config_map[] = {
"Set successful response code. 200, 201 and 204 are supported."
},

{
FLB_CONFIG_MAP_BOOL, "health_check", "false",
0, FLB_TRUE, offsetof(struct flb_http, enable_health_endpoint),
"Enable health check endpoint accessible via GET /health"
},

/* EOF */
{0}
Expand Down
3 changes: 3 additions & 0 deletions plugins/in_http/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ struct flb_http {
struct mk_server *server;

int collector_id;

/* Health endpoint configuration */
int enable_health_endpoint;
};


Expand Down
82 changes: 73 additions & 9 deletions plugins/in_http/http_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

#define HTTP_CONTENT_JSON 0
#define HTTP_CONTENT_URLENCODED 1
#define HTTP_HEALTH_ENDPOINT "/health"

static inline char hex2nibble(char c)
{
Expand Down Expand Up @@ -81,6 +82,37 @@ static int sds_uri_decode(flb_sds_t s)
return 0;
}

cfl_sds_t get_health_message()
{
char time_str[128];
cfl_sds_t msg = NULL;
struct tm tm_time;
struct flb_time tm;

flb_time_get(&tm);
gmtime_r(&tm.tm.tv_sec, &tm_time);

snprintf(time_str, sizeof(time_str) - 1, "%04d-%02d-%02dT%02d:%02d:%02d.%09ldZ",
tm_time.tm_year + 1900, tm_time.tm_mon + 1, tm_time.tm_mday,
tm_time.tm_hour, tm_time.tm_min, tm_time.tm_sec, tm.tm.tv_nsec);

msg = cfl_sds_create_size(512);
if (!msg) {
return NULL;
}

cfl_sds_printf(&msg,
"{\n"
" \"status\": \"ok\",\n"
" \"version\": \"%s\",\n"
" \"git_hash\": \"%s\",\n"
" \"timestamp\": \"%s\"\n"
"}\n",
FLB_VERSION_STR, FLB_GIT_HASH, time_str);

return msg;
}

static int send_response(struct http_conn *conn, int http_status, char *message)
{
struct flb_http *context;
Expand Down Expand Up @@ -112,13 +144,25 @@ static int send_response(struct http_conn *conn, int http_status, char *message)
context->success_headers_str);
}
else if (http_status == 200) {
flb_sds_printf(&out,
"HTTP/1.1 200 OK\r\n"
"Server: Fluent Bit v%s\r\n"
"%s"
"Content-Length: 0\r\n\r\n",
FLB_VERSION_STR,
context->success_headers_str);
if (len > 0) {
flb_sds_printf(&out,
"HTTP/1.1 200 OK\r\n"
"Server: Fluent Bit v%s\r\n"
"%s"
"Content-Length: %i\r\n\r\n%s",
FLB_VERSION_STR,
context->success_headers_str,
len, message);
}
else {
flb_sds_printf(&out,
"HTTP/1.1 200 OK\r\n"
"Server: Fluent Bit v%s\r\n"
"%s"
"Content-Length: 0\r\n\r\n",
FLB_VERSION_STR,
context->success_headers_str);
}
}
else if (http_status == 204) {
flb_sds_printf(&out,
Expand Down Expand Up @@ -868,6 +912,7 @@ int http_prot_handle(struct flb_http *ctx, struct http_conn *conn,
int len;
char *uri;
char *qs;
flb_sds_t health;
off_t diff;
flb_sds_t tag;
struct mk_http_header *header;
Expand Down Expand Up @@ -920,8 +965,6 @@ int http_prot_handle(struct flb_http *ctx, struct http_conn *conn,
}
}

mk_mem_free(uri);

/* Check if we have a Host header: Hostname ; port */
mk_http_point_header(&request->host, &session->parser, MK_HEADER_HOST);

Expand All @@ -932,6 +975,7 @@ int http_prot_handle(struct flb_http *ctx, struct http_conn *conn,
/* HTTP/1.1 needs Host header */
if (!request->host.data && request->protocol == MK_HTTP_PROTOCOL_11) {
flb_sds_destroy(tag);
mk_mem_free(uri);
return -1;
}

Expand All @@ -948,14 +992,25 @@ int http_prot_handle(struct flb_http *ctx, struct http_conn *conn,
request->_content_length.data = NULL;
}

if (request->method == MK_METHOD_GET && strcmp(uri, HTTP_HEALTH_ENDPOINT) == 0 && ctx->enable_health_endpoint) {
flb_sds_destroy(tag);
mk_mem_free(uri);
health = get_health_message();
ret = send_response(conn, 200, health);
flb_sds_destroy(health);
return ret;
}

if (request->method != MK_METHOD_POST) {
flb_sds_destroy(tag);
mk_mem_free(uri);
send_response(conn, 400, "error: invalid HTTP method\n");
return -1;
}

ret = process_payload(ctx, conn, tag, session, request);
flb_sds_destroy(tag);
mk_mem_free(uri);

if (ret == 0) {
send_response(conn, ctx->successful_response_code, NULL);
Expand Down Expand Up @@ -1233,8 +1288,10 @@ int http_prot_handle_ng(struct flb_http_request *request,
int ret;
int len;
flb_sds_t tag;
flb_sds_t health;
struct flb_http *ctx;


ctx = (struct flb_http *) response->stream->user_data;
if (request->path[0] != '/') {
send_response_ng(response, 400, "error: invalid request\n");
Expand Down Expand Up @@ -1271,6 +1328,13 @@ int http_prot_handle_ng(struct flb_http_request *request,
return -1;
}

if (request->method == HTTP_METHOD_GET && strcmp(request->path, HTTP_HEALTH_ENDPOINT) == 0 && ctx->enable_health_endpoint) {
flb_sds_destroy(tag);
health = get_health_message();
send_response_ng(response, 200, health);
flb_sds_destroy(health);
return 0;
}
if (request->method != HTTP_METHOD_POST) {
send_response_ng(response, 400, "error: invalid HTTP method\n");
flb_sds_destroy(tag);
Expand Down
120 changes: 120 additions & 0 deletions tests/runtime/in_http.c
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,124 @@ void flb_test_http_tag_key_with_array_input()
test_http_tag_key("[{\"tag\":\"new_tag\",\"test\":\"msg\"}]");
}

void flb_test_http_health_endpoint()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
struct flb_http_client *c;
int ret;
size_t b_sent;

clear_output_num();

cb_data.cb = cb_check_result_json;
cb_data.data = "dummy";

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

ret = flb_input_set(ctx->flb, ctx->i_ffd,
"health_check", "true",
NULL);
TEST_CHECK(ret == 0);

ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", "*",
"format", "json",
NULL);
TEST_CHECK(ret == 0);

/* Start the engine */
ret = flb_start(ctx->flb);
if (!TEST_CHECK(ret == 0)) {
TEST_MSG("flb_start failed with ret=%d", ret);
return;
}

ctx->httpc = http_client_ctx_create();
if (!TEST_CHECK(ctx->httpc != NULL)) {
TEST_MSG("http_client_ctx_create failed");
return;
}

/* Test GET /health endpoint */
c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_GET, "/health", NULL, 0,
"127.0.0.1", 9880, NULL, 0);
if (!TEST_CHECK(c != NULL)) {
TEST_MSG("flb_http_client failed");
return;
}

ret = flb_http_do(c, &b_sent);
TEST_CHECK(ret == 0);
TEST_CHECK(c->resp.status == 200);

/* Check response contains expected JSON fields */
TEST_CHECK(c->resp.payload != NULL && c->resp.payload_size > 0);
TEST_CHECK(strstr(c->resp.payload, "\"status\": \"ok\"") != NULL);
TEST_CHECK(strstr(c->resp.payload, "\"version\":") != NULL);
TEST_CHECK(strstr(c->resp.payload, "\"git_hash\":") != NULL);
TEST_CHECK(strstr(c->resp.payload, "\"timestamp\":") != NULL);

flb_http_client_destroy(c);
flb_upstream_conn_release(ctx->httpc->u_conn);
test_ctx_destroy(ctx);
}

void flb_test_http_health_endpoint_disabled()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
struct flb_http_client *c;
int ret;
size_t b_sent;

clear_output_num();

cb_data.cb = cb_check_result_json;
cb_data.data = "dummy";

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

ret = flb_input_set(ctx->flb, ctx->i_ffd,
"health_check", "false",
NULL);
TEST_CHECK(ret == 0);

ret = flb_output_set(ctx->flb, ctx->o_ffd,
"match", "*",
"format", "json",
NULL);
TEST_CHECK(ret == 0);

/* Start the engine */
ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

ctx->httpc = http_client_ctx_create();
TEST_CHECK(ctx->httpc != NULL);

/* Test GET /health endpoint - should return 400 for invalid method */
c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_GET, "/health", NULL, 0,
"127.0.0.1", 9880, NULL, 0);
TEST_CHECK(c != NULL);

ret = flb_http_do(c, &b_sent);
TEST_CHECK(ret == 0);
TEST_CHECK(c->resp.status == 400);

flb_http_client_destroy(c);
flb_upstream_conn_release(ctx->httpc->u_conn);
test_ctx_destroy(ctx);
}

TEST_LIST = {
{"http", flb_test_http},
{"successful_response_code_200", flb_test_http_successful_response_code_200},
Expand All @@ -679,5 +797,7 @@ TEST_LIST = {
{"failure_response_code_400_bad_disk_write", flb_test_http_failure_400_bad_disk_write},
{"tag_key_with_map_input", flb_test_http_tag_key_with_map_input},
{"tag_key_with_array_input", flb_test_http_tag_key_with_array_input},
{"health_endpoint", flb_test_http_health_endpoint},
{"health_endpoint_disabled", flb_test_http_health_endpoint_disabled},
{NULL, NULL}
};
Loading