Skip to content

Commit 9e8ab4c

Browse files
committed
chore: catch Throwable in callback proxies
- Exceptions during ffi callbacks are fatal errors - log error message for debugging
1 parent 7cfafc5 commit 9e8ab4c

9 files changed

+93
-53
lines changed

src/RdKafka/FFI/ConsumeCallbackProxy.php

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,13 @@ class ConsumeCallbackProxy extends CallbackProxy
1111
{
1212
public function __invoke(CData $nativeMessage, ?CData $opaque = null): void
1313
{
14-
($this->callback)(
15-
new Message($nativeMessage),
16-
OpaqueMap::get($opaque)
17-
);
14+
try {
15+
($this->callback)(
16+
new Message($nativeMessage),
17+
OpaqueMap::get($opaque)
18+
);
19+
} catch (\Throwable $exception) {
20+
error_log($exception->getMessage(), E_ERROR);
21+
}
1822
}
1923
}

src/RdKafka/FFI/DrMsgCallbackProxy.php

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,14 @@ class DrMsgCallbackProxy extends CallbackProxy
1212
{
1313
public function __invoke(CData $producer, CData $nativeMessage, ?CData $opaque = null): void
1414
{
15-
($this->callback)(
16-
RdKafka::resolveFromCData($producer),
17-
new Message($nativeMessage),
18-
OpaqueMap::get($opaque)
19-
);
15+
try {
16+
($this->callback)(
17+
RdKafka::resolveFromCData($producer),
18+
new Message($nativeMessage),
19+
OpaqueMap::get($opaque)
20+
);
21+
} catch (\Throwable $exception) {
22+
error_log($exception->getMessage(), E_ERROR);
23+
}
2024
}
2125
}

src/RdKafka/FFI/ErrorCallbackProxy.php

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,15 @@ class ErrorCallbackProxy extends CallbackProxy
1111
{
1212
public function __invoke(CData $consumerOrProducer, int $err, string $reason, ?CData $opaque = null): void
1313
{
14-
($this->callback)(
15-
RdKafka::resolveFromCData($consumerOrProducer),
16-
$err,
17-
$reason,
18-
OpaqueMap::get($opaque)
19-
);
14+
try {
15+
($this->callback)(
16+
RdKafka::resolveFromCData($consumerOrProducer),
17+
$err,
18+
$reason,
19+
OpaqueMap::get($opaque)
20+
);
21+
} catch (\Throwable $exception) {
22+
error_log($exception->getMessage(), E_ERROR);
23+
}
2024
}
2125
}

src/RdKafka/FFI/LogCallbackProxy.php

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,15 @@ class LogCallbackProxy extends CallbackProxy
1111
{
1212
public function __invoke(CData $rdkafka, int $level, string $facility, string $message): void
1313
{
14-
($this->callback)(
15-
RdKafka::resolveFromCData($rdkafka),
16-
$level,
17-
$facility,
18-
$message
19-
);
14+
try {
15+
($this->callback)(
16+
RdKafka::resolveFromCData($rdkafka),
17+
$level,
18+
$facility,
19+
$message
20+
);
21+
} catch (\Throwable $exception) {
22+
error_log($exception->getMessage(), E_ERROR);
23+
}
2024
}
2125
}

src/RdKafka/FFI/NativePartitionerCallbackProxy.php

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,20 @@ public function __invoke(
2525
?CData $topic_opaque = null,
2626
?CData $msg_opaque = null
2727
): int {
28-
return (int) Library::{$this->partitionerMethod}(
29-
$topic,
30-
$keydata,
31-
$keylen,
32-
$partition_cnt,
33-
OpaqueMap::get($topic_opaque),
34-
OpaqueMap::get($msg_opaque)
35-
);
28+
try {
29+
return (int) Library::{$this->partitionerMethod}(
30+
$topic,
31+
$keydata,
32+
$keylen,
33+
$partition_cnt,
34+
OpaqueMap::get($topic_opaque),
35+
OpaqueMap::get($msg_opaque)
36+
);
37+
} catch (\Throwable $exception) {
38+
error_log($exception->getMessage(), E_ERROR);
39+
}
40+
41+
return RD_KAFKA_PARTITION_UA;
3642
}
3743

