Skip to content

Commit 100174b

Browse files
metrics: Implement release for handles and observers
This commit implements a solution for releasing instrument handles and observers. For the handles it is based on a ref count that is increased each time the handled is acquired, when the ref count reaches 0 the handle is removed on collection time. The direct call convention is updated to release the handle after it has been updated. The observer instrument is only updated on collection time, so it can be removed as soon as the user request to do so.
1 parent 888bed9 commit 100174b

File tree

5 files changed

+162
-26
lines changed

5 files changed

+162
-26
lines changed

examples/metrics/record.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,14 @@
6565
# a labelset. A handle is essentially metric data that corresponds to a specific
6666
# set of labels. Therefore, getting a handle using the same set of labels will
6767
# yield the same metric handle.
68+
# Get a handle when you have to perform multiple operations using the same
69+
# labelset
6870
counter_handle = counter.get_handle(label_set)
69-
counter_handle.add(100)
71+
for i in range(1000):
72+
counter_handle.add(i)
73+
74+
# You can release the handle we you are done
75+
counter_handle.release()
7076

7177
# Direct metric usage
7278
# You can record metrics directly using the metric instrument. You pass in a
@@ -78,4 +84,5 @@
7884
# (metric, value) pairs. The value would be recorded for each metric using the
7985
# specified labelset for each.
8086
meter.record_batch(label_set, [(counter, 50), (counter2, 70)])
81-
time.sleep(100)
87+
88+
time.sleep(10)

opentelemetry-api/src/opentelemetry/metrics/__init__.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ def record(self, value: ValueT) -> None:
5757
value: The value to record to the handle.
5858
"""
5959

60+
def release(self) -> None:
61+
"""No-op implementation of release."""
62+
6063

6164
class CounterHandle:
6265
def add(self, value: ValueT) -> None:
@@ -346,6 +349,14 @@ def register_observer(
346349
Returns: A new ``Observer`` metric instrument.
347350
"""
348351

