Skip to content

Commit d0946cd

Browse files
mauriciovasquezbernalOberon00
authored andcommitted
sdk/trace/exporters: add batch span processor exporter (#153)
The exporters specification states that two built-in span processors should be implemented: the simple span processor and the batch span processor. This commit implements the latter; it is mainly based on the opentelemetry/java one. The algorithm implements the following logic: - a condition variable is used to notify the worker thread when the queue is half full, so that exporting can start before the queue gets full and spans are dropped. - export is called every schedule_delay_millis if there is at least one new span to export. - when the processor is shut down, all remaining spans are exported.
1 parent 84f589b commit d0946cd

File tree

2 files changed

+281
-8
lines changed

2 files changed

+281
-8
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,14 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import collections
1516
import logging
17+
import threading
1618
import typing
1719
from enum import Enum
1820

21+
from opentelemetry.sdk import util
22+
1923
from .. import Span, SpanProcessor
2024

2125
logger = logging.getLogger(__name__)
@@ -78,6 +82,124 @@ def shutdown(self) -> None:
7882
self.span_exporter.shutdown()
7983

8084

85+
class BatchExportSpanProcessor(SpanProcessor):
    """Batch span processor implementation.

    BatchExportSpanProcessor is an implementation of `SpanProcessor` that
    batches ended spans and pushes them to the configured `SpanExporter`.

    Ended spans are stored in a bounded queue that is drained by a daemon
    worker thread started in the constructor. The worker exports a batch
    when the queue holds at least ``max_export_batch_size`` spans, when it
    is woken up by `on_end` (queue at least half full) or when
    ``schedule_delay_millis`` elapsed and there is at least one span
    queued. When the processor is shut down all remaining spans are
    exported.

    Args:
        span_exporter: Exporter that receives the batches of ended spans.
        max_queue_size: Maximum number of spans kept in the queue.
        schedule_delay_millis: Delay, in milliseconds, between two
            consecutive scheduled exports.
        max_export_batch_size: Maximum number of spans passed to the
            exporter in a single call.

    Raises:
        ValueError: If any of the numeric parameters is not positive or
            if ``max_export_batch_size`` is greater than
            ``max_queue_size``.
    """

    def __init__(
        self,
        span_exporter: SpanExporter,
        max_queue_size: int = 2048,
        schedule_delay_millis: float = 5000,
        max_export_batch_size: int = 512,
    ):
        if max_queue_size <= 0:
            raise ValueError("max_queue_size must be a positive integer.")

        if schedule_delay_millis <= 0:
            raise ValueError("schedule_delay_millis must be positive.")

        if max_export_batch_size <= 0:
            raise ValueError(
                "max_export_batch_size must be a positive integer."
            )

        if max_export_batch_size > max_queue_size:
            raise ValueError(
                "max_export_batch_size must be less than or equal to "
                "max_queue_size."
            )

        self.span_exporter = span_exporter
        # bounded deque: appending to a full deque discards the element at
        # the opposite end, i.e. the oldest span that was not exported yet
        self.queue = collections.deque([], max_queue_size)
        self.worker_thread = threading.Thread(target=self.worker, daemon=True)
        self.condition = threading.Condition(threading.Lock())
        self.schedule_delay_millis = schedule_delay_millis
        self.max_export_batch_size = max_export_batch_size
        self.max_queue_size = max_queue_size
        self.done = False
        # flag that indicates that spans are being dropped
        self._spans_dropped = False
        # preallocated list to send spans to exporter
        self.spans_list = [None] * self.max_export_batch_size
        self.worker_thread.start()

    def on_start(self, span: Span) -> None:
        """Do nothing, only ended spans are batched."""

    def on_end(self, span: Span) -> None:
        """Enqueue an ended span and wake up the worker if needed.

        The span is dropped (with a warning) when the processor was
        already shut down.
        """
        if self.done:
            logger.warning("Already shutdown, dropping span.")
            return
        if len(self.queue) == self.max_queue_size:
            # warn only once to avoid flooding the log
            if not self._spans_dropped:
                logger.warning("Queue is full, likely spans will be dropped.")
                self._spans_dropped = True

        self.queue.appendleft(span)

        # wake up the worker before the queue gets full
        if len(self.queue) >= self.max_queue_size // 2:
            with self.condition:
                self.condition.notify()

    def worker(self):
        """Worker thread loop: export batches until shutdown is requested."""
        schedule_delay = self.schedule_delay_millis / 1e3
        timeout = schedule_delay
        while not self.done:
            if len(self.queue) < self.max_export_batch_size:
                with self.condition:
                    if self.done:
                        # shutdown was requested before we got the lock, its
                        # notification would be missed if we waited now
                        break
                    self.condition.wait(timeout)
                    if not self.queue:
                        # spurious notification or nothing to export, wait
                        # again for a full period. Without resetting the
                        # timeout a (possibly negative) value left over
                        # from a slow export would make this loop spin.
                        timeout = schedule_delay
                        continue
                    if self.done:
                        # missing spans will be sent when calling flush
                        break

            # subtract the duration of this export call from the next timeout
            start = util.time_ns()
            self.export()
            end = util.time_ns()
            duration = (end - start) / 1e9
            timeout = max(schedule_delay - duration, 0)

        # be sure that all spans are sent
        self._flush()

    def export(self) -> None:
        """Exports at most max_export_batch_size spans.

        Exceptions raised by the exporter are logged and not propagated.
        """
        idx = 0

        # currently only a single thread acts as consumer, so queue.pop() will
        # not raise an exception
        while idx < self.max_export_batch_size and self.queue:
            self.spans_list[idx] = self.queue.pop()
            idx += 1
        try:
            self.span_exporter.export(self.spans_list[:idx])
        # pylint: disable=broad-except
        except Exception:
            logger.exception("Exception while exporting data.")

        # clean up list so that exported spans are not kept alive
        for index in range(idx):
            self.spans_list[index] = None

    def _flush(self):
        """Export batches until the queue is empty."""
        while self.queue:
            self.export()

    def shutdown(self) -> None:
        """Stop the worker thread, export pending spans and shut down the
        exporter.
        """
        # signal the worker thread to finish and then wait for it
        self.done = True
        with self.condition:
            self.condition.notify_all()
        self.worker_thread.join()
        self.span_exporter.shutdown()
201+
202+
81203
class ConsoleSpanExporter(SpanExporter):
82204
"""Implementation of :class:`SpanExporter` that prints spans to the
83205
console.

opentelemetry-sdk/tests/trace/export/test_export.py

Lines changed: 159 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,34 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import time
1516
import unittest
17+
from unittest import mock
1618

19+
from opentelemetry import trace as trace_api
1720
from opentelemetry.sdk import trace
1821
from opentelemetry.sdk.trace import export
1922

2023

21-
class TestSimpleExportSpanProcessor(unittest.TestCase):
22-
def test_simple_span_processor(self):
23-
class MySpanExporter(export.SpanExporter):
24-
def __init__(self, destination):
25-
self.destination = destination
24+
class MySpanExporter(export.SpanExporter):
    """Minimal span exporter for tests.

    The name of every exported span is appended to ``destination``. When
    ``max_export_batch_size`` is given, receiving a bigger batch raises
    ``ValueError``.
    """

    def __init__(self, destination, max_export_batch_size=None):
        self.max_export_batch_size = max_export_batch_size
        self.destination = destination

    def export(self, spans: trace.Span) -> export.SpanExportResult:
        limit = self.max_export_batch_size
        if limit is not None and len(spans) > limit:
            raise ValueError("Batch is too big")
        for span in spans:
            self.destination.append(span.name)
        return export.SpanExportResult.SUCCESS
3039

40+
41+
class TestSimpleExportSpanProcessor(unittest.TestCase):
42+
def test_simple_span_processor(self):
3143
tracer = trace.Tracer()
3244

3345
spans_names_list = []
@@ -42,3 +54,142 @@ def export(self, spans: trace.Span) -> export.SpanExportResult:
4254
pass
4355

4456
self.assertListEqual(["xxx", "bar", "foo"], spans_names_list)
57+
58+
59+
def _create_start_and_end_span(name, span_processor):
    """Create a span attached to ``span_processor``, then start and end it."""
    context = mock.Mock(spec=trace_api.SpanContext)
    span = trace.Span(name, context, span_processor=span_processor)
    span.start()
    span.end()
67+
68+
69+
class TestBatchExportSpanProcessor(unittest.TestCase):
    """Tests for ``export.BatchExportSpanProcessor``."""

    def test_batch_span_processor(self):
        """Test that spans are exported in the order in which they ended"""
        exported_names = []

        exporter = MySpanExporter(destination=exported_names)
        processor = export.BatchExportSpanProcessor(exporter)

        expected_names = ["xxx", "bar", "foo"]
        for span_name in expected_names:
            _create_start_and_end_span(span_name, processor)

        processor.shutdown()
        self.assertListEqual(expected_names, exported_names)

    def test_batch_span_processor_lossless(self):
        """Test that no spans are lost when sending max_queue_size spans"""
        exported_names = []

        exporter = MySpanExporter(
            destination=exported_names, max_export_batch_size=128
        )
        processor = export.BatchExportSpanProcessor(
            exporter, max_queue_size=512, max_export_batch_size=128
        )

        for _span_index in range(512):
            _create_start_and_end_span("foo", processor)

        processor.shutdown()
        self.assertEqual(len(exported_names), 512)

    def test_batch_span_processor_many_spans(self):
        """Test that no spans are lost when sending many spans"""
        exported_names = []

        exporter = MySpanExporter(
            destination=exported_names, max_export_batch_size=128
        )
        processor = export.BatchExportSpanProcessor(
            exporter,
            max_queue_size=256,
            max_export_batch_size=64,
            schedule_delay_millis=100,
        )

        for _round in range(4):
            for _span_index in range(256):
                _create_start_and_end_span("foo", processor)

            # give some time for the exporter to upload spans
            time.sleep(0.05)

        processor.shutdown()
        self.assertEqual(len(exported_names), 1024)

    def test_batch_span_processor_scheduled_delay(self):
        """Test that spans are exported each schedule_delay_millis"""
        exported_names = []

        exporter = MySpanExporter(destination=exported_names)
        processor = export.BatchExportSpanProcessor(
            exporter, schedule_delay_millis=50
        )

        # a single span never fills a batch, so only the timer exports it
        _create_start_and_end_span("foo", processor)

        time.sleep(0.05 + 0.02)
        # span should be already exported
        self.assertEqual(len(exported_names), 1)

        processor.shutdown()

    def test_batch_span_processor_parameters(self):
        """Test that invalid constructor arguments raise ValueError"""
        invalid_configurations = (
            # zero max_queue_size
            {"max_queue_size": 0},
            # negative max_queue_size
            {"max_queue_size": -500},
            # zero schedule_delay_millis
            {"schedule_delay_millis": 0},
            # negative schedule_delay_millis
            {"schedule_delay_millis": -500},
            # zero max_export_batch_size
            {"max_export_batch_size": 0},
            # negative max_export_batch_size
            {"max_export_batch_size": -500},
            # max_export_batch_size > max_queue_size
            {"max_queue_size": 256, "max_export_batch_size": 512},
        )

        for kwargs in invalid_configurations:
            with self.assertRaises(ValueError):
                export.BatchExportSpanProcessor(None, **kwargs)

0 commit comments

Comments
 (0)