Skip to content

Commit 97b7d6d

Browse files
authored
Merge pull request #5 from aabmass/callbacks_and_generators
Allow creating async instruments with either a callback function or generator
2 parents 2f3f10e + 9ac4650 commit 97b7d6d

File tree

4 files changed

+403
-39
lines changed

4 files changed

+403
-39
lines changed

opentelemetry-api/src/opentelemetry/metrics/__init__.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,85 @@ def create_up_down_counter(
127127
def create_observable_counter(
128128
self, name, callback, unit="", description=""
129129
) -> ObservableCounter:
130+
"""Creates an observable counter instrument
131+
132+
An observable counter observes a monotonically increasing count by
133+
calling a provided callback which returns multiple
134+
:class:`~opentelemetry.metrics.measurement.Measurement`.
135+
136+
For example, an observable counter could be used to report system CPU
137+
time periodically. Here is a basic implementation::
138+
139+
def cpu_time_callback() -> Iterable[Measurement]:
140+
measurements = []
141+
with open("/proc/stat") as procstat:
142+
procstat.readline() # skip the first line
143+
for line in procstat:
144+
if not line.startswith("cpu"): break
145+
cpu, *states = line.split()
146+
measurements.append(Measurement(int(states[0]) // 100, {"cpu": cpu, "state": "user"}))
147+
measurements.append(Measurement(int(states[1]) // 100, {"cpu": cpu, "state": "nice"}))
148+
measurements.append(Measurement(int(states[2]) // 100, {"cpu": cpu, "state": "system"}))
149+
# ... other states
150+
return measurements
151+
152+
meter.create_observable_counter(
153+
"system.cpu.time",
154+
callback=cpu_time_callback,
155+
unit="s",
156+
description="CPU time"
157+
)
158+
159+
To reduce memory usage, you can use generator callbacks instead of
160+
building the full list::
161+
162+
def cpu_time_callback() -> Iterable[Measurement]:
163+
with open("/proc/stat") as procstat:
164+
procstat.readline() # skip the first line
165+
for line in procstat:
166+
if not line.startswith("cpu"): break
167+
cpu, *states = line.split()
168+
yield Measurement(int(states[0]) // 100, {"cpu": cpu, "state": "user"})
169+
yield Measurement(int(states[1]) // 100, {"cpu": cpu, "state": "nice"})
170+
# ... other states
171+
172+
Alternatively, you can pass a generator directly instead of a callback,
173+
which should return iterables of
174+
:class:`~opentelemetry.metrics.measurement.Measurement`::
175+
176+
def cpu_time_callback(states_to_include: set[str]) -> Iterable[Iterable[Measurement]]:
177+
while True:
178+
measurements = []
179+
with open("/proc/stat") as procstat:
180+
procstat.readline() # skip the first line
181+
for line in procstat:
182+
if not line.startswith("cpu"): break
183+
cpu, *states = line.split()
184+
if "user" in states_to_include:
185+
measurements.append(Measurement(int(states[0]) // 100, {"cpu": cpu, "state": "user"}))
186+
if "nice" in states_to_include:
187+
measurements.append(Measurement(int(states[1]) // 100, {"cpu": cpu, "state": "nice"}))
188+
# ... other states
189+
yield measurements
190+
191+
meter.create_observable_counter(
192+
"system.cpu.time",
193+
callback=cpu_time_callback({"user", "system"}),
194+
unit="s",
195+
description="CPU time"
196+
)
197+
198+
Args:
199+
name: The name of the instrument to be created
200+
callback: A callback that returns an iterable of
201+
:class:`~opentelemetry.metrics.measurement.Measurement`.
202+
Alternatively, can be a generator that yields iterables of
203+
:class:`~opentelemetry.metrics.measurement.Measurement`.
204+
unit: The unit for measurements this instrument reports. For
205+
example, ``By`` for bytes. UCUM units are recommended.
206+
description: A description for this instrument and what it measures.
207+
"""
208+
130209
self._secure_instrument_name(name)
131210

132211
@abstractmethod

opentelemetry-api/src/opentelemetry/metrics/instrument.py

Lines changed: 64 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,19 @@
1616

1717
# type: ignore
1818

19-
2019
from abc import ABC, abstractmethod
20+
from collections import abc as collections_abc
2121
from logging import getLogger
2222
from re import compile as compile_
23-
from types import GeneratorType
23+
from typing import Callable, Generator, Iterable, Union
2424

2525
from opentelemetry.metrics.measurement import Measurement
2626

27+
_TInstrumentCallback = Callable[[], Iterable[Measurement]]
28+
_TInstrumentCallbackGenerator = Generator[Iterable[Measurement], None, None]
29+
TCallback = Union[_TInstrumentCallback, _TInstrumentCallbackGenerator]
30+
31+
2732
_logger = getLogger(__name__)
2833

2934

@@ -75,30 +80,65 @@ class Synchronous(Instrument):
7580
class Asynchronous(Instrument):
7681
@abstractmethod
7782
def __init__(
78-
self, name, callback, *args, unit="", description="", **kwargs
83+
self,
84+
name,
85+
callback: TCallback,
86+
*args,
87+
unit="",
88+
description="",
89+
**kwargs
7990
):
80-
super().__init__(name, *args, unit=unit, description="", **kwargs)
81-
82-
if not isinstance(callback, GeneratorType):
83-
_logger.error("callback must be a generator")
91+
super().__init__(
92+
name, *args, unit=unit, description=description, **kwargs
93+
)
8494

85-
else:
86-
super().__init__(
87-
name, unit=unit, description=description, *args, **kwargs
88-
)
95+
if isinstance(callback, collections_abc.Callable):
8996
self._callback = callback
97+
elif isinstance(callback, collections_abc.Generator):
98+
self._callback = self._wrap_generator_callback(callback)
99+
else:
100+
_logger.error("callback must be a callable or generator")
101+
102+
def _wrap_generator_callback(
103+
self,
104+
generator_callback: _TInstrumentCallbackGenerator,
105+
) -> _TInstrumentCallback:
106+
"""Wraps a generator style callback into a callable one"""
107+
has_items = True
108+
109+
def inner() -> Iterable[Measurement]:
110+
nonlocal has_items
111+
if not has_items:
112+
return []
113+
114+
try:
115+
return next(generator_callback)
116+
except StopIteration:
117+
has_items = False
118+
_logger.error(
119+
"callback generator for instrument %s ran out of measurements",
120+
self._name,
121+
)
122+
return []
123+
124+
return inner
90125

91-
@property
92126
def callback(self):
93-
def function():
94-
measurement = next(self._callback)
127+
measurements = self._callback()
128+
if not isinstance(measurements, collections_abc.Iterable):
129+
_logger.error(
130+
"Callback must return an iterable of Measurement, got %s",
131+
type(measurements),
132+
)
133+
return
134+
for measurement in measurements:
95135
if not isinstance(measurement, Measurement):
96-
_logger.error("Callback must return a Measurement")
97-
return None
98-
99-
return measurement
100-
101-
return function
136+
_logger.error(
137+
"Callback must return an iterable of Measurement, "
138+
"iterable contained type %s",
139+
type(measurement),
140+
)
141+
yield measurement
102142

103143

104144
class _Adding(Instrument):
@@ -147,18 +187,13 @@ def add(self, amount, attributes=None):
147187

148188

149189
class ObservableCounter(_Monotonic, Asynchronous):
150-
@property
151190
def callback(self):
152-
def function():
153-
measurement = super(ObservableCounter, self).callback()
191+
measurements = super().callback()
154192

155-
if measurement is not None and measurement.value < 0:
193+
for measurement in measurements:
194+
if measurement.value < 0:
156195
_logger.error("Amount must be non-negative")
157-
return None
158-
159-
return measurement
160-
161-
return function
196+
yield measurement
162197

163198

164199
class DefaultObservableCounter(ObservableCounter):

0 commit comments

Comments
 (0)