Skip to content

Commit dfbc61f

Browse files
Make the DatadogExportSpanProcessor re-initialize itself after fork
1 parent 1080b04 commit dfbc61f

File tree

3 files changed

+77
-1
lines changed

3 files changed

+77
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2020
([#903](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/903))
2121
- `opentelemetry-instrumentation-falcon` Safer patching mechanism
2222
([#895](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/895))
23+
- `opentelemetry-exporter-datadog` `DatadogExportSpanProcessor` re-initializes itself in the child process after a fork to prevent deadlocks ([#932](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/932))
2324

2425
## [1.9.1-0.28b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.9.1-0.28b1) - 2022-01-29
2526

exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/spanprocessor.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import collections
1616
import logging
17+
import os
1718
import threading
1819
import typing
1920

@@ -67,7 +68,7 @@ def __init__(
6768
self.traces_spans_count = collections.Counter()
6869
self.traces_spans_ended_count = collections.Counter()
6970

70-
self.worker_thread = threading.Thread(target=self.worker, daemon=True)
71+
self.worker_thread = self._create_worker_thread()
7172

7273
# threading conditions used for flushing and shutdown
7374
self.condition = threading.Condition(threading.Lock())
@@ -81,6 +82,24 @@ def __init__(
8182
self.schedule_delay_millis = schedule_delay_millis
8283
self.done = False
8384
self.worker_thread.start()
85+
# os.register_at_fork is only available on *nix, and only since Python 3.7.
86+
if hasattr(os, "register_at_fork"):
87+
os.register_at_fork(
88+
after_in_child=self._at_fork_reinit
89+
) # pylint: disable=protected-access
90+
91+
def _at_fork_reinit(self):
    """Rebuild the processor's threading state inside a forked child.

    ``__init__`` registers this method as the ``after_in_child`` hook of
    ``os.register_at_fork`` (when that function exists), so it is meant to
    run in the child process right after a fork.
    """
    # Swap in brand-new synchronization primitives. Presumably the ones
    # copied from the parent could have been held at fork time, which
    # would leave the child waiting on them forever.
    self.traces_lock = threading.Lock()
    self.flush_condition = threading.Condition(threading.Lock())
    self.condition = threading.Condition(threading.Lock())

    # Forget the trace ids that were queued before the fork.
    # NOTE(review): presumably the parent's own worker still handles
    # those entries; confirm against the worker implementation.
    self.check_traces_queue.clear()

    # Create a new worker thread, record it, then start it.
    restarted_worker = self._create_worker_thread()
    self.worker_thread = restarted_worker
    restarted_worker.start()
98+
99+
def _create_worker_thread(self) -> threading.Thread:
    """Build, but do not start, the thread that runs ``self.worker``.

    Returns:
        A daemon ``threading.Thread`` named ``"DatadogProcessor"`` whose
        target is ``self.worker``. Starting it is left to the caller.
    """
    worker_thread = threading.Thread(
        target=self.worker, name="DatadogProcessor"
    )
    # Marked as a daemon before it is ever started.
    worker_thread.daemon = True
    return worker_thread
84103

85104
def on_start(
86105
self, span: Span, parent_context: typing.Optional[Context] = None

exporter/opentelemetry-exporter-datadog/tests/test_datadog_exporter.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
import itertools
1616
import logging
17+
import multiprocessing
18+
import os
1719
import sys
1820
import time
1921
import unittest
@@ -30,6 +32,7 @@
3032
from opentelemetry.sdk.trace import Resource, sampling
3133
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
3234
from opentelemetry.semconv.trace import SpanAttributes
35+
from opentelemetry.test.concurrency_test import ConcurrencyTestBase
3336

3437

3538
class MockDatadogSpanExporter(datadog.DatadogSpanExporter):
@@ -41,6 +44,9 @@ def __init__(self, *args, **kwargs):
4144
agent_writer_mock.exit_timeout = 1
4245
self._agent_writer = agent_writer_mock
4346

47+
def reset(self):
    """Reset the stand-in agent writer held in ``self._agent_writer``.

    Delegates to the writer's ``reset_mock()``; presumably this discards
    the calls recorded so far so a test can count exports from scratch.
    """
    writer_mock = self._agent_writer
    writer_mock.reset_mock()
49+
4450

4551
def get_spans(tracer, exporter, shutdown=True):
4652
if shutdown:
@@ -667,3 +673,53 @@ def test_service_name_fallback(self):
667673

668674
span = datadog_spans[0]
669675
self.assertEqual(span["service"], "fallback_service_name")
676+
677+
678+
class TestDatadogSpanProcessorConcurrency(ConcurrencyTestBase):
    """Checks that ``DatadogExportSpanProcessor`` still exports after a fork."""

    # Upper bound (seconds) on how long the parent waits for the child's
    # verdict. Without a bound, a child that deadlocks would hang the
    # whole test run instead of failing this test.
    CHILD_RESULT_TIMEOUT_SECONDS = 30

    # Grace period (seconds) for the child to exit before it is terminated.
    CHILD_EXIT_TIMEOUT_SECONDS = 5

    @unittest.skipUnless(
        hasattr(os, "fork") and sys.version_info >= (3, 7),
        "needs *nix and minor version 7 or later",
    )
    def test_exports_with_fork(self):
        """Export one span in the parent, then 200 spans in a forked child.

        The child creates its spans from 100 threads (two spans each) and
        reports over a pipe whether exactly 200 spans reached the exporter.
        """
        tracer_provider = trace.TracerProvider()
        tracer = tracer_provider.get_tracer(__name__)

        exporter = MockDatadogSpanExporter()

        span_processor = datadog.DatadogExportSpanProcessor(
            exporter, schedule_delay_millis=50
        )
        tracer_provider.add_span_processor(span_processor)

        process = None
        try:
            with tracer.start_as_current_span("foo"):
                pass
            time.sleep(0.5)  # give some time for the exporter to upload spans

            self.assertTrue(span_processor.force_flush())
            self.assertEqual(
                len(get_spans(tracer, exporter, shutdown=False)), 1
            )
            # Start the child from a clean slate so only its own spans count.
            exporter.reset()

            def child(conn):
                def _target():
                    with tracer.start_as_current_span("span") as span:
                        span.set_attribute("i", "1")
                        with tracer.start_as_current_span("temp"):
                            pass

                self.run_with_many_threads(_target, 100)

                time.sleep(0.5)

                spans = get_spans(tracer, exporter, shutdown=False)
                conn.send(len(spans) == 200)
                conn.close()

            parent_conn, child_conn = multiprocessing.Pipe()
            fork_context = multiprocessing.get_context("fork")
            process = fork_context.Process(target=child, args=(child_conn,))
            process.start()
            # Drop the parent's copy of the child's pipe end. Otherwise the
            # pipe never reports EOF and recv() would block forever if the
            # child died before sending its result.
            child_conn.close()

            self.assertTrue(
                parent_conn.poll(self.CHILD_RESULT_TIMEOUT_SECONDS),
                "forked child did not report a result in time",
            )
            # Raises EOFError (failing the test) if the child exited
            # without sending anything.
            self.assertTrue(parent_conn.recv())
        finally:
            if process is not None:
                process.join(self.CHILD_EXIT_TIMEOUT_SECONDS)
                if process.is_alive():
                    # A stuck child must not outlive the test.
                    process.terminate()
                    process.join()
            span_processor.shutdown()

0 commit comments

Comments
 (0)