Skip to content

Commit efaf724

Browse files
committed
Use futures.wait when waiting for parallel flush to finish
* instead of waiting on every future and recalculating the timeout, use the existing futures.wait method. * consolidate common multi span processor tests into a separate base class
1 parent 0336f3e commit efaf724

File tree

2 files changed

+69
-81
lines changed

2 files changed

+69
-81
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -223,17 +223,19 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
223223
for sp in self._span_processors: # type: SpanProcessor
224224
future = self._executor.submit(sp.force_flush, timeout_millis)
225225
futures.append(future)
226-
deadline_ns = time_ns() + timeout_millis * 1000000
227-
flushed_in_time = True
228-
for future in futures:
229-
try:
230-
timeout_sec = (deadline_ns - time_ns()) / 1e9
231-
flushed_in_time = (
232-
future.result(timeout_sec) and flushed_in_time
233-
)
234-
except concurrent.futures.TimeoutError:
235-
flushed_in_time = False
236-
return flushed_in_time
226+
227+
timeout_sec = timeout_millis / 1e3
228+
done_futures, not_done_futures = concurrent.futures.wait(
229+
futures, timeout_sec
230+
)
231+
if not_done_futures:
232+
return False
233+
234+
for future in done_futures:
235+
if not future.result():
236+
return False
237+
238+
return True
237239

238240

239241
class EventBase(abc.ABC):
@@ -817,9 +819,7 @@ def __init__(
817819
] = None,
818820
):
819821
self._active_span_processor = (
820-
SynchronousMultiSpanProcessor()
821-
if active_span_processor is None
822-
else active_span_processor
822+
active_span_processor or SynchronousMultiSpanProcessor()
823823
)
824824
self.resource = resource
825825
self.sampler = sampler

opentelemetry-sdk/tests/trace/test_span_processor.py

Lines changed: 55 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,14 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import abc
16+
import time
17+
import typing
1518
import unittest
1619
from threading import Event
1720
from unittest import mock
1821

22+
from opentelemetry import trace as trace_api
1923
from opentelemetry.sdk import trace
2024

2125

@@ -134,33 +138,50 @@ def test_add_span_processor_after_span_creation(self):
134138
self.assertListEqual(spans_calls_list, expected_list)
135139

136140

137-
class TestSynchronousMultiSpanProcessor(unittest.TestCase):
141+
class MultiSpanProcessorTestBase(abc.ABC):
142+
@abc.abstractmethod
143+
def create_multi_span_processor(
144+
self,
145+
) -> typing.Union[
146+
trace.SynchronousMultiSpanProcessor, trace.ConcurrentMultiSpanProcessor
147+
]:
148+
pass
149+
150+
@staticmethod
151+
def create_default_span() -> trace_api.Span:
152+
span_context = trace_api.SpanContext(37, 73, is_remote=False)
153+
return trace_api.DefaultSpan(span_context)
154+
138155
def test_on_start(self):
139-
multi_processor = trace.SynchronousMultiSpanProcessor()
156+
multi_processor = self.create_multi_span_processor()
140157

141158
mocks = [mock.Mock(spec=trace.SpanProcessor) for _ in range(0, 5)]
142159
for mock_processor in mocks:
143160
multi_processor.add_span_processor(mock_processor)
144161

145-
multi_processor.on_start(mock.Mock(spec=trace.Span))
162+
span = self.create_default_span()
163+
multi_processor.on_start(span)
146164

147165
for mock_processor in mocks:
148-
self.assertEqual(1, mock_processor.on_start.call_count)
166+
mock_processor.on_start.assert_called_once_with(span)
167+
multi_processor.shutdown()
149168

150169
def test_on_end(self):
151-
multi_processor = trace.SynchronousMultiSpanProcessor()
170+
multi_processor = self.create_multi_span_processor()
152171

153172
mocks = [mock.Mock(spec=trace.SpanProcessor) for _ in range(0, 5)]
154173
for mock_processor in mocks:
155174
multi_processor.add_span_processor(mock_processor)
156175

157-
multi_processor.on_end(mock.Mock(spec=trace.Span))
176+
span = self.create_default_span()
177+
multi_processor.on_end(span)
158178

159179
for mock_processor in mocks:
160-
self.assertEqual(1, mock_processor.on_end.call_count)
180+
mock_processor.on_end.assert_called_once_with(span)
181+
multi_processor.shutdown()
161182

162183
def test_on_shutdown(self):
163-
multi_processor = trace.SynchronousMultiSpanProcessor()
184+
multi_processor = self.create_multi_span_processor()
164185

165186
mocks = [mock.Mock(spec=trace.SpanProcessor) for _ in range(0, 5)]
166187
for mock_processor in mocks:
@@ -169,10 +190,10 @@ def test_on_shutdown(self):
169190
multi_processor.shutdown()
170191

171192
for mock_processor in mocks:
172-
self.assertEqual(1, mock_processor.shutdown.call_count)
193+
mock_processor.shutdown.assert_called_once_with()
173194

