Skip to content

Commit 23ab7fd

Browse files
lzchenAlex Boten
authored and
Alex Boten
committed
Add start_pipeline to MeterProvider in SDK, atexit moved to MeterProvider (open-telemetry#791)
1 parent 240c0ef commit 23ab7fd

File tree

12 files changed

+111
-45
lines changed

12 files changed

+111
-45
lines changed

docs/examples/basic_meter/basic_metrics.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,6 @@
2626
from opentelemetry import metrics
2727
from opentelemetry.sdk.metrics import Counter, MeterProvider, ValueRecorder
2828
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
29-
from opentelemetry.sdk.metrics.export.controller import PushController
30-
31-
stateful = True
3229

3330
print(
3431
"Starting example, values will be printed to the console every 5 seconds."
@@ -37,17 +34,21 @@
3734
# Stateful determines whether how metrics are collected: if true, metrics
3835
# accumulate over the process lifetime. If false, metrics are reset at the
3936
# beginning of each collection interval.
40-
metrics.set_meter_provider(MeterProvider(stateful))
37+
stateful = True
38+
39+
# Sets the global MeterProvider instance
40+
metrics.set_meter_provider(MeterProvider())
41+
4142
# The Meter is responsible for creating and recording metrics. Each meter has a
4243
# unique name, which we set as the module's name here.
4344
meter = metrics.get_meter(__name__)
4445

4546
# Exporter to export metrics to the console
4647
exporter = ConsoleMetricsExporter()
4748

48-
# A PushController collects metrics created from meter and exports it via the
49-
# exporter every interval
50-
controller = PushController(meter=meter, exporter=exporter, interval=5)
49+
# start_pipeline will notify the MeterProvider to begin collecting/exporting
50+
# metrics with the given meter, exporter and interval in seconds
51+
metrics.get_meter_provider().start_pipeline(meter, exporter, 5)
5152

5253
# Metric instruments allow to capture measurements
5354
requests_counter = meter.create_metric(
@@ -77,7 +78,7 @@
7778
# Update the metric instruments using the direct calling convention
7879
requests_counter.add(25, staging_labels)
7980
requests_size.record(100, staging_labels)
80-
time.sleep(5)
81+
time.sleep(10)
8182

8283
requests_counter.add(50, staging_labels)
8384
requests_size.record(5000, staging_labels)

docs/examples/basic_meter/calling_conventions.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,11 @@
2121
from opentelemetry import metrics
2222
from opentelemetry.sdk.metrics import Counter, MeterProvider, ValueRecorder
2323
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
24-
from opentelemetry.sdk.metrics.export.controller import PushController
2524

2625
# Use the meter type provided by the SDK package
2726
metrics.set_meter_provider(MeterProvider())
2827
meter = metrics.get_meter(__name__)
29-
exporter = ConsoleMetricsExporter()
30-
controller = PushController(meter=meter, exporter=exporter, interval=5)
28+
metrics.get_meter_provider().start_pipeline(meter, ConsoleMetricsExporter(), 5)
3129

3230
requests_counter = meter.create_metric(
3331
name="requests",
@@ -62,7 +60,7 @@
6260
# You can record metrics directly using the metric instrument. You pass in
6361
# labels that you would like to record for.
6462
requests_counter.add(25, labels)
65-
time.sleep(5)
63+
time.sleep(10)
6664

6765
print("Updating using a bound instrument...")
6866
# You can record metrics with bound metric instruments. Bound metric

docs/examples/basic_meter/observer.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,10 @@
2121
from opentelemetry import metrics
2222
from opentelemetry.sdk.metrics import MeterProvider, ValueObserver
2323
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
24-
from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher
25-
from opentelemetry.sdk.metrics.export.controller import PushController
2624

2725
metrics.set_meter_provider(MeterProvider())
2826
meter = metrics.get_meter(__name__)
29-
exporter = ConsoleMetricsExporter()
30-
controller = PushController(meter=meter, exporter=exporter, interval=2)
27+
metrics.get_meter_provider().start_pipeline(meter, ConsoleMetricsExporter(), 5)
3128

3229

3330
# Callback to gather cpu usage

docs/examples/cloud_monitoring/basic_metrics.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,11 @@
2020
CloudMonitoringMetricsExporter,
2121
)
2222
from opentelemetry.sdk.metrics import Counter, MeterProvider
23-
from opentelemetry.sdk.metrics.export.controller import PushController
2423

25-
meter = metrics.get_meter(__name__, True)
26-
27-
# Gather and export metrics every 5 seconds
28-
controller = PushController(
29-
meter=meter, exporter=CloudMonitoringMetricsExporter(), interval=5
24+
metrics.set_meter_provider(MeterProvider())
25+
meter = metrics.get_meter(__name__)
26+
metrics.get_meter_provider().start_pipeline(
27+
meter, CloudMonitoringMetricsExporter(), 5
3028
)
3129

3230
requests_counter = meter.create_metric(

docs/examples/opencensus-exporter-metrics/collector.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,14 @@
2121
OpenCensusMetricsExporter,
2222
)
2323
from opentelemetry.sdk.metrics import Counter, MeterProvider
24-
from opentelemetry.sdk.metrics.export.controller import PushController
2524

2625
exporter = OpenCensusMetricsExporter(
2726
service_name="basic-service", endpoint="localhost:55678"
2827
)
2928

3029
metrics.set_meter_provider(MeterProvider())
3130
meter = metrics.get_meter(__name__)
32-
controller = PushController(meter, exporter, 5)
31+
metrics.get_meter_provider().start_pipeline(meter, exporter, 5)
3332

3433
requests_counter = meter.create_metric(
3534
name="requests",

ext/opentelemetry-ext-prometheus/src/opentelemetry/ext/prometheus/__init__.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,19 @@
2929
from opentelemetry import metrics
3030
from opentelemetry.ext.prometheus import PrometheusMetricsExporter
3131
from opentelemetry.sdk.metrics import Counter, Meter
32-
from opentelemetry.sdk.metrics.export.controller import PushController
3332
from prometheus_client import start_http_server
3433
3534
# Start Prometheus client
3635
start_http_server(port=8000, addr="localhost")
3736
3837
# Meter is responsible for creating and recording metrics
3938
metrics.set_meter_provider(MeterProvider())
40-
meter = metrics.meter()
39+
meter = metrics.get_meter(__name__)
4140
# exporter to export metrics to Prometheus
4241
prefix = "MyAppPrefix"
4342
exporter = PrometheusMetricsExporter(prefix)
44-
# controller collects metrics created from meter and exports it via the
45-
# exporter every interval
46-
controller = PushController(meter, exporter, 5)
43+
# Starts the collect/export pipeline for metrics
44+
metrics.get_meter_provider().start_pipeline(meter, exporter, 5)
4745
4846
counter = meter.create_metric(
4947
"requests",

ext/opentelemetry-ext-system-metrics/src/opentelemetry/ext/system_metrics/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,12 @@
2828
2929
.. code:: python
3030
31+
from opentelemetry import metrics
3132
from opentelemetry.ext.system_metrics import SystemMetrics
33+
from opentelemetry.sdk.metrics import MeterProvider,
3234
from opentelemetry.sdk.metrics.export import ConsoleMetricsExporter
3335
36+
metrics.set_meter_provider(MeterProvider())
3437
exporter = ConsoleMetricsExporter()
3538
SystemMetrics(exporter)
3639

opentelemetry-sdk/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
([#764](https://github.com/open-telemetry/opentelemetry-python/pull/764))
1515
- Add SumObserver, UpDownSumObserver and LastValueAggregator in metrics
1616
([#789](https://github.com/open-telemetry/opentelemetry-python/pull/789))
17+
- Add start_pipeline to MeterProvider
18+
([#791](https://github.com/open-telemetry/opentelemetry-python/pull/791))
1719

1820
## 0.8b0
1921

opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,19 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import atexit
1516
import logging
1617
import threading
1718
from typing import Dict, Sequence, Tuple, Type
1819

1920
from opentelemetry import metrics as metrics_api
21+
from opentelemetry.sdk.metrics.export import (
22+
ConsoleMetricsExporter,
23+
MetricsExporter,
24+
)
2025
from opentelemetry.sdk.metrics.export.aggregate import Aggregator
2126
from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher
27+
from opentelemetry.sdk.metrics.export.controller import PushController
2228
from opentelemetry.sdk.resources import Resource
2329
from opentelemetry.sdk.util.instrumentation import InstrumentationInfo
2430

@@ -449,24 +455,64 @@ class MeterProvider(metrics_api.MeterProvider):
449455
Args:
450456
stateful: Indicates whether meters created are going to be stateful
451457
resource: Resource for this MeterProvider
458+
shutdown_on_exit: Register an atexit hook to shut down when the
459+
application exists
452460
"""
453461

454462
def __init__(
455-
self, stateful=True, resource: Resource = Resource.create_empty(),
463+
self,
464+
stateful=True,
465+
resource: Resource = Resource.create_empty(),
466+
shutdown_on_exit: bool = True,
456467
):
457468
self.stateful = stateful
458469
self.resource = resource
470+
self._controllers = []
471+
self._exporters = set()
472+
self._atexit_handler = None
473+
if shutdown_on_exit:
474+
self._atexit_handler = atexit.register(self.shutdown)
459475

460476
def get_meter(
461477
self,
462478
instrumenting_module_name: str,
463479
instrumenting_library_version: str = "",
464480
) -> "metrics_api.Meter":
481+
"""See `opentelemetry.metrics.MeterProvider`.get_meter."""
465482
if not instrumenting_module_name: # Reject empty strings too.
466-
raise ValueError("get_meter called with missing module name.")
483+
instrumenting_module_name = "ERROR:MISSING MODULE NAME"
484+
logger.error("get_meter called with missing module name.")
467485
return Meter(
468486
self,
469487
InstrumentationInfo(
470-
instrumenting_module_name, instrumenting_library_version
488+
instrumenting_module_name, instrumenting_library_version,
471489
),
472490
)
491+
492+
def start_pipeline(
493+
self,
494+
meter: metrics_api.Meter,
495+
exporter: MetricsExporter = None,
496+
interval: float = 15.0,
497+
) -> None:
498+
"""Method to begin the collect/export pipeline.
499+
500+
Args:
501+
meter: The meter to collect metrics from.
502+
exporter: The exporter to export metrics to.
503+
interval: The collect/export interval in seconds.
504+
"""
505+
if not exporter:
506+
exporter = ConsoleMetricsExporter()
507+
self._exporters.add(exporter)
508+
# TODO: Controller type configurable?
509+
self._controllers.append(PushController(meter, exporter, interval))
510+
511+
def shutdown(self) -> None:
512+
for controller in self._controllers:
513+
controller.shutdown()
514+
for exporter in self._exporters:
515+
exporter.shutdown()
516+
if self._atexit_handler is not None:
517+
atexit.unregister(self._atexit_handler)
518+
self._atexit_handler = None

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/controller.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,30 +12,35 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import atexit
1615
import threading
1716

1817
from opentelemetry.context import attach, detach, set_value
18+
from opentelemetry.metrics import Meter
19+
from opentelemetry.sdk.metrics.export import MetricsExporter
1920

2021

2122
class PushController(threading.Thread):
22-
"""A push based controller, used for exporting.
23+
"""A push based controller, used for collecting and exporting.
2324
2425
Uses a worker thread that periodically collects metrics for exporting,
2526
exports them and performs some post-processing.
27+
28+
Args:
29+
meter: The meter used to collect metrics.
30+
exporter: The exporter used to export metrics.
31+
interval: The collect/export interval in seconds.
2632
"""
2733

2834
daemon = True
2935

30-
def __init__(self, meter, exporter, interval, shutdown_on_exit=True):
36+
def __init__(
37+
self, meter: Meter, exporter: MetricsExporter, interval: float
38+
):
3139
super().__init__()
3240
self.meter = meter
3341
self.exporter = exporter
3442
self.interval = interval
3543
self.finished = threading.Event()
36-
self._atexit_handler = None
37-
if shutdown_on_exit:
38-
self._atexit_handler = atexit.register(self.shutdown)
3944
self.start()
4045

4146
def run(self):
@@ -46,17 +51,13 @@ def shutdown(self):
4651
self.finished.set()
4752
# Run one more collection pass to flush metrics batched in the meter
4853
self.tick()
49-
self.exporter.shutdown()
50-
if self._atexit_handler is not None:
51-
atexit.unregister(self._atexit_handler)
52-
self._atexit_handler = None
5354

5455
def tick(self):
5556
# Collect all of the meter's metrics to be exported
5657
self.meter.collect()
58+
# Export the collected metrics
5759
token = attach(set_value("suppress_instrumentation", True))
58-
# Export the given metrics in the batcher
5960
self.exporter.export(self.meter.batcher.checkpoint_set())
6061
detach(token)
61-
# Perform post-exporting logic based on batcher configuration
62+
# Perform post-exporting logic
6263
self.meter.batcher.finished_collection()

opentelemetry-sdk/tests/metrics/export/test_export.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,6 @@ def test_push_controller(self):
614614

615615
controller.shutdown()
616616
self.assertTrue(controller.finished.isSet())
617-
exporter.shutdown.assert_any_call()
618617

619618
# shutdown should flush the meter
620619
self.assertEqual(meter.collect.call_count, 1)

opentelemetry-sdk/tests/metrics/test_metrics.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,30 @@ def test_resource_empty(self):
3838
# pylint: disable=protected-access
3939
self.assertIs(meter.resource, resources._EMPTY_RESOURCE)
4040

41+
def test_start_pipeline(self):
42+
exporter = mock.Mock()
43+
meter_provider = metrics.MeterProvider()
44+
meter = meter_provider.get_meter(__name__)
45+
# pylint: disable=protected-access
46+
meter_provider.start_pipeline(meter, exporter, 6)
47+
try:
48+
self.assertEqual(len(meter_provider._exporters), 1)
49+
self.assertEqual(len(meter_provider._controllers), 1)
50+
finally:
51+
meter_provider.shutdown()
52+
53+
def test_shutdown(self):
54+
controller = mock.Mock()
55+
exporter = mock.Mock()
56+
meter_provider = metrics.MeterProvider()
57+
# pylint: disable=protected-access
58+
meter_provider._controllers = [controller]
59+
meter_provider._exporters = [exporter]
60+
meter_provider.shutdown()
61+
self.assertEqual(controller.shutdown.call_count, 1)
62+
self.assertEqual(exporter.shutdown.call_count, 1)
63+
self.assertIsNone(meter_provider._atexit_handler)
64+
4165

4266
class TestMeter(unittest.TestCase):
4367
def test_extends_api(self):

0 commit comments

Comments
 (0)