-
Notifications
You must be signed in to change notification settings - Fork 707
sdk/trace/exporters: add batch span processor exporter #153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
35205ed
ffe33fc
4784c15
3e69336
f4256b8
7559ee4
75a32dd
cf023ed
5366af1
0346b19
66c5191
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,10 +12,14 @@ | |
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import collections | ||
import logging | ||
import threading | ||
import typing | ||
from enum import Enum | ||
|
||
from opentelemetry.sdk import util | ||
|
||
from .. import Span, SpanProcessor | ||
|
||
logger = logging.getLogger(__name__) | ||
|
@@ -78,6 +82,124 @@ def shutdown(self) -> None: | |
self.span_exporter.shutdown() | ||
|
||
|
||
class BatchExportSpanProcessor(SpanProcessor): | ||
"""Batch span processor implementation. | ||
|
||
BatchExportSpanProcessor is an implementation of `SpanProcessor` that | ||
batches ended spans and pushes them to the configured `SpanExporter`. | ||
""" | ||
|
||
def __init__( | ||
self, | ||
span_exporter: SpanExporter, | ||
max_queue_size: int = 2048, | ||
schedule_delay_millis: float = 5000, | ||
max_export_batch_size: int = 512, | ||
): | ||
if max_queue_size <= 0: | ||
raise ValueError("max_queue_size must be a positive integer.") | ||
Oberon00 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if schedule_delay_millis <= 0: | ||
raise ValueError("schedule_delay_millis must be positive.") | ||
|
||
if max_export_batch_size <= 0: | ||
raise ValueError( | ||
"max_export_batch_size must be a positive integer." | ||
) | ||
|
||
if max_export_batch_size > max_queue_size: | ||
raise ValueError( | ||
"max_export_batch_size must be less than or equal to max_queue_size." | ||
) | ||
|
||
self.span_exporter = span_exporter | ||
self.queue = collections.deque([], max_queue_size) | ||
self.worker_thread = threading.Thread(target=self.worker, daemon=True) | ||
self.condition = threading.Condition(threading.Lock()) | ||
Oberon00 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.schedule_delay_millis = schedule_delay_millis | ||
self.max_export_batch_size = max_export_batch_size | ||
self.max_queue_size = max_queue_size | ||
self.done = False | ||
# flag that indicates that spans are being dropped | ||
self._spans_dropped = False | ||
# preallocated list to send spans to exporter | ||
self.spans_list = [None] * self.max_export_batch_size | ||
self.worker_thread.start() | ||
|
||
def on_start(self, span: Span) -> None: | ||
pass | ||
|
||
def on_end(self, span: Span) -> None: | ||
if self.done: | ||
logging.warning("Already shutdown, dropping span.") | ||
return | ||
if len(self.queue) == self.max_queue_size: | ||
if not self._spans_dropped: | ||
logging.warning("Queue is full, likely spans will be dropped.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a consequence of adding items outside the condition, because the worker might wake up and consume some spans between here and I bet we'll want exact stats on dropped spans in the future, will probably need to change this behavior. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think if we want exact stats we can either:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @c24t yes you right. There is not a way to actually now if the queue will be full when |
||
self._spans_dropped = True | ||
|
||
self.queue.appendleft(span) | ||
|
||
if len(self.queue) >= self.max_queue_size // 2: | ||
with self.condition: | ||
self.condition.notify() | ||
|
||
def worker(self): | ||
timeout = self.schedule_delay_millis / 1e3 | ||
Oberon00 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
while not self.done: | ||
if len(self.queue) < self.max_export_batch_size: | ||
with self.condition: | ||
self.condition.wait(timeout) | ||
if not self.queue: | ||
# spurious notification, let's wait again | ||
continue | ||
if self.done: | ||
# missing spans will be sent when calling flush | ||
break | ||
Oberon00 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# subtract the duration of this export call from the next timeout | ||
start = util.time_ns() | ||
Oberon00 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.export() | ||
end = util.time_ns() | ||
duration = (end - start) / 1e9 | ||
timeout = self.schedule_delay_millis / 1e3 - duration | ||
|
||
# be sure that all spans are sent | ||
self._flush() | ||
|
||
def export(self) -> bool: | ||
"""Exports at most max_export_batch_size spans.""" | ||
idx = 0 | ||
|
||
# currently only a single thread acts as consumer, so queue.pop() will | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will raise or will not raise? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the |
||
# not raise an exception | ||
while idx < self.max_export_batch_size and self.queue: | ||
self.spans_list[idx] = self.queue.pop() | ||
idx += 1 | ||
try: | ||
self.span_exporter.export(self.spans_list[:idx]) | ||
# pylint: disable=broad-except | ||
except Exception: | ||
logger.exception("Exception while exporting data.") | ||
|
||
# clean up list | ||
for index in range(idx): | ||
self.spans_list[index] = None | ||
|
||
def _flush(self): | ||
# export all elements until queue is empty | ||
while self.queue: | ||
self.export() | ||
|
||
def shutdown(self) -> None: | ||
# signal the worker thread to finish and then wait for it | ||
self.done = True | ||
with self.condition: | ||
self.condition.notify_all() | ||
self.worker_thread.join() | ||
self.span_exporter.shutdown() | ||
|
||
|
||
class ConsoleSpanExporter(SpanExporter): | ||
"""Implementation of :class:`SpanExporter` that prints spans to the | ||
console. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,22 +12,34 @@ | |
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import time | ||
import unittest | ||
from unittest import mock | ||
|
||
from opentelemetry import trace as trace_api | ||
from opentelemetry.sdk import trace | ||
from opentelemetry.sdk.trace import export | ||
|
||
|
||
class TestSimpleExportSpanProcessor(unittest.TestCase): | ||
def test_simple_span_processor(self): | ||
class MySpanExporter(export.SpanExporter): | ||
def __init__(self, destination): | ||
self.destination = destination | ||
class MySpanExporter(export.SpanExporter): | ||
"""Very simple span exporter used for testing.""" | ||
|
||
def __init__(self, destination, max_export_batch_size=None): | ||
self.destination = destination | ||
self.max_export_batch_size = max_export_batch_size | ||
|
||
def export(self, spans: trace.Span) -> export.SpanExportResult: | ||
self.destination.extend(span.name for span in spans) | ||
return export.SpanExportResult.SUCCESS | ||
def export(self, spans: trace.Span) -> export.SpanExportResult: | ||
if ( | ||
self.max_export_batch_size is not None | ||
and len(spans) > self.max_export_batch_size | ||
): | ||
raise ValueError("Batch is too big") | ||
self.destination.extend(span.name for span in spans) | ||
return export.SpanExportResult.SUCCESS | ||
|
||
|
||
class TestSimpleExportSpanProcessor(unittest.TestCase): | ||
def test_simple_span_processor(self): | ||
tracer = trace.Tracer() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd vote to leave the tracer out of these tests and call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like this idea, make them look more like test units than integration tests. |
||
|
||
spans_names_list = [] | ||
|
@@ -42,3 +54,142 @@ def export(self, spans: trace.Span) -> export.SpanExportResult: | |
pass | ||
|
||
self.assertListEqual(["xxx", "bar", "foo"], spans_names_list) | ||
|
||
|
||
def _create_start_and_end_span(name, span_processor): | ||
span = trace.Span( | ||
name, | ||
mock.Mock(spec=trace_api.SpanContext), | ||
span_processor=span_processor, | ||
) | ||
span.start() | ||
span.end() | ||
|
||
|
||
class TestBatchExportSpanProcessor(unittest.TestCase): | ||
def test_batch_span_processor(self): | ||
spans_names_list = [] | ||
|
||
my_exporter = MySpanExporter(destination=spans_names_list) | ||
span_processor = export.BatchExportSpanProcessor(my_exporter) | ||
|
||
span_names = ["xxx", "bar", "foo"] | ||
|
||
for name in span_names: | ||
_create_start_and_end_span(name, span_processor) | ||
|
||
span_processor.shutdown() | ||
self.assertListEqual(span_names, spans_names_list) | ||
|
||
def test_batch_span_processor_lossless(self): | ||
"""Test that no spans are lost when sending max_queue_size spans""" | ||
spans_names_list = [] | ||
|
||
my_exporter = MySpanExporter( | ||
destination=spans_names_list, max_export_batch_size=128 | ||
) | ||
span_processor = export.BatchExportSpanProcessor( | ||
my_exporter, max_queue_size=512, max_export_batch_size=128 | ||
) | ||
|
||
for _ in range(512): | ||
_create_start_and_end_span("foo", span_processor) | ||
|
||
span_processor.shutdown() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this test and the one above just seem to check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll add it. |
||
self.assertEqual(len(spans_names_list), 512) | ||
|
||
def test_batch_span_processor_many_spans(self): | ||
"""Test that no spans are lost when sending many spans""" | ||
spans_names_list = [] | ||
|
||
my_exporter = MySpanExporter( | ||
destination=spans_names_list, max_export_batch_size=128 | ||
) | ||
span_processor = export.BatchExportSpanProcessor( | ||
my_exporter, | ||
max_queue_size=256, | ||
max_export_batch_size=64, | ||
schedule_delay_millis=100, | ||
) | ||
|
||
for _ in range(4): | ||
for _ in range(256): | ||
_create_start_and_end_span("foo", span_processor) | ||
|
||
time.sleep(0.05) # give some time for the exporter to upload spans | ||
|
||
span_processor.shutdown() | ||
self.assertEqual(len(spans_names_list), 1024) | ||
|
||
def test_batch_span_processor_scheduled_delay(self): | ||
"""Test that spans are exported each schedule_delay_millis""" | ||
spans_names_list = [] | ||
|
||
my_exporter = MySpanExporter(destination=spans_names_list) | ||
span_processor = export.BatchExportSpanProcessor( | ||
my_exporter, schedule_delay_millis=50 | ||
) | ||
|
||
# create single span | ||
_create_start_and_end_span("foo", span_processor) | ||
|
||
time.sleep(0.05 + 0.02) | ||
# span should be already exported | ||
self.assertEqual(len(spans_names_list), 1) | ||
|
||
span_processor.shutdown() | ||
|
||
def test_batch_span_processor_parameters(self): | ||
# zero max_queue_size | ||
self.assertRaises( | ||
ValueError, export.BatchExportSpanProcessor, None, max_queue_size=0 | ||
) | ||
|
||
# negative max_queue_size | ||
self.assertRaises( | ||
ValueError, | ||
export.BatchExportSpanProcessor, | ||
None, | ||
max_queue_size=-500, | ||
) | ||
|
||
# zero schedule_delay_millis | ||
self.assertRaises( | ||
ValueError, | ||
export.BatchExportSpanProcessor, | ||
None, | ||
schedule_delay_millis=0, | ||
) | ||
|
||
# negative schedule_delay_millis | ||
self.assertRaises( | ||
ValueError, | ||
export.BatchExportSpanProcessor, | ||
None, | ||
schedule_delay_millis=-500, | ||
) | ||
|
||
# zero max_export_batch_size | ||
self.assertRaises( | ||
ValueError, | ||
export.BatchExportSpanProcessor, | ||
None, | ||
max_export_batch_size=0, | ||
) | ||
|
||
# negative max_export_batch_size | ||
self.assertRaises( | ||
ValueError, | ||
export.BatchExportSpanProcessor, | ||
None, | ||
max_export_batch_size=-500, | ||
) | ||
|
||
# max_export_batch_size > max_queue_size: | ||
self.assertRaises( | ||
ValueError, | ||
export.BatchExportSpanProcessor, | ||
None, | ||
max_queue_size=256, | ||
max_export_batch_size=512, | ||
) |
Uh oh!
There was an error while loading. Please reload this page.