352+
@abc.abstractmethod
353+
def unregister_observer(self, observer: "Observer") -> None:
354+
"""Unregisters an ``Observer`` metric instrument.
355+
356+
Args:
357+
observer: The observer to unregister.
358+
"""
359+
349360
@abc.abstractmethod
350361
def get_label_set(self, labels: Dict[str, str]) -> "LabelSet":
351362
"""Gets a `LabelSet` with the given labels.
@@ -392,6 +403,9 @@ def register_observer(
392403
) -> "Observer":
393404
return DefaultObserver()
394405

406+
def unregister_observer(self, observer: "Observer") -> None:
407+
pass
408+
395409
def get_label_set(self, labels: Dict[str, str]) -> "LabelSet":
396410
# pylint: disable=no-self-use
397411
return DefaultLabelSet()

opentelemetry-api/tests/metrics/test_metrics.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ def test_measure_record(self):
4848
measure.record(1, label_set)
4949

5050
def test_default_handle(self):
51-
metrics.DefaultMetricHandle()
51+
handle = metrics.DefaultMetricHandle()
52+
handle.release()
5253

5354
def test_counter_handle(self):
5455
handle = metrics.CounterHandle()

opentelemetry-sdk/src/opentelemetry/sdk/metrics/__init__.py

Lines changed: 66 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import logging
16+
import threading
1617
from collections import OrderedDict
1718
from typing import Dict, Sequence, Tuple, Type
1819

@@ -70,6 +71,8 @@ def __init__(
7071
self.enabled = enabled
7172
self.aggregator = aggregator
7273
self.last_update_timestamp = time_ns()
74+
self._ref_count = 0
75+
self._ref_count_lock = threading.Lock()
7376

7477
def _validate_update(self, value: metrics_api.ValueT) -> bool:
7578
if not self.enabled:
@@ -85,6 +88,21 @@ def update(self, value: metrics_api.ValueT):
8588
self.last_update_timestamp = time_ns()
8689
self.aggregator.update(value)
8790

91+
def release(self):
92+
self.decrease_ref_count()
93+
94+
def decrease_ref_count(self):
95+
with self._ref_count_lock:
96+
self._ref_count -= 1
97+
98+
def increase_ref_count(self):
99+
with self._ref_count_lock:
100+
self._ref_count += 1
101+
102+
def ref_count(self):
103+
with self._ref_count_lock:
104+
return self._ref_count
105+
88106
def __repr__(self):
89107
return '{}(data="{}", last_update_timestamp={})'.format(
90108
type(self).__name__,
@@ -136,18 +154,21 @@ def __init__(
136154
self.label_keys = label_keys
137155
self.enabled = enabled
138156
self.handles = {}
157+
self.handles_lock = threading.Lock()
139158

140159
def get_handle(self, label_set: LabelSet) -> BaseHandle:
141160
"""See `opentelemetry.metrics.Metric.get_handle`."""
142-
handle = self.handles.get(label_set)
143-
if not handle:
144-
handle = self.HANDLE_TYPE(
145-
self.value_type,
146-
self.enabled,
147-
# Aggregator will be created based off type of metric
148-
self.meter.batcher.aggregator_for(self.__class__),
149-
)
150-
self.handles[label_set] = handle
161+
with self.handles_lock:
162+
handle = self.handles.get(label_set)
163+
if handle is None:
164+
handle = self.HANDLE_TYPE(
165+
self.value_type,
166+
self.enabled,
167+
# Aggregator will be created based off type of metric
168+
self.meter.batcher.aggregator_for(self.__class__),
169+
)
170+
self.handles[label_set] = handle
171+
handle.increase_ref_count()
151172
return handle
152173

153174
def __repr__(self):
@@ -186,7 +207,9 @@ def __init__(
186207

187208
def add(self, value: metrics_api.ValueT, label_set: LabelSet) -> None:
188209
"""See `opentelemetry.metrics.Counter.add`."""
189-
self.get_handle(label_set).add(value)
210+
handle = self.get_handle(label_set)
211+
handle.add(value)
212+
handle.release()
190213

191214
UPDATE_FUNCTION = add
192215

@@ -198,7 +221,9 @@ class Measure(Metric, metrics_api.Measure):
198221

199222
def record(self, value: metrics_api.ValueT, label_set: LabelSet) -> None:
200223
"""See `opentelemetry.metrics.Measure.record`."""
201-
self.get_handle(label_set).record(value)
224+
handle = self.get_handle(label_set)
225+
handle.record(value)
226+
handle.release()
202227

203228
UPDATE_FUNCTION = record
204229

@@ -295,6 +320,7 @@ def __init__(
295320
self.metrics = set()
296321
self.observers = set()
297322
self.batcher = UngroupedBatcher(stateful)
323+
self.observers_lock = threading.Lock()
298324

299325
def collect(self) -> None:
300326
"""Collects all the metrics created with this `Meter` for export.
@@ -309,26 +335,39 @@ def collect(self) -> None:
309335

310336
def _collect_metrics(self) -> None:
311337
for metric in self.metrics:
312-
if metric.enabled:
338+
if not metric.enabled:
339+
continue
340+
341+
to_remove = []
342+
343+
with metric.handles_lock:
313344
for label_set, handle in metric.handles.items():
314345
# TODO: Consider storing records in memory?
315346
record = Record(metric, label_set, handle.aggregator)
316347
# Checkpoints the current aggregators
317348
# Applies different batching logic based on type of batcher
318349
self.batcher.process(record)
319350

351+
if handle.ref_count() == 0:
352+
to_remove.append(label_set)
353+
354+
# Remove handles that were released
355+
for label_set in to_remove:
356+
del metric.handles[label_set]
357+
320358
def _collect_observers(self) -> None:
321-
for observer in self.observers:
322-
if not observer.enabled:
323-
continue
359+
with self.observers_lock:
360+
for observer in self.observers:
361+
if not observer.enabled:
362+
continue
324363

325-
# TODO: capture timestamp?
326-
if not observer.run():
327-
continue
364+
# TODO: capture timestamp?
365+
if not observer.run():
366+
continue
328367

329-
for label_set, aggregator in observer.aggregators.items():
330-
record = Record(observer, label_set, aggregator)
331-
self.batcher.process(record)
368+
for label_set, aggregator in observer.aggregators.items():
369+
record = Record(observer, label_set, aggregator)
370+
self.batcher.process(record)
332371

