Skip to content

Commit 9fbc93b

Browse files
authored
bugfix: batch processor doesn't work with uwsgi (#2277)
1 parent b83c2ae commit 9fbc93b

File tree

4 files changed

+23
-9
lines changed

4 files changed

+23
-9
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_logs/export/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
from opentelemetry.context import attach, detach, set_value
2626
from opentelemetry.sdk._logs import LogData, LogProcessor, LogRecord
27+
from opentelemetry.util._once import Once
2728
from opentelemetry.util._time import _time_ns
2829

2930
_logger = logging.getLogger(__name__)
@@ -129,6 +130,9 @@ def __init__(self):
129130
self.num_log_records = 0
130131

131132

133+
_BSP_RESET_ONCE = Once()
134+
135+
132136
class BatchLogProcessor(LogProcessor):
133137
"""This is an implementation of LogProcessor which creates batches of
134138
received logs in the export-friendly LogData representation and
@@ -164,6 +168,7 @@ def __init__(
164168
os.register_at_fork(
165169
after_in_child=self._at_fork_reinit
166170
) # pylint: disable=protected-access
171+
self._pid = os.getpid()
167172

168173
def _at_fork_reinit(self):
169174
self._condition = threading.Condition(threading.Lock())
@@ -174,6 +179,7 @@ def _at_fork_reinit(self):
174179
daemon=True,
175180
)
176181
self._worker_thread.start()
182+
self._pid = os.getpid()
177183

178184
def worker(self):
179185
timeout = self._schedule_delay_millis / 1e3
@@ -293,6 +299,9 @@ def emit(self, log_data: LogData) -> None:
293299
"""
294300
if self._shutdown:
295301
return
302+
if self._pid != os.getpid():
303+
_BSP_RESET_ONCE.do_once(self._at_fork_reinit)
304+
296305
self._queue.appendleft(log_data)
297306
if len(self._queue) >= self._max_export_batch_size:
298307
with self._condition:

opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
OTEL_BSP_SCHEDULE_DELAY,
3737
)
3838
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
39+
from opentelemetry.util._once import Once
3940
from opentelemetry.util._time import _time_ns
4041

4142
logger = logging.getLogger(__name__)
@@ -119,6 +120,9 @@ def __init__(self):
119120
self.num_spans = 0
120121

121122

123+
_BSP_RESET_ONCE = Once()
124+
125+
122126
class BatchSpanProcessor(SpanProcessor):
123127
"""Batch span processor implementation.
124128
@@ -203,6 +207,7 @@ def __init__(
203207
os.register_at_fork(
204208
after_in_child=self._at_fork_reinit
205209
) # pylint: disable=protected-access
210+
self._pid = os.getpid()
206211

207212
def on_start(
208213
self, span: Span, parent_context: typing.Optional[Context] = None
@@ -215,6 +220,9 @@ def on_end(self, span: ReadableSpan) -> None:
215220
return
216221
if not span.context.trace_flags.sampled:
217222
return
223+
if self._pid != os.getpid():
224+
_BSP_RESET_ONCE.do_once(self._at_fork_reinit)
225+
218226
if len(self.queue) == self.max_queue_size:
219227
if not self._spans_dropped:
220228
logger.warning("Queue is full, likely spans will be dropped.")
@@ -236,6 +244,7 @@ def _at_fork_reinit(self):
236244
name="OtelBatchSpanProcessor", target=self.worker, daemon=True
237245
)
238246
self.worker_thread.start()
247+
self._pid = os.getpid()
239248

240249
def worker(self):
241250
timeout = self.schedule_delay_millis / 1e3

opentelemetry-sdk/tests/logs/test_export.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import logging
1717
import multiprocessing
1818
import os
19-
import sys
2019
import time
2120
import unittest
2221
from concurrent.futures import ThreadPoolExecutor
@@ -44,8 +43,6 @@
4443
from opentelemetry.trace import TraceFlags
4544
from opentelemetry.trace.span import INVALID_SPAN_CONTEXT
4645

47-
supports_register_at_fork = hasattr(os, "fork") and sys.version_info >= (3, 7)
48-
4946

5047
class TestSimpleLogProcessor(unittest.TestCase):
5148
def test_simple_log_processor_default_level(self):
@@ -274,9 +271,9 @@ def bulk_log_and_flush(num_logs):
274271
finished_logs = exporter.get_finished_logs()
275272
self.assertEqual(len(finished_logs), 2415)
276273

277-
@unittest.skipIf(
278-
not supports_register_at_fork,
279-
"needs *nix and minor version 7 or later",
274+
@unittest.skipUnless(
275+
hasattr(os, "fork"),
276+
"needs *nix",
280277
)
281278
def test_batch_log_processor_fork(self):
282279
# pylint: disable=invalid-name

opentelemetry-sdk/tests/trace/export/test_export.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
import multiprocessing
1616
import os
17-
import sys
1817
import threading
1918
import time
2019
import unittest
@@ -369,8 +368,8 @@ def _check_fork_trace(self, exporter, expected):
369368
self.assertIn(span.name, expected)
370369

371370
@unittest.skipUnless(
372-
hasattr(os, "fork") and sys.version_info >= (3, 7),
373-
"needs *nix and minor version 7 or later",
371+
hasattr(os, "fork"),
372+
"needs *nix",
374373
)
375374
def test_batch_span_processor_fork(self):
376375
# pylint: disable=invalid-name

0 commit comments

Comments
 (0)