Skip to content

Allow creating async instruments with either a callback function or generator #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions opentelemetry-api/src/opentelemetry/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,85 @@ def create_up_down_counter(
def create_observable_counter(
self, name, callback, unit="", description=""
) -> ObservableCounter:
"""Creates an observable counter instrument

An observable counter observes a monotonically increasing count by
calling a provided callback which returns multiple
:class:`~opentelemetry.metrics.measurement.Measurement`.

For example, an observable counter could be used to report system CPU
time periodically. Here is a basic implementation::

def cpu_time_callback() -> Iterable[Measurement]:
measurements = []
with open("/proc/stat") as procstat:
procstat.readline() # skip the first line
for line in procstat:
if not line.startswith("cpu"): break
cpu, *states = line.split()
measurements.append(Measurement(int(states[0]) // 100, {"cpu": cpu, "state": "user"}))
measurements.append(Measurement(int(states[1]) // 100, {"cpu": cpu, "state": "nice"}))
measurements.append(Measurement(int(states[2]) // 100, {"cpu": cpu, "state": "system"}))
# ... other states
return measurements

meter.create_observable_counter(
"system.cpu.time",
callback=cpu_time_callback,
unit="s",
description="CPU time"
)

To reduce memory usage, you can use generator callbacks instead of
building the full list::

def cpu_time_callback() -> Iterable[Measurement]:
with open("/proc/stat") as procstat:
procstat.readline() # skip the first line
for line in procstat:
if not line.startswith("cpu"): break
cpu, *states = line.split()
yield Measurement(int(states[0]) // 100, {"cpu": cpu, "state": "user"})
yield Measurement(int(states[1]) // 100, {"cpu": cpu, "state": "nice"})
# ... other states

Alternatively, you can pass a generator directly instead of a callback,
which should return iterables of
:class:`~opentelemetry.metrics.measurement.Measurement`::

def cpu_time_callback(states_to_include: set[str]) -> Iterable[Iterable[Measurement]]:
while True:
measurements = []
with open("/proc/stat") as procstat:
procstat.readline() # skip the first line
for line in procstat:
if not line.startswith("cpu"): break
cpu, *states = line.split()
if "user" in states_to_include:
measurements.append(Measurement(int(states[0]) // 100, {"cpu": cpu, "state": "user"}))
if "nice" in states_to_include:
measurements.append(Measurement(int(states[1]) // 100, {"cpu": cpu, "state": "nice"}))
# ... other states
yield measurements

meter.create_observable_counter(
"system.cpu.time",
callback=cpu_time_callback({"user", "system"}),
unit="s",
description="CPU time"
)

Args:
name: The name of the instrument to be created
callback: A callback that returns an iterable of
:class:`~opentelemetry.metrics.measurement.Measurement`.
Alternatively, can be a generator that yields iterables of
:class:`~opentelemetry.metrics.measurement.Measurement`.
unit: The unit for measurements this instrument reports. For
example, ``By`` for bytes. UCUM units are recommended.
description: A description for this instrument and what it measures.
"""

self._secure_instrument_name(name)

@abstractmethod
Expand Down
93 changes: 64 additions & 29 deletions opentelemetry-api/src/opentelemetry/metrics/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@

# type: ignore


from abc import ABC, abstractmethod
from collections import abc as collections_abc
from logging import getLogger
from re import compile as compile_
from types import GeneratorType
from typing import Callable, Generator, Iterable, Union

from opentelemetry.metrics.measurement import Measurement

_TInstrumentCallback = Callable[[], Iterable[Measurement]]
_TInstrumentCallbackGenerator = Generator[Iterable[Measurement], None, None]
TCallback = Union[_TInstrumentCallback, _TInstrumentCallbackGenerator]


_logger = getLogger(__name__)


Expand Down Expand Up @@ -75,30 +80,65 @@ class Synchronous(Instrument):
class Asynchronous(Instrument):
@abstractmethod
def __init__(
self, name, callback, *args, unit="", description="", **kwargs
self,
name,
callback: TCallback,
*args,
unit="",
description="",
**kwargs
):
super().__init__(name, *args, unit=unit, description="", **kwargs)

if not isinstance(callback, GeneratorType):
_logger.error("callback must be a generator")
super().__init__(
name, *args, unit=unit, description=description, **kwargs
)

else:
super().__init__(
name, unit=unit, description=description, *args, **kwargs
)
if isinstance(callback, collections_abc.Callable):
self._callback = callback
elif isinstance(callback, collections_abc.Generator):
self._callback = self._wrap_generator_callback(callback)
else:
_logger.error("callback must be a callable or generator")

def _wrap_generator_callback(
self,
generator_callback: _TInstrumentCallbackGenerator,
) -> _TInstrumentCallback:
"""Wraps a generator style callback into a callable one"""
has_items = True

def inner() -> Iterable[Measurement]:
nonlocal has_items
if not has_items:
return []

try:
return next(generator_callback)
except StopIteration:
has_items = False
_logger.error(
"callback generator for instrument %s ran out of measurements",
self._name,
)
return []

return inner

@property
def callback(self):
def function():
measurement = next(self._callback)
measurements = self._callback()
if not isinstance(measurements, collections_abc.Iterable):
_logger.error(
"Callback must return an iterable of Measurement, got %s",
type(measurements),
)
return
for measurement in measurements:
if not isinstance(measurement, Measurement):
_logger.error("Callback must return a Measurement")
return None

return measurement

return function
_logger.error(
"Callback must return an iterable of Measurement, "
"iterable contained type %s",
type(measurement),
)
yield measurement


class _Adding(Instrument):
Expand Down Expand Up @@ -147,18 +187,13 @@ def add(self, amount, attributes=None):


class ObservableCounter(_Monotonic, Asynchronous):
@property
def callback(self):
def function():
measurement = super(ObservableCounter, self).callback()
measurements = super().callback()

if measurement is not None and measurement.value < 0:
for measurement in measurements:
if measurement.value < 0:
_logger.error("Amount must be non-negative")
return None

return measurement

return function
yield measurement


class DefaultObservableCounter(ObservableCounter):
Expand Down
Loading