17
17
package org .springframework .web .server .adapter ;
18
18
19
19
import java .util .Set ;
20
+ import java .util .concurrent .atomic .AtomicBoolean ;
20
21
21
22
import io .micrometer .observation .Observation ;
22
23
import io .micrometer .observation .ObservationRegistry ;
23
24
import io .micrometer .observation .contextpropagation .ObservationThreadLocalAccessor ;
24
25
import org .apache .commons .logging .Log ;
25
26
import org .apache .commons .logging .LogFactory ;
26
- import org . reactivestreams . Publisher ;
27
+ import reactor . core . observability . DefaultSignalListener ;
27
28
import reactor .core .publisher .Mono ;
29
+ import reactor .util .context .Context ;
28
30
29
31
import org .springframework .context .ApplicationContext ;
30
32
import org .springframework .core .NestedExceptionUtils ;
@@ -302,7 +304,9 @@ public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response)
302
304
ServerRequestObservationContext .CURRENT_OBSERVATION_CONTEXT_ATTRIBUTE , observationContext );
303
305
304
306
return getDelegate ().handle (exchange )
305
- .transformDeferred (call -> transform (exchange , observationContext , call ))
307
+ .doOnSuccess (aVoid -> logResponse (exchange ))
308
+ .onErrorResume (ex -> handleUnresolvedError (exchange , observationContext , ex ))
309
+ .tap (() -> new ObservationSignalListener (observationContext ))
306
310
.then (exchange .cleanupMultipart ())
307
311
.then (Mono .defer (response ::setComplete ));
308
312
}
@@ -324,42 +328,6 @@ protected String formatRequest(ServerHttpRequest request) {
324
328
return "HTTP " + request .getMethod () + " \" " + request .getPath () + query + "\" " ;
325
329
}
326
330
327
- private Publisher <Void > transform (ServerWebExchange exchange , ServerRequestObservationContext observationContext , Mono <Void > call ) {
328
- Observation observation = ServerHttpObservationDocumentation .HTTP_REACTIVE_SERVER_REQUESTS .observation (
329
- this .observationConvention , DEFAULT_OBSERVATION_CONVENTION , () -> observationContext , this .observationRegistry );
330
- observation .start ();
331
- return call
332
- .doOnSuccess (aVoid -> {
333
- logResponse (exchange );
334
- stopObservation (observation , exchange );
335
- })
336
- .onErrorResume (ex -> handleUnresolvedError (exchange , observationContext , ex ))
337
- .doOnCancel (() -> cancelObservation (observationContext , observation ))
338
- .contextWrite (context -> context .put (ObservationThreadLocalAccessor .KEY , observation ));
339
- }
340
-
341
- private void stopObservation (Observation observation , ServerWebExchange exchange ) {
342
- Throwable throwable = exchange .getAttribute (ExceptionHandlingWebHandler .HANDLED_WEB_EXCEPTION );
343
- if (throwable != null ) {
344
- observation .error (throwable );
345
- }
346
- ServerHttpResponse response = exchange .getResponse ();
347
- if (response .isCommitted ()) {
348
- observation .stop ();
349
- }
350
- else {
351
- response .beforeCommit (() -> {
352
- observation .stop ();
353
- return Mono .empty ();
354
- });
355
- }
356
- }
357
-
358
- private void cancelObservation (ServerRequestObservationContext observationContext , Observation observation ) {
359
- observationContext .setConnectionAborted (true );
360
- observation .stop ();
361
- }
362
-
363
331
private void logResponse (ServerWebExchange exchange ) {
364
332
LogFormatUtils .traceDebug (logger , traceOn -> {
365
333
HttpStatusCode status = exchange .getResponse ().getStatusCode ();
@@ -415,4 +383,66 @@ private boolean isDisconnectedClientError(Throwable ex) {
415
383
return DISCONNECTED_CLIENT_EXCEPTIONS .contains (ex .getClass ().getSimpleName ());
416
384
}
417
385
386
+ private final class ObservationSignalListener extends DefaultSignalListener <Void > {
387
+
388
+ private final ServerRequestObservationContext observationContext ;
389
+
390
+ private final Observation observation ;
391
+
392
+ private AtomicBoolean observationRecorded = new AtomicBoolean ();
393
+
394
+ public ObservationSignalListener (ServerRequestObservationContext observationContext ) {
395
+ this .observationContext = observationContext ;
396
+ this .observation = ServerHttpObservationDocumentation .HTTP_REACTIVE_SERVER_REQUESTS .observation (observationConvention ,
397
+ DEFAULT_OBSERVATION_CONVENTION , () -> observationContext , observationRegistry );
398
+ }
399
+
400
+ @ Override
401
+ public void doOnSubscription () throws Throwable {
402
+ this .observation .start ();
403
+ }
404
+
405
+ @ Override
406
+ public Context addToContext (Context originalContext ) {
407
+ return originalContext .put (ObservationThreadLocalAccessor .KEY , this .observation );
408
+ }
409
+
410
+ @ Override
411
+ public void doOnCancel () throws Throwable {
412
+ if (this .observationRecorded .compareAndSet (false , true )) {
413
+ this .observationContext .setConnectionAborted (true );
414
+ this .observation .stop ();
415
+ }
416
+ }
417
+
418
+ @ Override
419
+ public void doOnComplete () throws Throwable {
420
+ if (this .observationRecorded .compareAndSet (false , true )) {
421
+ ServerHttpResponse response = this .observationContext .getResponse ();
422
+ Throwable throwable = (Throwable ) this .observationContext .getAttributes ()
423
+ .get (ExceptionHandlingWebHandler .HANDLED_WEB_EXCEPTION );
424
+ if (throwable != null ) {
425
+ this .observation .error (throwable );
426
+ }
427
+ if (response .isCommitted ()) {
428
+ this .observation .stop ();
429
+ }
430
+ else {
431
+ response .beforeCommit (() -> {
432
+ this .observation .stop ();
433
+ return Mono .empty ();
434
+ });
435
+ }
436
+ }
437
+ }
438
+
439
+ @ Override
440
+ public void doOnError (Throwable error ) throws Throwable {
441
+ if (this .observationRecorded .compareAndSet (false , true )) {
442
+ this .observationContext .setError (error );
443
+ this .observation .stop ();
444
+ }
445
+ }
446
+ }
447
+
418
448
}
0 commit comments