333372
def record_batch(
334373
self,
@@ -383,9 +422,14 @@ def register_observer(
383422
label_keys,
384423
enabled,
385424
)
386-
self.observers.add(ob)
425+
with self.observers_lock:
426+
self.observers.add(ob)
387427
return ob
388428

429+
def unregister_observer(self, observer: "Observer") -> None:
430+
with self.observers_lock:
431+
self.observers.remove(observer)
432+
389433
def get_label_set(self, labels: Dict[str, str]):
390434
"""See `opentelemetry.metrics.Meter.create_metric`.
391435

opentelemetry-sdk/tests/metrics/test_metrics.py

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def test_collect(self):
3535
)
3636
kvp = {"key1": "value1"}
3737
label_set = meter.get_label_set(kvp)
38-
counter.add(label_set, 1.0)
38+
counter.add(1.0, label_set)
3939
meter.metrics.add(counter)
4040
meter.collect()
4141
self.assertTrue(batcher_mock.process.called)
@@ -163,6 +163,18 @@ def test_register_observer(self):
163163
self.assertEqual(observer.label_keys, ())
164164
self.assertTrue(observer.enabled)
165165

166+
def test_unregister_observer(self):
167+
meter = metrics.MeterProvider().get_meter(__name__)
168+
169+
callback = mock.Mock()
170+
171+
observer = meter.register_observer(
172+
callback, "name", "desc", "unit", int, (), True
173+
)
174+
175+
meter.unregister_observer(observer)
176+
self.assertEqual(len(meter.observers), 0)
177+
166178
def test_get_label_set(self):
167179
meter = metrics.MeterProvider().get_meter(__name__)
168180
kvp = {"environment": "staging", "a": "z"}
@@ -177,6 +189,64 @@ def test_get_label_set_empty(self):
177189
label_set = meter.get_label_set(kvp)
178190
self.assertEqual(label_set, metrics.EMPTY_LABEL_SET)
179191

192+
def test_direct_call_release_handle(self):
193+
meter = metrics.MeterProvider().get_meter(__name__)
194+
label_keys = ("key1",)
195+
kvp = {"key1": "value1"}
196+
label_set = meter.get_label_set(kvp)
197+
198+
counter = metrics.Counter(
199+
"name", "desc", "unit", float, meter, label_keys
200+
)
201+
meter.metrics.add(counter)
202+
counter.add(4.0, label_set)
203+
204+
measure = metrics.Measure(
205+
"name", "desc", "unit", float, meter, label_keys
206+
)
207+
meter.metrics.add(measure)
208+
measure.record(42.0, label_set)
209+
210+
self.assertEqual(len(counter.handles), 1)
211+
self.assertEqual(len(measure.handles), 1)
212+
213+
meter.collect()
214+
215+
self.assertEqual(len(counter.handles), 0)
216+
self.assertEqual(len(measure.handles), 0)
217+
218+
def test_release_handle(self):
219+
meter = metrics.MeterProvider().get_meter(__name__)
220+
label_keys = ("key1",)
221+
kvp = {"key1": "value1"}
222+
label_set = meter.get_label_set(kvp)
223+
224+
counter = metrics.Counter(
225+
"name", "desc", "unit", float, meter, label_keys
226+
)
227+
meter.metrics.add(counter)
228+
counter_handle = counter.get_handle(label_set)
229+
counter_handle.add(4.0)
230+
231+
measure = metrics.Measure(
232+
"name", "desc", "unit", float, meter, label_keys
233+
)
234+
meter.metrics.add(measure)
235+
measure_handle = measure.get_handle(label_set)
236+
measure_handle.record(42)
237+
238+
counter_handle.release()
239+
measure_handle.release()
240+
241+
# be sure that handles are only released after collection
242+
self.assertEqual(len(counter.handles), 1)
243+
self.assertEqual(len(measure.handles), 1)
244+
245+
meter.collect()
246+
247+
self.assertEqual(len(counter.handles), 0)
248+
self.assertEqual(len(measure.handles), 0)
249+
180250

181251
class TestMetric(unittest.TestCase):
182252
def test_get_handle(self):

0 commit comments

Comments
 (0)