|
13 | 13 | # limitations under the License.
|
14 | 14 |
|
15 | 15 | """
|
16 |
| -The integration with MongoDB supports the `pymongo`_ library and is specified |
17 |
| -to ``trace_integration`` using ``'pymongo'``. |
| 16 | +The integration with MongoDB supports the `pymongo`_ library, it can be |
| 17 | +enabled using the ``PymongoInstrumentor``. |
18 | 18 |
|
19 | 19 | .. _pymongo: https://pypi.org/project/pymongo
|
20 | 20 |
|
|
26 | 26 | from pymongo import MongoClient
|
27 | 27 | from opentelemetry import trace
|
28 | 28 | from opentelemetry.trace import TracerProvider
|
29 |
| - from opentelemetry.trace.ext.pymongo import trace_integration |
| 29 | + from opentelemetry.trace.ext.pymongo import PymongoInstrumentor |
30 | 30 |
|
31 | 31 | trace.set_tracer_provider(TracerProvider())
|
32 | 32 |
|
33 |
| - trace_integration() |
| 33 | + PymongoInstrumentor().instrument() |
34 | 34 | client = MongoClient()
|
35 | 35 | db = client["MongoDB_Database"]
|
36 | 36 | collection = db["MongoDB_Collection"]
|
|
42 | 42 |
|
43 | 43 | from pymongo import monitoring
|
44 | 44 |
|
| 45 | +from opentelemetry import trace |
| 46 | +from opentelemetry.auto_instrumentation.instrumentor import BaseInstrumentor |
45 | 47 | from opentelemetry.ext.pymongo.version import __version__
|
46 | 48 | from opentelemetry.trace import SpanKind, get_tracer
|
47 | 49 | from opentelemetry.trace.status import Status, StatusCanonicalCode
|
|
50 | 52 | COMMAND_ATTRIBUTES = ["filter", "sort", "skip", "limit", "pipeline"]
|
51 | 53 |
|
52 | 54 |
|
53 |
| -def trace_integration(tracer_provider=None): |
54 |
| - """Integrate with pymongo to trace it using event listener. |
55 |
| - https://api.mongodb.com/python/current/api/pymongo/monitoring.html |
56 |
| -
|
57 |
| - Args: |
58 |
| - tracer_provider: The `TracerProvider` to use. If none is passed the |
59 |
| - current configured one is used. |
60 |
| - """ |
61 |
| - |
62 |
| - tracer = get_tracer(__name__, __version__, tracer_provider) |
63 |
| - |
64 |
| - monitoring.register(CommandTracer(tracer)) |
65 |
| - |
66 |
| - |
67 | 55 | class CommandTracer(monitoring.CommandListener):
|
68 | 56 | def __init__(self, tracer):
|
69 | 57 | self._tracer = tracer
|
70 | 58 | self._span_dict = {}
|
| 59 | + self.is_enabled = True |
71 | 60 |
|
72 | 61 | def started(self, event: monitoring.CommandStartedEvent):
|
73 | 62 | """ Method to handle a pymongo CommandStartedEvent """
|
| 63 | + if not self.is_enabled: |
| 64 | + return |
74 | 65 | command = event.command.get(event.command_name, "")
|
75 | 66 | name = DATABASE_TYPE + "." + event.command_name
|
76 | 67 | statement = event.command_name
|
@@ -103,38 +94,70 @@ def started(self, event: monitoring.CommandStartedEvent):
|
103 | 94 | if span is not None:
|
104 | 95 | span.set_status(Status(StatusCanonicalCode.INTERNAL, str(ex)))
|
105 | 96 | span.end()
|
106 |
| - self._remove_span(event) |
| 97 | + self._pop_span(event) |
107 | 98 |
|
108 | 99 | def succeeded(self, event: monitoring.CommandSucceededEvent):
|
109 | 100 | """ Method to handle a pymongo CommandSucceededEvent """
|
110 |
| - span = self._get_span(event) |
111 |
| - if span is not None: |
112 |
| - span.set_attribute( |
113 |
| - "db.mongo.duration_micros", event.duration_micros |
114 |
| - ) |
115 |
| - span.set_status(Status(StatusCanonicalCode.OK, event.reply)) |
116 |
| - span.end() |
117 |
| - self._remove_span(event) |
| 101 | + if not self.is_enabled: |
| 102 | + return |
| 103 | + span = self._pop_span(event) |
| 104 | + if span is None: |
| 105 | + return |
| 106 | + span.set_attribute("db.mongo.duration_micros", event.duration_micros) |
| 107 | + span.set_status(Status(StatusCanonicalCode.OK, event.reply)) |
| 108 | + span.end() |
118 | 109 |
|
119 | 110 | def failed(self, event: monitoring.CommandFailedEvent):
|
120 | 111 | """ Method to handle a pymongo CommandFailedEvent """
|
121 |
| - span = self._get_span(event) |
122 |
| - if span is not None: |
123 |
| - span.set_attribute( |
124 |
| - "db.mongo.duration_micros", event.duration_micros |
125 |
| - ) |
126 |
| - span.set_status(Status(StatusCanonicalCode.UNKNOWN, event.failure)) |
127 |
| - span.end() |
128 |
| - self._remove_span(event) |
129 |
| - |
130 |
| - def _get_span(self, event): |
131 |
| - return self._span_dict.get(_get_span_dict_key(event)) |
| 112 | + if not self.is_enabled: |
| 113 | + return |
| 114 | + span = self._pop_span(event) |
| 115 | + if span is None: |
| 116 | + return |
| 117 | + span.set_attribute("db.mongo.duration_micros", event.duration_micros) |
| 118 | + span.set_status(Status(StatusCanonicalCode.UNKNOWN, event.failure)) |
| 119 | + span.end() |
132 | 120 |
|
133 |
| - def _remove_span(self, event): |
134 |
| - self._span_dict.pop(_get_span_dict_key(event)) |
| 121 | + def _pop_span(self, event): |
| 122 | + return self._span_dict.pop(_get_span_dict_key(event), None) |
135 | 123 |
|
136 | 124 |
|
137 | 125 | def _get_span_dict_key(event):
|
138 | 126 | if event.connection_id is not None:
|
139 | 127 | return (event.request_id, event.connection_id)
|
140 | 128 | return event.request_id
|
| 129 | + |
| 130 | + |
| 131 | +class PymongoInstrumentor(BaseInstrumentor): |
| 132 | + _commandtracer_instance = None # type CommandTracer |
| 133 | + # The instrumentation for PyMongo is based on the event listener interface |
| 134 | + # https://api.mongodb.com/python/current/api/pymongo/monitoring.html. |
| 135 | + # This interface only allows to register listeners and does not provide |
| 136 | + # an unregister API. In order to provide a mechanishm to disable |
| 137 | + # instrumentation an enabled flag is implemented in CommandTracer, |
| 138 | + # it's checked in the different listeners. |
| 139 | + |
| 140 | + def _instrument(self, **kwargs): |
| 141 | + """Integrate with pymongo to trace it using event listener. |
| 142 | + https://api.mongodb.com/python/current/api/pymongo/monitoring.html |
| 143 | +
|
| 144 | + Args: |
| 145 | + tracer_provider: The `TracerProvider` to use. If none is passed the |
| 146 | + current configured one is used. |
| 147 | + """ |
| 148 | + |
| 149 | + tracer_provider = kwargs.get("tracer_provider") |
| 150 | + |
| 151 | + # Create and register a CommandTracer only the first time |
| 152 | + if self._commandtracer_instance is None: |
| 153 | + tracer = get_tracer(__name__, __version__, tracer_provider) |
| 154 | + |
| 155 | + self._commandtracer_instance = CommandTracer(tracer) |
| 156 | + monitoring.register(self._commandtracer_instance) |
| 157 | + |
| 158 | + # If already created, just enable it |
| 159 | + self._commandtracer_instance.is_enabled = True |
| 160 | + |
| 161 | + def _uninstrument(self, **kwargs): |
| 162 | + if self._commandtracer_instance is not None: |
| 163 | + self._commandtracer_instance.is_enabled = False |
0 commit comments