|
16 | 16 |
|
17 | 17 | import logging
|
18 | 18 |
|
| 19 | +from backoff import expo, on_predicate |
| 20 | +from grpc import StatusCode |
| 21 | +from typing import Sequence |
| 22 | + |
19 | 23 | from opentelemetry.sdk.metrics.export import MetricsExporter
|
| 24 | +from opentelemetry.sdk.metrics.export import ( |
| 25 | + MetricRecord, |
| 26 | + MetricsExportResult, |
| 27 | +) |
20 | 28 |
|
21 | 29 | logger = logging.getLogger(__name__)
|
22 | 30 |
|
|
25 | 33 | class OTLPMetricsExporter(MetricsExporter):
|
26 | 34 | """OTLP metrics exporter"""
|
27 | 35 |
|
28 |
| - def export(self): |
29 |
| - pass |
| 36 | + |
| 37 | + def __init__(self): |
| 38 | + self._client = trace_service_pb2_grpc.TraceServiceStub( |
| 39 | + grpc.insecure_channel(self.endpoint) |
| 40 | + ) |
| 41 | + |
| 42 | + def export( |
| 43 | + self, metric_records: Sequence[MetricRecord] |
| 44 | + ) -> MetricsExportResult: |
| 45 | + # expo returns a generator that yields delay values which grow |
| 46 | + # exponentially. Once delay is greater than max_value, the yielded |
| 47 | + # value will remain constant. |
| 48 | + # max_value is set to 900 (900 seconds is 15 minutes) to use the same |
| 49 | + # value as used in the Go implementation. |
| 50 | + for delay in expo(max_value=900): |
| 51 | + try: |
| 52 | + for _ in self.client.Export( |
| 53 | + self.generate_metrics_requests(metric_records) |
| 54 | + ): |
| 55 | + pass |
| 56 | + |
| 57 | + return MetricsExportResult.SUCESS |
| 58 | + |
| 59 | + except grpc.RpcError as error: |
| 60 | + |
| 61 | + if error.code() in [ |
| 62 | + StatusCode.CANCELLED, |
| 63 | + StatusCode.DEADLINE_EXCEEDED, |
| 64 | + StatusCode.PERMISSION_DENIED, |
| 65 | + StatusCode.UNAUTHENTICATED, |
| 66 | + StatusCode.RESOURCE_EXHAUSTED, |
| 67 | + StatusCode.ABORTED, |
| 68 | + StatusCode.OUT_OF_RANGE, |
| 69 | + StatusCode.UNAVAILABLE, |
| 70 | + StatusCode.DATA_LOSS, |
| 71 | + ]: |
| 72 | + sleep(delay) |
| 73 | + continue |
| 74 | + |
| 75 | + if error.code() == StatusCode.OK: |
| 76 | + return MetricsExportResult.SUCESS |
| 77 | + |
| 78 | + return MetricsExportResult.FAILURE |
| 79 | + |
| 80 | + |
| 81 | + |
| 82 | + |
| 83 | + # Find out from the error code if another attempt is to be made. |
| 84 | + # Find out if the server has returned a delay, if so, use it to |
| 85 | + # wait instead of exponential backoff. |
| 86 | + return MetricsExportResult.FAILURE |
| 87 | + |
| 88 | + return MetricsExportResult.SUCESS |
30 | 89 |
|
31 | 90 | def shutdown(self):
|
32 | 91 | pass
|
0 commit comments