Skip to content

Make the DatadogExportSpanProcessor re-initialize itself after fork #932

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#903](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/903))
- `opentelemetry-instrumentation-falcon` Safer patching mechanism
([#895](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/895))
- `opentelemetry-exporter-datadog` re-initializes itself on fork to prevent deadlocks ([#932](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/932))

## [1.9.1-0.28b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.9.1-0.28b1) - 2022-01-29

Expand Down
2 changes: 2 additions & 0 deletions exporter/opentelemetry-exporter-datadog/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,5 @@ where = src

[options.extras_require]
test =
opentelemetry-test-utils == 0.28b1
flaky
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import collections
import logging
import os
import threading
import typing

Expand Down Expand Up @@ -67,7 +68,7 @@ def __init__(
self.traces_spans_count = collections.Counter()
self.traces_spans_ended_count = collections.Counter()

self.worker_thread = threading.Thread(target=self.worker, daemon=True)
self.worker_thread = self._create_worker_thread()

# threading conditions used for flushing and shutdown
self.condition = threading.Condition(threading.Lock())
Expand All @@ -81,6 +82,24 @@ def __init__(
self.schedule_delay_millis = schedule_delay_millis
self.done = False
self.worker_thread.start()
# Only available in *nix since py37.
if hasattr(os, "register_at_fork"):
os.register_at_fork(
after_in_child=self._at_fork_reinit
) # pylint: disable=protected-access

def _at_fork_reinit(self):
self.condition = threading.Condition(threading.Lock())
self.flush_condition = threading.Condition(threading.Lock())
self.traces_lock = threading.Lock()
self.check_traces_queue.clear()
self.worker_thread = self._create_worker_thread()
self.worker_thread.start()
Comment on lines +92 to +97
Copy link
Contributor Author

@phillipuniverse phillipuniverse Feb 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really wasn't too sure here what I should/shouldn't re-initialize. I figured I would initialize the threading-related code just in case. What do you think? Are these the right things to re-initialize on the fork?


def _create_worker_thread(self) -> threading.Thread:
return threading.Thread(
name="DatadogProcessor", target=self.worker, daemon=True
)

def on_start(
self, span: Span, parent_context: typing.Optional[Context] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import itertools
import logging
import multiprocessing
import os
import sys
import time
import unittest
Expand All @@ -30,6 +32,7 @@
from opentelemetry.sdk.trace import Resource, sampling
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.concurrency_test import ConcurrencyTestBase


class MockDatadogSpanExporter(datadog.DatadogSpanExporter):
Expand All @@ -41,6 +44,9 @@ def __init__(self, *args, **kwargs):
agent_writer_mock.exit_timeout = 1
self._agent_writer = agent_writer_mock

def reset(self):
    """Clear all recorded calls on the mocked agent writer.

    Lets a single exporter instance be reused across test phases without
    earlier export activity leaking into later assertions.
    """
    writer = self._agent_writer
    writer.reset_mock()


def get_spans(tracer, exporter, shutdown=True):
if shutdown:
Expand Down Expand Up @@ -667,3 +673,52 @@ def test_service_name_fallback(self):

span = datadog_spans[0]
self.assertEqual(span["service"], "fallback_service_name")


class TestDatadogSpanProcessorConcurrency(ConcurrencyTestBase):
    """Verify the DatadogExportSpanProcessor keeps exporting after a fork."""

    @unittest.skipUnless(
        hasattr(os, "fork") and sys.version_info >= (3, 7),
        "needs *nix and minor version 7 or later",
    )
    def test_exports_with_fork(self):
        # pylint: disable=invalid-name
        tracer_provider = trace.TracerProvider()
        tracer = tracer_provider.get_tracer(__name__)

        exporter = MockDatadogSpanExporter()

        span_processor = datadog.DatadogExportSpanProcessor(
            exporter, schedule_delay_millis=50
        )
        tracer_provider.add_span_processor(span_processor)

        # Sanity check: the processor exports in the parent before forking.
        with tracer.start_as_current_span("foo"):
            pass
        time.sleep(0.5)  # give some time for the exporter to upload spans

        self.assertTrue(span_processor.force_flush())
        self.assertEqual(len(get_spans(tracer, exporter, shutdown=False)), 1)
        exporter.reset()

        def child(conn):
            # Runs in the forked child: the processor must have re-created
            # its worker thread and locks, otherwise this deadlocks or
            # silently drops spans.
            def _target():
                with tracer.start_as_current_span("span") as s:
                    s.set_attribute("i", "1")
                    with tracer.start_as_current_span("temp"):
                        pass

            self.run_with_many_threads(_target, 100)

            time.sleep(0.5)

            # 100 threads x 2 spans each.
            spans = get_spans(tracer, exporter, shutdown=False)
            conn.send(len(spans) == 200)
            conn.close()

        parent_conn, child_conn = multiprocessing.Pipe()
        # Use an explicit "fork" start method: the default on macOS is
        # "spawn", which cannot pickle the local `child` function.
        fork_context = multiprocessing.get_context("fork")
        p = fork_context.Process(target=child, args=(child_conn,))
        p.start()
        self.assertTrue(parent_conn.recv())
        p.join()

        span_processor.shutdown()
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ commands_pre =

aiopg: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-dbapi pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-aiopg[test]

datadog: pip install flaky {toxinidir}/exporter/opentelemetry-exporter-datadog[test]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made a bit more sense to me to keep this flaky dependency in the extras_require of the module instead of here in tox.ini.

datadog: pip install {toxinidir}/exporter/opentelemetry-exporter-datadog[test]

richconsole: pip install flaky {toxinidir}/exporter/opentelemetry-exporter-richconsole[test]

Expand Down