Skip to content

Commit f3fa471

Browse files
olegrokfilonenko-mikhail
authored andcommitted
Implement stats callback
This patch introduces a way to speicify stats_callback for producer and consumer that will receive json string with librdkafka stats. See also: - https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md - https://github.com/edenhill/librdkafka/blob/f092c290995ca81b3afb4015fcc3350ba02caa96/src/rdkafka.h#L2068 Closes #43
1 parent ce79a31 commit f3fa471

File tree

11 files changed

+237
-18
lines changed

11 files changed

+237
-18
lines changed

kafka/callbacks.c

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,21 @@ log_callback(const rd_kafka_t *rd_kafka, int level, const char *fac, const char
6060
}
6161
}
6262

63+
int
64+
stats_callback(rd_kafka_t *rd_kafka, char *json, size_t json_len, void *opaque) {
65+
(void)opaque;
66+
(void)json_len;
67+
event_queues_t *event_queues = rd_kafka_opaque(rd_kafka);
68+
if (event_queues != NULL && event_queues->stats_queue != NULL) {
69+
if (json != NULL) {
70+
if (queue_push(event_queues->stats_queue, json) != 0)
71+
return 0; // destroy json after return
72+
return 1; // json should be freed manually
73+
}
74+
}
75+
return 0;
76+
}
77+
6378
/**
6479
* Handle errors from RDKafka
6580
*/
@@ -291,6 +306,8 @@ event_queues_t *new_event_queues() {
291306
event_queues->error_cb_ref = LUA_REFNIL;
292307
event_queues->log_queue = NULL;
293308
event_queues->log_cb_ref = LUA_REFNIL;
309+
event_queues->stats_queue = NULL;
310+
event_queues->stats_cb_ref = LUA_REFNIL;
294311
event_queues->delivery_queue = NULL;
295312
event_queues->rebalance_cb_ref = LUA_REFNIL;
296313
event_queues->rebalance_queue = NULL;
@@ -321,6 +338,15 @@ void destroy_event_queues(struct lua_State *L, event_queues_t *event_queues) {
321338
}
322339
destroy_queue(event_queues->log_queue);
323340
}
341+
if (event_queues->stats_queue != NULL) {
342+
char *stats_json = NULL;
343+
while (true) {
344+
stats_json = queue_pop(event_queues->stats_queue);
345+
if (stats_json == NULL)
346+
break;
347+
}
348+
destroy_queue(event_queues->stats_queue);
349+
}
324350
if (event_queues->error_queue != NULL) {
325351
error_msg_t *msg = NULL;
326352
while (true) {
@@ -357,15 +383,9 @@ void destroy_event_queues(struct lua_State *L, event_queues_t *event_queues) {
357383
}
358384
destroy_queue(event_queues->rebalance_queue);
359385
}
360-
if (event_queues->error_cb_ref != LUA_REFNIL) {
361-
luaL_unref(L, LUA_REGISTRYINDEX, event_queues->error_cb_ref);
362-
}
363-
364-
if (event_queues->log_cb_ref != LUA_REFNIL) {
365-
luaL_unref(L, LUA_REGISTRYINDEX, event_queues->log_cb_ref);
366-
}
367-
if (event_queues->rebalance_cb_ref != LUA_REFNIL) {
368-
luaL_unref(L, LUA_REGISTRYINDEX, event_queues->rebalance_cb_ref);
369-
}
386+
luaL_unref(L, LUA_REGISTRYINDEX, event_queues->error_cb_ref);
387+
luaL_unref(L, LUA_REGISTRYINDEX, event_queues->log_cb_ref);
388+
luaL_unref(L, LUA_REGISTRYINDEX, event_queues->stats_cb_ref);
389+
luaL_unref(L, LUA_REGISTRYINDEX, event_queues->rebalance_cb_ref);
370390
free(event_queues);
371391
}

kafka/callbacks.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#define TNT_KAFKA_CALLBACKS_H
33

44
#include <pthread.h>
5+
#include <stddef.h>
56

