-
Notifications
You must be signed in to change notification settings - Fork 727
Make the DatadogExportSpanProcessor re-initialize itself after fork #932
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
dfbc61f
d1e472b
debf2ae
253ecaf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,3 +50,5 @@ where = src | |
|
||
[options.extras_require] | ||
test = | ||
opentelemetry-test-utils == 0.28b1 | ||
flaky |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,8 @@ | |
|
||
import itertools | ||
import logging | ||
import multiprocessing | ||
import os | ||
import sys | ||
import time | ||
import unittest | ||
|
@@ -30,6 +32,7 @@ | |
from opentelemetry.sdk.trace import Resource, sampling | ||
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo | ||
from opentelemetry.semconv.trace import SpanAttributes | ||
from opentelemetry.test.concurrency_test import ConcurrencyTestBase | ||
|
||
|
||
class MockDatadogSpanExporter(datadog.DatadogSpanExporter): | ||
|
@@ -41,6 +44,9 @@ def __init__(self, *args, **kwargs): | |
agent_writer_mock.exit_timeout = 1 | ||
self._agent_writer = agent_writer_mock | ||
|
||
def reset(self): | ||
self._agent_writer.reset_mock() | ||
|
||
|
||
def get_spans(tracer, exporter, shutdown=True): | ||
if shutdown: | ||
|
@@ -667,3 +673,52 @@ def test_service_name_fallback(self): | |
|
||
span = datadog_spans[0] | ||
self.assertEqual(span["service"], "fallback_service_name") | ||
|
||
|
||
class TestDatadogSpanProcessorConcurrency(ConcurrencyTestBase): | ||
@unittest.skipUnless( | ||
hasattr(os, "fork") and sys.version_info >= (3, 7), | ||
"needs *nix and minor version 7 or later", | ||
) | ||
def test_exports_with_fork(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test is effectively the same sort of one from open-telemetry/opentelemetry-python#2242 with as few changes as possible. |
||
# pylint: disable=invalid-name | ||
tracer_provider = trace.TracerProvider() | ||
tracer = tracer_provider.get_tracer(__name__) | ||
|
||
exporter = MockDatadogSpanExporter() | ||
|
||
span_processor = datadog.DatadogExportSpanProcessor( | ||
exporter, schedule_delay_millis=50 | ||
) | ||
tracer_provider.add_span_processor(span_processor) | ||
with tracer.start_as_current_span("foo"): | ||
pass | ||
time.sleep(0.5) # give some time for the exporter to upload spans | ||
|
||
self.assertTrue(span_processor.force_flush()) | ||
self.assertEqual(len(get_spans(tracer, exporter, shutdown=False)), 1) | ||
exporter.reset() | ||
|
||
def child(conn): | ||
def _target(): | ||
with tracer.start_as_current_span("span") as s: | ||
s.set_attribute("i", "1") | ||
with tracer.start_as_current_span("temp"): | ||
pass | ||
|
||
self.run_with_many_threads(_target, 100) | ||
|
||
time.sleep(0.5) | ||
|
||
spans = get_spans(tracer, exporter, shutdown=False) | ||
conn.send(len(spans) == 200) | ||
conn.close() | ||
|
||
parent_conn, child_conn = multiprocessing.Pipe() | ||
fork_context = multiprocessing.get_context("fork") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm running this code on macOS where the default is |
||
p = fork_context.Process(target=child, args=(child_conn,)) | ||
p.start() | ||
self.assertTrue(parent_conn.recv()) | ||
p.join() | ||
|
||
span_processor.shutdown() |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -360,7 +360,7 @@ commands_pre = | |
|
||
aiopg: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-dbapi pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-aiopg[test] | ||
|
||
datadog: pip install flaky {toxinidir}/exporter/opentelemetry-exporter-datadog[test] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Made a bit more sense to me to keep this |
||
datadog: pip install {toxinidir}/exporter/opentelemetry-exporter-datadog[test] | ||
|
||
richconsole: pip install flaky {toxinidir}/exporter/opentelemetry-exporter-richconsole[test] | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really wasn't too sure here what I should/shouldn't re-initialize. I figured I would initialize the threading-related code just in case. What do you think? Are these the right things to re-initialize on the fork?