From 0322bf75e4e6516a285258e05ddda2eface8efac Mon Sep 17 00:00:00 2001 From: Makro <4398091+xmakro@users.noreply.github.com> Date: Sat, 21 Dec 2024 09:46:10 -0800 Subject: [PATCH 1/6] Disconnect gRPC client stub when shutting down OTLPSpanExporter --- .../src/opentelemetry/exporter/otlp/proto/grpc/exporter.py | 1 + 1 file changed, 1 insertion(+) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 582d083e86f..2f491b7ed47 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -360,6 +360,7 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: # wait for the last export if any self._export_lock.acquire(timeout=timeout_millis / 1e3) self._shutdown = True + self._client = None self._export_lock.release() @property From 9d4e3168805f5719c8aa1531d1ae007f1086838a Mon Sep 17 00:00:00 2001 From: Makro <4398091+xmakro@users.noreply.github.com> Date: Mon, 23 Dec 2024 13:20:19 -0800 Subject: [PATCH 2/6] Update CHANGELOG.md --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ad1d754bd87..55088cc1b44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#4364](https://github.com/open-telemetry/opentelemetry-python/pull/4364)) - Add Python 3.13 support ([#4353](https://github.com/open-telemetry/opentelemetry-python/pull/4353)) +- Disconnect gRPC client stub when shutting down `OTLPSpanExporter` + ([#4370](https://github.com/open-telemetry/opentelemetry-python/pull/4370)) ## Version 1.29.0/0.50b0 (2024-12-11) From 3881da627a53542d1ae4cd4e34074f55d061eff0 Mon Sep 17 00:00:00 2001 From: Makro <4398091+xmakro@users.noreply.github.com> Date: Tue, 24 Dec 2024 11:08:50 -0800 Subject: [PATCH 3/6] Close channel instead of destroying client --- .../exporter/otlp/proto/grpc/exporter.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 2f491b7ed47..2e3b99500d9 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -243,9 +243,7 @@ def __init__( ) or Compression.NoCompression if insecure: - self._client = self._stub( - insecure_channel(self._endpoint, compression=compression) - ) + self._channel = insecure_channel(self._endpoint, compression=compression) else: credentials = _get_credentials( credentials, @@ -253,11 +251,10 @@ def __init__( OTEL_EXPORTER_OTLP_CLIENT_KEY, OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE, ) - self._client = self._stub( - secure_channel( - self._endpoint, credentials, compression=compression - ) + self._channel = secure_channel( + self._endpoint, credentials, compression=compression ) + self._client = self._stub(self._channel) self._export_lock = threading.Lock() self._shutdown = False @@ -360,7 +357,7 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: # wait for the last export if any self._export_lock.acquire(timeout=timeout_millis / 1e3) self._shutdown = True - self._client = None + self._channel.close() self._export_lock.release() @property From a80d72adf4c82fa366335e0fde2f07feca509d9f Mon Sep 17 00:00:00 2001 From: rjduffner Date: Tue, 18 Feb 2025 15:41:33 -0800 Subject: [PATCH 4/6] linty linty --- .../src/opentelemetry/exporter/otlp/proto/grpc/exporter.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 2e3b99500d9..4be75c5335e 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -243,7 +243,9 @@ def __init__( ) or Compression.NoCompression if insecure: - self._channel = insecure_channel(self._endpoint, compression=compression) + self._channel = insecure_channel( + self._endpoint, compression=compression + ) else: credentials = _get_credentials( credentials, From 1234b930d21fd93cda51efc5d36472bc026f6a03 Mon Sep 17 00:00:00 2001 From: emdneto <9735060+emdneto@users.noreply.github.com> Date: Fri, 28 Feb 2025 14:07:35 -0300 Subject: [PATCH 5/6] add tests Signed-off-by: emdneto <9735060+emdneto@users.noreply.github.com> --- .../tests/test_otlp_metrics_exporter.py | 13 +++++++++++++ .../tests/test_otlp_trace_exporter.py | 13 +++++++++++++ 2 files changed, 26 insertions(+) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py index 1d2bae2486d..c1f46aba0b2 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py @@ -874,6 +874,19 @@ def test_shutdown_wait_last_export(self): finally: export_thread.join() + def test_export_over_closed_grpc_channel(self): + add_MetricsServiceServicer_to_server( + MetricsServiceServicerSUCCESS(), self.server + ) + self.exporter.export(self.metrics["sum_int"]) + self.exporter.shutdown() + data = self.exporter._translate_data(self.metrics["sum_int"]) + with self.assertRaises(ValueError) as err: + self.exporter._client.Export(request=data) + self.assertEqual( + str(err.exception), + "Cannot invoke RPC on closed channel!" + ) def test_aggregation_temporality(self): # pylint: disable=protected-access diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py index fe0b94ac787..3da720a4e2f 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py @@ -1017,6 +1017,19 @@ def test_shutdown_wait_last_export(self): finally: export_thread.join() + def test_export_over_closed_grpc_channel(self): + add_TraceServiceServicer_to_server( + TraceServiceServicerSUCCESS(), self.server + ) + self.exporter.export([self.span]) + self.exporter.shutdown() + data = self.exporter._translate_data([self.span]) + with self.assertRaises(ValueError) as err: + self.exporter._client.Export(request=data) + self.assertEqual( + str(err.exception), + "Cannot invoke RPC on closed channel!" + ) def _create_span_with_status(status: SDKStatus): span = _Span( From 6043a53f4f38fe2c29e685b29b5c07b90d888dd1 Mon Sep 17 00:00:00 2001 From: emdneto <9735060+emdneto@users.noreply.github.com> Date: Fri, 28 Feb 2025 14:16:20 -0300 Subject: [PATCH 6/6] fix ruff and pylint Signed-off-by: emdneto <9735060+emdneto@users.noreply.github.com> --- .../tests/test_otlp_metrics_exporter.py | 6 ++++-- .../tests/test_otlp_trace_exporter.py | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py index c1f46aba0b2..9cd7ac38358 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py @@ -875,6 +875,8 @@ def test_shutdown_wait_last_export(self): export_thread.join() def test_export_over_closed_grpc_channel(self): + # pylint: disable=protected-access + add_MetricsServiceServicer_to_server( MetricsServiceServicerSUCCESS(), self.server ) @@ -884,9 +886,9 @@ def test_export_over_closed_grpc_channel(self): with self.assertRaises(ValueError) as err: self.exporter._client.Export(request=data) self.assertEqual( - str(err.exception), - "Cannot invoke RPC on closed channel!" + str(err.exception), "Cannot invoke RPC on closed channel!" ) + def test_aggregation_temporality(self): # pylint: disable=protected-access diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py index 3da720a4e2f..f29b7fc611c 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py @@ -1018,6 +1018,8 @@ def test_shutdown_wait_last_export(self): export_thread.join() def test_export_over_closed_grpc_channel(self): + # pylint: disable=protected-access + add_TraceServiceServicer_to_server( TraceServiceServicerSUCCESS(), self.server ) @@ -1027,10 +1029,10 @@ def test_export_over_closed_grpc_channel(self): with self.assertRaises(ValueError) as err: self.exporter._client.Export(request=data) self.assertEqual( - str(err.exception), - "Cannot invoke RPC on closed channel!" + str(err.exception), "Cannot invoke RPC on closed channel!" ) + def _create_span_with_status(status: SDKStatus): span = _Span( "a",