67
#include <lua.h>
78
#include <lualib.h>
@@ -32,6 +33,12 @@ void destroy_log_msg(log_msg_t *msg);
3233

3334
void log_callback(const rd_kafka_t *rd_kafka, int level, const char *fac, const char *buf);
3435

36+
/**
37+
* Handle stats from RDKafka
38+
*/
39+
40+
int stats_callback(rd_kafka_t *rd_kafka, char *json, size_t json_len, void *opaque);
41+
3542

3643
/**
3744
* Handle errors from RDKafka
@@ -96,6 +103,8 @@ typedef struct {
96103
queue_t *consume_queue;
97104
queue_t *log_queue;
98105
int log_cb_ref;
106+
queue_t *stats_queue;
107+
int stats_cb_ref;
99108
queue_t *error_queue;
100109
int error_cb_ref;
101110
queue_t *delivery_queue;

kafka/consumer.c

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,48 @@ lua_consumer_poll_logs(struct lua_State *L) {
301301
return 2;
302302
}
303303

304+
int
305+
lua_consumer_poll_stats(struct lua_State *L) {
306+
if (lua_gettop(L) != 2)
307+
luaL_error(L, "Usage: count, err = consumer:poll_stats(limit)");
308+
309+
consumer_t *consumer = lua_check_consumer(L, 1);
310+
if (consumer->event_queues == NULL ||
311+
consumer->event_queues->stats_queue == NULL ||
312+
consumer->event_queues->stats_cb_ref == LUA_REFNIL) {
313+
lua_pushnumber(L, 0);
314+
lua_pushliteral(L, "Consumer poll stats error: callback for logs is not set");
315+
return 2;
316+
}
317+
318+
int limit = lua_tonumber(L, 2);
319+
char *json = NULL;
320+
int count = 0;
321+
char *err_str = NULL;
322+
while (count < limit) {
323+
json = queue_pop(consumer->event_queues->stats_queue);
324+
if (json == NULL)
325+
break;
326+
count++;
327+
lua_rawgeti(L, LUA_REGISTRYINDEX, consumer->event_queues->stats_cb_ref);
328+
lua_pushstring(L, json);
329+
/* do the call (1 arguments, 0 result) */
330+
if (lua_pcall(L, 1, 0, 0) != 0)
331+
err_str = (char*)lua_tostring(L, -1);
332+
333+
free(json);
334+
335+
if (err_str != NULL)
336+
break;
337+
}
338+
lua_pushnumber(L, (double)count);
339+
if (err_str != NULL) {
340+
lua_pushstring(L, err_str);
341+
return 2;
342+
}
343+
return 1;
344+
}
345+
304346
int
305347
lua_consumer_poll_errors(struct lua_State *L) {
306348
if (lua_gettop(L) != 2)
@@ -660,7 +702,7 @@ lua_create_consumer(struct lua_State *L) {
660702
}
661703

662704
lua_pushstring(L, "log_callback");
663-
lua_gettable(L, -2 );
705+
lua_gettable(L, -2);
664706
if (lua_isfunction(L, -1)) {
665707
event_queues->log_cb_ref = luaL_ref(L, LUA_REGISTRYINDEX);
666708
event_queues->log_queue = new_queue();
@@ -669,6 +711,16 @@ lua_create_consumer(struct lua_State *L) {
669711
lua_pop(L, 1);
670712
}
671713

714+
lua_pushstring(L, "stats_callback");
715+
lua_gettable(L, -2);
716+
if (lua_isfunction(L, -1)) {
717+
event_queues->stats_cb_ref = luaL_ref(L, LUA_REGISTRYINDEX);
718+
event_queues->stats_queue = new_queue();
719+
rd_kafka_conf_set_stats_cb(rd_config, stats_callback);
720+
} else {
721+
lua_pop(L, 1);
722+
}
723+
672724
lua_pushstring(L, "rebalance_callback");
673725
lua_gettable(L, -2 );
674726
if (lua_isfunction(L, -1)) {

kafka/consumer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ int lua_consumer_poll_msg(struct lua_State *L);
4242

4343
int lua_consumer_poll_logs(struct lua_State *L);
4444

45+
int lua_consumer_poll_stats(struct lua_State *L);
46+
4547
int lua_consumer_poll_errors(struct lua_State *L);
4648

4749
int lua_consumer_poll_rebalances(struct lua_State *L);

kafka/init.lua

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ function Consumer.create(config)
3333
end)
3434
end
3535

36+
if config.stats_callback ~= nil then
37+
new._poll_stats_fiber = fiber.create(function()
38+
new:_poll_stats()
39+
end)
40+
end
41+
3642
if config.error_callback ~= nil then
3743
new._poll_errors_fiber = fiber.create(function()
3844
new:_poll_errors()
@@ -85,6 +91,25 @@ end
8591

8692
jit.off(Consumer._poll_logs)
8793

94+
function Consumer:_poll_stats()
95+
local count, err
96+
while true do
97+
count, err = self._consumer:poll_stats(100)
98+
if err ~= nil then
99+
log.error("Consumer poll stats error: %s", err)
100+
-- throttling poll
101+
fiber.sleep(0.1)
102+
elseif count > 0 then
103+
fiber.yield()
104+
else
105+
-- throttling poll
106+
fiber.sleep(1)
107+
end
108+
end
109+
end
110+
111+
jit.off(Consumer._poll_stats)
112+
88113
function Consumer:_poll_errors()
89114
local count, err
90115
while true do
@@ -138,6 +163,9 @@ function Consumer:close()
138163
if self._poll_logs_fiber ~= nil then
139164
self._poll_logs_fiber:cancel()
140165
end
166+
if self._poll_stats_fiber ~= nil then
167+
self._poll_stats_fiber:cancel()
168+
end
141169
if self._poll_errors_fiber ~= nil then
142170
self._poll_errors_fiber:cancel()
143171
end
@@ -200,6 +228,12 @@ function Producer.create(config)
200228
end)
201229
end
202230

231+
if config.stats_callback ~= nil then
232+
new._poll_stats_fiber = fiber.create(function()
233+
new:_poll_stats()
234+
end)
235+
end
236+
203237
if config.error_callback ~= nil then
204238
new._poll_errors_fiber = fiber.create(function()
205239
new:_poll_errors()
@@ -249,6 +283,25 @@ end
249283

250284
jit.off(Producer._poll_logs)
251285

286+
function Producer:_poll_stats()
287+
local count, err
288+
while true do
289+
count, err = self._producer:poll_stats(100)
290+
if err ~= nil then
291+
log.error("Producer poll stats error: %s", err)
292+
-- throttling poll
293+
fiber.sleep(0.1)
294+
elseif count > 0 then
295+
fiber.yield()
296+
else
297+
-- throttling poll
298+
fiber.sleep(1)
299+
end
300+
end
301+
end
302+
303+
jit.off(Producer._poll_stats)
304+
252305
function Producer:_poll_errors()
253306
local count, err
254307
while true do
@@ -303,6 +356,9 @@ function Producer:close()
303356
if self._poll_logs_fiber ~= nil then
304357
self._poll_logs_fiber:cancel()
305358
end
359+
if self._poll_stats_fiber ~= nil then
360+
self._poll_stats_fiber:cancel()
361+
end
306362
if self._poll_errors_fiber ~= nil then
307363
self._poll_errors_fiber:cancel()
308364
end

kafka/producer.c

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,48 @@ lua_producer_poll_logs(struct lua_State *L) {
266266
return 2;
267267
}
268268

269+
int
270+
lua_producer_poll_stats(struct lua_State *L) {
271+
if (lua_gettop(L) != 2)
272+
luaL_error(L, "Usage: count, err = producer:poll_stats(limit)");
273+
274+
producer_t *producer = lua_check_producer(L, 1);
275+
if (producer->event_queues == NULL ||
276+
producer->event_queues->stats_queue == NULL ||
277+
producer->event_queues->stats_cb_ref == LUA_REFNIL) {
278+
lua_pushnumber(L, 0);
279+
lua_pushliteral(L, "Consumer poll stats error: callback for logs is not set");
280+
return 2;
281+
}
282+
283+
int limit = lua_tonumber(L, 2);
284+
char *json = NULL;
285+
int count = 0;
286+
char *err_str = NULL;
287+
while (count < limit) {
288+
json = queue_pop(producer->event_queues->stats_queue);
289+
if (json == NULL)
290+
break;
291+
count++;
292+
lua_rawgeti(L, LUA_REGISTRYINDEX, producer->event_queues->stats_cb_ref);
293+
lua_pushstring(L, json);
294+
/* do the call (1 arguments, 0 result) */
295+
if (lua_pcall(L, 1, 0, 0) != 0)
296+
err_str = (char*)lua_tostring(L, -1);
297+
298+
free(json);
299+
300+
if (err_str != NULL)
301+
break;
302+
}
303+
lua_pushnumber(L, (double)count);
304+
if (err_str != NULL) {
305+
lua_pushstring(L, err_str);
306+
return 2;
307+
}
308+
return 1;
309+
}
310+
269311
int
270312
lua_producer_poll_errors(struct lua_State *L) {
271313
if (lua_gettop(L) != 2)
@@ -537,7 +579,7 @@ lua_create_producer(struct lua_State *L) {
537579
}
538580

539581
lua_pushstring(L, "log_callback");
540-
lua_gettable(L, -2 );
582+
lua_gettable(L, -2);
541583
if (lua_isfunction(L, -1)) {
542584
event_queues->log_cb_ref = luaL_ref(L, LUA_REGISTRYINDEX);
543585
event_queues->log_queue = new_queue();
@@ -546,6 +588,16 @@ lua_create_producer(struct lua_State *L) {
546588
lua_pop(L, 1);
547589
}
548590

591+
lua_pushstring(L, "stats_callback");
592+
lua_gettable(L, -2);
593+
if (lua_isfunction(L, -1)) {
594+
event_queues->stats_cb_ref = luaL_ref(L, LUA_REGISTRYINDEX);
595+
event_queues->stats_queue = new_queue();
596+
rd_kafka_conf_set_stats_cb(rd_config, stats_callback);
597+
} else {
598+
lua_pop(L, 1);
599+
}
600+
549601
rd_kafka_conf_set_opaque(rd_config, event_queues);
550602

551603
lua_pushstring(L, "options");

kafka/producer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ int lua_producer_msg_delivery_poll(struct lua_State *L);
4949

5050
int lua_producer_poll_logs(struct lua_State *L);
5151

52+
int lua_producer_poll_stats(struct lua_State *L);
53+
5254
int lua_producer_poll_errors(struct lua_State *L);
5355

5456
int lua_producer_produce(struct lua_State *L);

kafka/tnt_kafka.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ luaopen_kafka_tntkafka(lua_State *L) {
2121
{"unsubscribe", lua_consumer_unsubscribe},
2222
{"poll_msg", lua_consumer_poll_msg},
2323
{"poll_logs", lua_consumer_poll_logs},
24+
{"poll_stats", lua_consumer_poll_stats},
2425
{"poll_errors", lua_consumer_poll_errors},
2526
{"poll_rebalances", lua_consumer_poll_rebalances},
2627
{"store_offset", lua_consumer_store_offset},
@@ -61,6 +62,7 @@ luaopen_kafka_tntkafka(lua_State *L) {
6162
{"produce", lua_producer_produce},
6263
{"msg_delivery_poll", lua_producer_msg_delivery_poll},
6364
{"poll_logs", lua_producer_poll_logs},
65+
{"poll_stats", lua_producer_poll_stats},
6466
{"poll_errors", lua_producer_poll_errors},
6567
{"close", lua_producer_close},
6668
{"destroy", lua_producer_destroy},

tests/app.lua

100644100755
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#!/usr/bin/env tarantool
2+
13
local box = require('box')
24

35
box.cfg{

0 commit comments

Comments
 (0)