174195
def test_force_flush(self):
175-
multi_processor = trace.SynchronousMultiSpanProcessor()
196+
multi_processor = self.create_multi_span_processor()
176197

177198
mocks = [mock.Mock(spec=trace.SpanProcessor) for _ in range(0, 5)]
178199
for mock_processor in mocks:
@@ -181,22 +202,35 @@ def test_force_flush(self):
181202

182203
flushed = multi_processor.force_flush(timeout_millis)
183204

205+
# pylint: disable=no-member
184206
self.assertTrue(flushed)
185207
for mock_processor in mocks:
208+
# pylint: disable=no-member
186209
self.assertEqual(1, mock_processor.force_flush.call_count)
210+
multi_processor.shutdown()
211+
212+
213+
class TestSynchronousMultiSpanProcessor(
214+
MultiSpanProcessorTestBase, unittest.TestCase
215+
):
216+
def create_multi_span_processor(
217+
self,
218+
) -> trace.SynchronousMultiSpanProcessor:
219+
return trace.SynchronousMultiSpanProcessor()
187220

188221
def test_force_flush_late_by_timeout(self):
189222
multi_processor = trace.SynchronousMultiSpanProcessor()
190223

224+
def delayed_flush(_):
225+
time.sleep(0.055)
226+
191227
mock_processor1 = mock.Mock(spec=trace.SpanProcessor)
228+
mock_processor1.force_flush = mock.Mock(side_effect=delayed_flush)
192229
multi_processor.add_span_processor(mock_processor1)
193230
mock_processor2 = mock.Mock(spec=trace.SpanProcessor)
194231
multi_processor.add_span_processor(mock_processor2)
195232

196-
with (
197-
mock.patch.object(trace, "time_ns", side_effect=(0, 0, 100000000))
198-
):
199-
flushed = multi_processor.force_flush(50)
233+
flushed = multi_processor.force_flush(50)
200234

201235
self.assertFalse(flushed)
202236
self.assertEqual(1, mock_processor1.force_flush.call_count)
@@ -217,59 +251,13 @@ def test_force_flush_late_by_span_processor(self):
217251
self.assertEqual(0, mock_processor2.force_flush.call_count)
218252

219253

220-
class TestConcurrentMultiSpanProcessor(unittest.TestCase):
221-
def test_on_start(self):
222-
multi_processor = trace.ConcurrentMultiSpanProcessor(3)
223-
224-
mocks = [mock.Mock(spec=trace.SpanProcessor) for _ in range(0, 5)]
225-
for mock_processor in mocks:
226-
multi_processor.add_span_processor(mock_processor)
227-
228-
multi_processor.on_start(mock.Mock(spec=trace.Span))
229-
230-
for mock_processor in mocks:
231-
self.assertEqual(1, mock_processor.on_start.call_count)
232-
multi_processor.shutdown()
233-
234-
def test_on_end(self):
235-
multi_processor = trace.ConcurrentMultiSpanProcessor(3)
236-
237-
mocks = [mock.Mock(spec=trace.SpanProcessor) for _ in range(0, 5)]
238-
for mock_processor in mocks:
239-
multi_processor.add_span_processor(mock_processor)
240-
mock_span = mock.Mock(spec=trace.Span)
241-
242-
multi_processor.on_end(mock_span)
243-
244-
for mock_processor in mocks:
245-
self.assertEqual(1, mock_processor.on_end.call_count)
246-
multi_processor.shutdown()
247-
248-
def test_on_shutdown(self):
249-
multi_processor = trace.ConcurrentMultiSpanProcessor(3)
250-
251-
mocks = [mock.Mock(spec=trace.SpanProcessor) for _ in range(0, 5)]
252-
for mock_processor in mocks:
253-
multi_processor.add_span_processor(mock_processor)
254-
255-
multi_processor.shutdown()
256-
257-
for mock_processor in mocks:
258-
self.assertEqual(1, mock_processor.shutdown.call_count)
259-
260-
def test_force_flush(self):
261-
multi_processor = trace.ConcurrentMultiSpanProcessor(3)
262-
263-
mocks = [mock.Mock(spec=trace.SpanProcessor) for _ in range(0, 5)]
264-
for mock_processor in mocks:
265-
multi_processor.add_span_processor(mock_processor)
266-
267-
flushed = multi_processor.force_flush(100)
268-
269-
self.assertTrue(flushed)
270-
for mock_processor in mocks:
271-
self.assertEqual(1, mock_processor.force_flush.call_count)
272-
multi_processor.shutdown()
254+
class TestConcurrentMultiSpanProcessor(
255+
MultiSpanProcessorTestBase, unittest.TestCase
256+
):
257+
def create_multi_span_processor(
258+
self,
259+
) -> trace.ConcurrentMultiSpanProcessor:
260+
return trace.ConcurrentMultiSpanProcessor(3)
273261

274262
def test_force_flush_late_by_timeout(self):
275263
multi_processor = trace.ConcurrentMultiSpanProcessor(5)

0 commit comments

Comments
 (0)