@@ -82,8 +82,7 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
82
82
/// `on_end` is called after a `Span` is ended (i.e., the end timestamp is
83
83
/// already set). This method is called synchronously within the `Span::end`
84
84
/// API, therefore it should not block or throw an exception.
85
- /// TODO - This method should take reference to `SpanData`
86
- fn on_end ( & self , span : SpanData ) ;
85
+ fn on_end ( & self , span : & mut SpanData ) ;
87
86
/// Force the spans lying in the cache to be exported.
88
87
fn force_flush ( & self ) -> OTelSdkResult ;
89
88
/// Shuts down the processor. Called when SDK is shut down. This is an
@@ -129,7 +128,7 @@ impl<T: SpanExporter> SpanProcessor for SimpleSpanProcessor<T> {
129
128
// Ignored
130
129
}
131
130
132
- fn on_end ( & self , span : SpanData ) {
131
+ fn on_end ( & self , span : & mut SpanData ) {
133
132
if !span. span_context . is_sampled ( ) {
134
133
return ;
135
134
}
@@ -138,7 +137,7 @@ impl<T: SpanExporter> SpanProcessor for SimpleSpanProcessor<T> {
138
137
. exporter
139
138
. lock ( )
140
139
. map_err ( |_| OTelSdkError :: InternalFailure ( "SimpleSpanProcessor mutex poison" . into ( ) ) )
141
- . and_then ( |exporter| futures_executor:: block_on ( exporter. export ( vec ! [ span] ) ) ) ;
140
+ . and_then ( |exporter| futures_executor:: block_on ( exporter. export ( vec ! [ span. clone ( ) ] ) ) ) ;
142
141
143
142
if let Err ( err) = result {
144
143
// TODO: check error type, and log `error` only if the error is user-actionable, else log `debug`
@@ -460,7 +459,7 @@ impl BatchSpanProcessor {
460
459
E : SpanExporter + Send + Sync + ' static ,
461
460
{
462
461
// Get upto `max_export_batch_size` amount of spans from the channel and push them to the span vec
463
- while let Ok ( span) = spans_receiver. try_recv ( ) {
462
+ while let Ok ( span) = spans_receiver. try_recv ( ) {
464
463
spans. push ( span) ;
465
464
if spans. len ( ) == config. max_export_batch_size {
466
465
break ;
@@ -512,7 +511,7 @@ impl SpanProcessor for BatchSpanProcessor {
512
511
}
513
512
514
513
/// Handles span end.
515
- fn on_end ( & self , span : SpanData ) {
514
+ fn on_end ( & self , span : & mut SpanData ) {
516
515
if self . is_shutdown . load ( Ordering :: Relaxed ) {
517
516
// this is a warning, as the user is trying to emit after the processor has been shutdown
518
517
otel_warn ! (
@@ -521,7 +520,7 @@ impl SpanProcessor for BatchSpanProcessor {
521
520
) ;
522
521
return ;
523
522
}
524
- let result = self . span_sender . try_send ( span) ;
523
+ let result = self . span_sender . try_send ( span. clone ( ) ) ;
525
524
526
525
if result. is_err ( ) {
527
526
// Increment dropped span count. The first time we have to drop a span,
@@ -875,8 +874,8 @@ mod tests {
875
874
fn simple_span_processor_on_end_calls_export ( ) {
876
875
let exporter = InMemorySpanExporterBuilder :: new ( ) . build ( ) ;
877
876
let processor = SimpleSpanProcessor :: new ( exporter. clone ( ) ) ;
878
- let span_data = new_test_export_span_data ( ) ;
879
- processor. on_end ( span_data. clone ( ) ) ;
877
+ let mut span_data = new_test_export_span_data ( ) ;
878
+ processor. on_end ( & mut span_data) ;
880
879
assert_eq ! ( exporter. get_finished_spans( ) . unwrap( ) [ 0 ] , span_data) ;
881
880
let _result = processor. shutdown ( ) ;
882
881
}
@@ -885,7 +884,7 @@ mod tests {
885
884
fn simple_span_processor_on_end_skips_export_if_not_sampled ( ) {
886
885
let exporter = InMemorySpanExporterBuilder :: new ( ) . build ( ) ;
887
886
let processor = SimpleSpanProcessor :: new ( exporter. clone ( ) ) ;
888
- let unsampled = SpanData {
887
+ let mut unsampled = SpanData {
889
888
span_context : SpanContext :: empty_context ( ) ,
890
889
parent_span_id : SpanId :: INVALID ,
891
890
span_kind : SpanKind :: Internal ,
@@ -899,16 +898,16 @@ mod tests {
899
898
status : Status :: Unset ,
900
899
instrumentation_scope : Default :: default ( ) ,
901
900
} ;
902
- processor. on_end ( unsampled) ;
901
+ processor. on_end ( & mut unsampled) ;
903
902
assert ! ( exporter. get_finished_spans( ) . unwrap( ) . is_empty( ) ) ;
904
903
}
905
904
906
905
#[ test]
907
906
fn simple_span_processor_shutdown_calls_shutdown ( ) {
908
907
let exporter = InMemorySpanExporterBuilder :: new ( ) . build ( ) ;
909
908
let processor = SimpleSpanProcessor :: new ( exporter. clone ( ) ) ;
910
- let span_data = new_test_export_span_data ( ) ;
911
- processor. on_end ( span_data. clone ( ) ) ;
909
+ let mut span_data = new_test_export_span_data ( ) ;
910
+ processor. on_end ( & mut span_data) ;
912
911
assert ! ( !exporter. get_finished_spans( ) . unwrap( ) . is_empty( ) ) ;
913
912
let _result = processor. shutdown ( ) ;
914
913
// Assume shutdown is called by ensuring spans are empty in the exporter
@@ -1109,8 +1108,8 @@ mod tests {
1109
1108
. build ( ) ;
1110
1109
let processor = BatchSpanProcessor :: new ( exporter, config) ;
1111
1110
1112
- let test_span = create_test_span ( "test_span" ) ;
1113
- processor. on_end ( test_span. clone ( ) ) ;
1111
+ let mut test_span = create_test_span ( "test_span" ) ;
1112
+ processor. on_end ( & mut test_span) ;
1114
1113
1115
1114
// Wait for flush interval to ensure the span is processed
1116
1115
std:: thread:: sleep ( Duration :: from_secs ( 6 ) ) ;
@@ -1132,8 +1131,8 @@ mod tests {
1132
1131
let processor = BatchSpanProcessor :: new ( exporter, config) ;
1133
1132
1134
1133
// Create a test span and send it to the processor
1135
- let test_span = create_test_span ( "force_flush_span" ) ;
1136
- processor. on_end ( test_span. clone ( ) ) ;
1134
+ let mut test_span = create_test_span ( "force_flush_span" ) ;
1135
+ processor. on_end ( & mut test_span) ;
1137
1136
1138
1137
// Call force_flush to immediately export the spans
1139
1138
let flush_result = processor. force_flush ( ) ;
@@ -1161,8 +1160,8 @@ mod tests {
1161
1160
let processor = BatchSpanProcessor :: new ( exporter, config) ;
1162
1161
1163
1162
// Create a test span and send it to the processor
1164
- let test_span = create_test_span ( "shutdown_span" ) ;
1165
- processor. on_end ( test_span. clone ( ) ) ;
1163
+ let mut test_span = create_test_span ( "shutdown_span" ) ;
1164
+ processor. on_end ( & mut test_span) ;
1166
1165
1167
1166
// Call shutdown to flush and export all pending spans
1168
1167
let shutdown_result = processor. shutdown ( ) ;
@@ -1196,13 +1195,13 @@ mod tests {
1196
1195
let processor = BatchSpanProcessor :: new ( exporter, config) ;
1197
1196
1198
1197
// Create test spans and send them to the processor
1199
- let span1 = create_test_span ( "span1" ) ;
1200
- let span2 = create_test_span ( "span2" ) ;
1201
- let span3 = create_test_span ( "span3" ) ; // This span should be dropped
1198
+ let mut span1 = create_test_span ( "span1" ) ;
1199
+ let mut span2 = create_test_span ( "span2" ) ;
1200
+ let mut span3 = create_test_span ( "span3" ) ; // This span should be dropped
1202
1201
1203
- processor. on_end ( span1. clone ( ) ) ;
1204
- processor. on_end ( span2. clone ( ) ) ;
1205
- processor. on_end ( span3. clone ( ) ) ; // This span exceeds the queue size
1202
+ processor. on_end ( & mut span1) ;
1203
+ processor. on_end ( & mut span2) ;
1204
+ processor. on_end ( & mut span3) ; // This span exceeds the queue size
1206
1205
1207
1206
// Wait for the scheduled delay to expire
1208
1207
std:: thread:: sleep ( Duration :: from_secs ( 3 ) ) ;
@@ -1242,7 +1241,7 @@ mod tests {
1242
1241
KeyValue :: new( "key1" , "value1" ) ,
1243
1242
KeyValue :: new( "key2" , "value2" ) ,
1244
1243
] ;
1245
- processor. on_end ( span_data. clone ( ) ) ;
1244
+ processor. on_end ( & mut span_data) ;
1246
1245
1247
1246
// Force flush to export the span
1248
1247
let _ = processor. force_flush ( ) ;
@@ -1272,8 +1271,8 @@ mod tests {
1272
1271
processor. set_resource ( & resource) ;
1273
1272
1274
1273
// Create a span and send it to the processor
1275
- let test_span = create_test_span ( "resource_test" ) ;
1276
- processor. on_end ( test_span. clone ( ) ) ;
1274
+ let mut test_span = create_test_span ( "resource_test" ) ;
1275
+ processor. on_end ( & mut test_span) ;
1277
1276
1278
1277
// Force flush to ensure the span is exported
1279
1278
let _ = processor. force_flush ( ) ;
@@ -1307,8 +1306,8 @@ mod tests {
1307
1306
let processor = BatchSpanProcessor :: new ( exporter, config) ;
1308
1307
1309
1308
for _ in 0 ..4 {
1310
- let span = new_test_export_span_data ( ) ;
1311
- processor. on_end ( span) ;
1309
+ let mut span = new_test_export_span_data ( ) ;
1310
+ processor. on_end ( & mut span) ;
1312
1311
}
1313
1312
1314
1313
processor. force_flush ( ) . unwrap ( ) ;
@@ -1330,8 +1329,8 @@ mod tests {
1330
1329
let processor = BatchSpanProcessor :: new ( exporter, config) ;
1331
1330
1332
1331
for _ in 0 ..4 {
1333
- let span = new_test_export_span_data ( ) ;
1334
- processor. on_end ( span) ;
1332
+ let mut span = new_test_export_span_data ( ) ;
1333
+ processor. on_end ( & mut span) ;
1335
1334
}
1336
1335
1337
1336
processor. force_flush ( ) . unwrap ( ) ;
@@ -1357,8 +1356,8 @@ mod tests {
1357
1356
for _ in 0 ..10 {
1358
1357
let processor_clone = Arc :: clone ( & processor) ;
1359
1358
let handle = tokio:: spawn ( async move {
1360
- let span = new_test_export_span_data ( ) ;
1361
- processor_clone. on_end ( span) ;
1359
+ let mut span = new_test_export_span_data ( ) ;
1360
+ processor_clone. on_end ( & mut span) ;
1362
1361
} ) ;
1363
1362
handles. push ( handle) ;
1364
1363
}
0 commit comments