Commit 5dbe15d

Join worker on BatchSpanProcessor shutdown
1 parent 33ee164 commit 5dbe15d

src/sdk/trace/span_processor.rs

Lines changed: 26 additions & 4 deletions

@@ -56,7 +56,7 @@
 //! those contexts.
 //!
 //! ```
-//! use futures::{stream};
+//! use futures::{FutureExt as _, stream};
 //! use opentelemetry::{api, sdk, global};
 //! use std::time::Duration;
 //!
@@ -68,7 +68,8 @@
 //! // Then build a batch processor. You can use whichever executor you have available, for
 //! // example if you are using `async-std` instead of `tokio` you can replace the spawn and
 //! // interval functions with `async_std::task::spawn` and `async_std::stream::interval`.
-//! let batch = sdk::BatchSpanProcessor::builder(exporter, tokio::spawn, tokio::time::interval)
+//! let spawn = |future| tokio::spawn(future).map(|result| result.expect("task failed"));
+//! let batch = sdk::BatchSpanProcessor::builder(exporter, spawn, tokio::time::interval)
 //!     .with_max_queue_size(4096)
 //!     .build();
 //!
@@ -96,6 +97,7 @@ use futures::{
     task::{Context, Poll},
     Future, Stream, StreamExt,
 };
+use std::fmt::Debug;
 use std::pin::Pin;
 use std::sync::{Arc, Mutex};
 use std::time;
@@ -134,9 +136,17 @@ impl api::SpanProcessor for SimpleSpanProcessor {
 /// them at a preconfigured interval.
 ///
 /// [`SpanProcessor`]: ../../../api/trace/span_processor/trait.SpanProcessor.html
-#[derive(Debug)]
 pub struct BatchSpanProcessor {
     message_sender: Mutex<mpsc::Sender<BatchMessage>>,
+    worker_handle: Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send + Sync>>>>,
+}
+
+impl Debug for BatchSpanProcessor {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("BatchSpanProcessor")
+            .field("message_sender", &self.message_sender)
+            .finish()
+    }
 }

 impl api::SpanProcessor for BatchSpanProcessor {
@@ -154,6 +164,15 @@ impl api::SpanProcessor for BatchSpanProcessor {
         if let Ok(mut sender) = self.message_sender.lock() {
             let _ = sender.try_send(BatchMessage::Shutdown);
         }
+
+        if let Some(worker_handle) = self
+            .worker_handle
+            .lock()
+            .expect("worker_handle Mutex panicked")
+            .take()
+        {
+            futures::executor::block_on(worker_handle);
+        }
     }
 }

@@ -233,12 +252,13 @@ impl BatchSpanProcessor {
         S: Fn(BatchSpanProcessorWorker) -> SO,
         I: Fn(time::Duration) -> IS,
         IS: Stream<Item = ISI> + Send + 'static,
+        SO: Future<Output = ()> + Send + Sync + 'static,
     {
         let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size);
         let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Tick);

         // Spawn worker process via user-defined spawn function.
-        spawn(BatchSpanProcessorWorker {
+        let worker_handle = spawn(BatchSpanProcessorWorker {
             exporter,
             messages: Box::pin(futures::stream::select(message_receiver, ticker)),
             config,
@@ -248,6 +268,7 @@ impl BatchSpanProcessor {
         // Return batch processor with link to worker
         BatchSpanProcessor {
             message_sender: Mutex::new(message_sender),
+            worker_handle: Mutex::new(Some(Box::pin(worker_handle))),
         }
     }

@@ -316,6 +337,7 @@ where
     S: Fn(BatchSpanProcessorWorker) -> SO,
     I: Fn(time::Duration) -> IS,
     IS: Stream<Item = ISI> + Send + 'static,
+    SO: Future<Output = ()> + Send + Sync + 'static,
 {
     /// Set max queue size for batches
     pub fn with_max_queue_size(self, size: usize) -> Self {
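
In short: the spawn function passed to the builder must now return a joinable future (the new SO: Future<Output = ()> + Send + Sync + 'static bound), BatchSpanProcessor stores that future, and shutdown() blocks on it so queued spans are flushed before the process exits. Below is a minimal standalone sketch of that join-on-shutdown pattern, not part of this commit: it assumes futures 0.3 and tokio 1.x with the multi-thread runtime, and the Processor type and worker body are illustrative stand-ins rather than opentelemetry types.

// Sketch of the join-on-shutdown pattern added by this commit.
// Assumptions: futures = "0.3", tokio = "1" with the rt-multi-thread feature.
use futures::FutureExt as _;
use std::future::Future;
use std::pin::Pin;
use std::sync::Mutex;

struct Processor {
    // Completion future of the spawned worker, taken exactly once on shutdown.
    worker_handle: Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send + Sync>>>>,
}

impl Processor {
    fn shutdown(&self) {
        // Take the handle (so a second shutdown is a no-op) and block until the
        // worker future resolves, i.e. the worker has finished draining its queue.
        if let Some(handle) = self
            .worker_handle
            .lock()
            .expect("worker_handle Mutex poisoned")
            .take()
        {
            futures::executor::block_on(handle);
        }
    }
}

fn main() {
    let runtime = tokio::runtime::Runtime::new().expect("failed to build runtime");

    let processor = runtime.block_on(async {
        // Adapt tokio::spawn so its JoinHandle (a Future<Output = Result<_, JoinError>>)
        // becomes a Future<Output = ()>, matching the new SO bound. This mirrors the
        // updated doc example in the diff above.
        let spawn = |future| tokio::spawn(future).map(|result| result.expect("task failed"));

        // Stand-in for BatchSpanProcessorWorker.
        let worker = async {
            println!("worker: flushing remaining batches and exiting");
        };

        Processor {
            worker_handle: Mutex::new(Some(Box::pin(spawn(worker)))),
        }
    });

    // Called from synchronous code on exit; returns only after the worker is done.
    processor.shutdown();
    println!("shutdown complete");
}

The real shutdown() also sends BatchMessage::Shutdown first so the worker knows to stop; the sketch only shows the joining step this commit adds.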