3844
public static function create(string $partitionerMethod): Closure

src/RdKafka/FFI/OffsetCommitCallbackProxy.php

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,15 @@ class OffsetCommitCallbackProxy extends CallbackProxy
1212
{
1313
public function __invoke(CData $consumer, int $err, CData $nativeTopicPartitionList, ?CData $opaque = null): void
1414
{
15-
($this->callback)(
16-
RdKafka::resolveFromCData($consumer),
17-
$err,
18-
TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(),
19-
OpaqueMap::get($opaque)
20-
);
15+
try {
16+
($this->callback)(
17+
RdKafka::resolveFromCData($consumer),
18+
$err,
19+
TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(),
20+
OpaqueMap::get($opaque)
21+
);
22+
} catch (\Throwable $exception) {
23+
error_log($exception->getMessage(), E_ERROR);
24+
}
2125
}
2226
}

src/RdKafka/FFI/PartitionerCallbackProxy.php

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,17 @@ public function __invoke(
1717
?CData $topic_opaque = null,
1818
?CData $msg_opaque = null
1919
): int {
20-
return (int) ($this->callback)(
21-
$keydata === null ? null : FFI::string($keydata, $keylen),
22-
$partition_cnt,
23-
OpaqueMap::get($topic_opaque),
24-
OpaqueMap::get($msg_opaque)
25-
);
20+
try {
21+
return (int) ($this->callback)(
22+
$keydata === null ? null : FFI::string($keydata, $keylen),
23+
$partition_cnt,
24+
OpaqueMap::get($topic_opaque),
25+
OpaqueMap::get($msg_opaque)
26+
);
27+
} catch (\Throwable $exception) {
28+
error_log($exception->getMessage(), E_ERROR);
29+
}
30+
31+
return RD_KAFKA_PARTITION_UA;
2632
}
2733
}

src/RdKafka/FFI/RebalanceCallbackProxy.php

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,15 @@ class RebalanceCallbackProxy extends CallbackProxy
1212
{
1313
public function __invoke(CData $consumer, int $err, CData $nativeTopicPartitionList, ?CData $opaque = null): void
1414
{
15-
($this->callback)(
16-
RdKafka::resolveFromCData($consumer),
17-
$err,
18-
TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(),
19-
OpaqueMap::get($opaque)
20-
);
15+
try {
16+
($this->callback)(
17+
RdKafka::resolveFromCData($consumer),
18+
$err,
19+
TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(),
20+
OpaqueMap::get($opaque)
21+
);
22+
} catch (\Throwable $exception) {
23+
error_log($exception->getMessage(), E_ERROR);
24+
}
2125
}
2226
}

src/RdKafka/FFI/StatsCallbackProxy.php

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,16 @@ class StatsCallbackProxy extends CallbackProxy
1212
{
1313
public function __invoke(CData $consumerOrProducer, CData $json, int $json_len, ?CData $opaque = null): int
1414
{
15-
($this->callback)(
16-
RdKafka::resolveFromCData($consumerOrProducer),
17-
FFI::string($json, $json_len),
18-
$json_len,
19-
OpaqueMap::get($opaque)
20-
);
15+
try {
16+
($this->callback)(
17+
RdKafka::resolveFromCData($consumerOrProducer),
18+
FFI::string($json, $json_len),
19+
$json_len,
20+
OpaqueMap::get($opaque)
21+
);
22+
} catch (\Throwable $exception) {
23+
error_log($exception->getMessage(), E_ERROR);
24+
}
2125

2226
return 0;
2327
}

0 commit comments

Comments
 (0)