Skip to content

Commit ad3e239

Browse files
authored
Update PeriodicExportingMetricReader to never call export() concurrently (#2873)
1 parent b26c5e8 commit ad3e239

File tree

3 files changed

+122
-2
lines changed

3 files changed

+122
-2
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
- Update PeriodicExportingMetricReader to never call export() concurrently
11+
([#2873](https://github.com/open-telemetry/opentelemetry-python/pull/2873))
12+
1013
## [1.12.0-0.33b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0-0.33b0) - 2022-08-08
1114

1215
- Add `force_flush` method to metrics exporter

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from logging import getLogger
1919
from os import environ, linesep
2020
from sys import stdout
21-
from threading import Event, RLock, Thread
21+
from threading import Event, Lock, RLock, Thread
2222
from typing import IO, Callable, Dict, Iterable, Optional
2323

2424
from typing_extensions import final
@@ -404,6 +404,9 @@ class PeriodicExportingMetricReader(MetricReader):
404404
"""`PeriodicExportingMetricReader` is an implementation of `MetricReader`
405405
that collects metrics based on a user-configurable time interval, and passes the
406406
metrics to the configured exporter.
407+
408+
The configured exporter's :py:meth:`~MetricExporter.export` method will not be called
409+
concurrently.
407410
"""
408411

409412
def __init__(
@@ -417,6 +420,12 @@ def __init__(
417420
preferred_temporality=exporter._preferred_temporality,
418421
preferred_aggregation=exporter._preferred_aggregation,
419422
)
423+
424+
# This lock is held whenever calling self._exporter.export() to prevent concurrent
425+
# execution of MetricExporter.export()
426+
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exportbatch
427+
self._export_lock = Lock()
428+
420429
self._exporter = exporter
421430
if export_interval_millis is None:
422431
try:
@@ -479,7 +488,10 @@ def _receive_metrics(
479488
return
480489
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
481490
try:
482-
self._exporter.export(metrics_data, timeout_millis=timeout_millis)
491+
with self._export_lock:
492+
self._exporter.export(
493+
metrics_data, timeout_millis=timeout_millis
494+
)
483495
except Exception as e: # pylint: disable=broad-except,invalid-name
484496
_logger.exception("Exception while exporting metrics %s", str(e))
485497
detach(token)
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import time
16+
from threading import Lock
17+
18+
from opentelemetry.metrics import CallbackOptions, Observation
19+
from opentelemetry.sdk.metrics import MeterProvider
20+
from opentelemetry.sdk.metrics.export import (
21+
MetricExporter,
22+
MetricExportResult,
23+
MetricsData,
24+
PeriodicExportingMetricReader,
25+
)
26+
from opentelemetry.test.concurrency_test import ConcurrencyTestBase
27+
28+
29+
class MaxCountExporter(MetricExporter):
30+
def __init__(self) -> None:
31+
super().__init__(None, None)
32+
self._lock = Lock()
33+
34+
# the number of threads inside of export()
35+
self.count_in_export = 0
36+
37+
# the total count of calls to export()
38+
self.export_count = 0
39+
40+
# the maximum number of threads in export() ever
41+
self.max_count_in_export = 0
42+
43+
def export(
44+
self,
45+
metrics_data: MetricsData,
46+
timeout_millis: float = 10_000,
47+
**kwargs,
48+
) -> MetricExportResult:
49+
with self._lock:
50+
self.export_count += 1
51+
self.count_in_export += 1
52+
53+
# yield to other threads
54+
time.sleep(0)
55+
56+
with self._lock:
57+
self.max_count_in_export = max(
58+
self.max_count_in_export, self.count_in_export
59+
)
60+
self.count_in_export -= 1
61+
62+
def force_flush(self, timeout_millis: float = 10_000) -> bool:
63+
return True
64+
65+
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
66+
pass
67+
68+
69+
class TestExporterConcurrency(ConcurrencyTestBase):
70+
"""
71+
Tests the requirement that:
72+
73+
> `Export` will never be called concurrently for the same exporter instance. `Export` can
74+
> be called again only after the current call returns.
75+
76+
https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exportbatch
77+
"""
78+
79+
def test_exporter_not_called_concurrently(self):
80+
exporter = MaxCountExporter()
81+
reader = PeriodicExportingMetricReader(
82+
exporter=exporter,
83+
export_interval_millis=100_000,
84+
)
85+
meter_provider = MeterProvider(metric_readers=[reader])
86+
87+
def counter_cb(options: CallbackOptions):
88+
yield Observation(2)
89+
90+
meter_provider.get_meter(__name__).create_observable_counter(
91+
"testcounter", callbacks=[counter_cb]
92+
)
93+
94+
# call collect from a bunch of threads to try and enter export() concurrently
95+
def test_many_threads():
96+
reader.collect()
97+
98+
self.run_with_many_threads(test_many_threads, num_threads=100)
99+
100+
# no thread should be in export() now
101+
self.assertEqual(exporter.count_in_export, 0)
102+
# should be one call for each thread
103+
self.assertEqual(exporter.export_count, 100)
104+
# should never have been more than one concurrent call
105+
self.assertEqual(exporter.max_count_in_export, 1)

0 commit comments

Comments
 (0)