Skip to content

Commit c4de9a0

Browse files
committed
HTTP response schema collection and data classification
1 parent 38ccc57 commit c4de9a0

File tree

23 files changed

+272
-39
lines changed

23 files changed

+272
-39
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import datadog.trace.bootstrap.instrumentation.api.URIUtils;
3737
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
3838
import datadog.trace.bootstrap.instrumentation.decorator.http.ClientIpAddressResolver;
39+
import java.io.OutputStream;
3940
import java.net.InetAddress;
4041
import java.util.BitSet;
4142
import java.util.LinkedHashMap;
@@ -386,6 +387,12 @@ public AgentSpan onResponse(final AgentSpan span, final RESPONSE response) {
386387
final int status = status(response);
387388
onResponseStatus(span, status);
388389

390+
BiFunction<RequestContext, OutputStream, Void> start =
391+
tracer().getUniversalCallbackProvider().getCallback(EVENTS.responseBodyStart());
392+
393+
final OutputStream responseBody = responseBody(response);
394+
start.apply(span.getRequestContext(), responseBody);
395+
389396
AgentPropagation.ContextVisitor<RESPONSE> getter = responseGetter();
390397
if (getter != null) {
391398
ResponseHeaderTagClassifier tagger =
@@ -396,23 +403,16 @@ public AgentSpan onResponse(final AgentSpan span, final RESPONSE response) {
396403
}
397404

398405
if (!isAppSecOnResponseSeparate()) {
399-
callIGCallbackResponseAndHeaders(span, response, status);
406+
callIGCallbackResponseAndHeaders(span, response, status, responseBody);
400407
}
401408
}
402409
return span;
403410
}
404411

405-
// @Override
406-
// public Span onError(final Span span, final Throwable throwable) {
407-
// assert span != null;
408-
// // FIXME
409-
// final Object status = span.getTag("http.status");
410-
// if (status == null || status.equals(200)) {
411-
// // Ensure status set correctly
412-
// span.setTag("http.status", 500);
413-
// }
414-
// return super.onError(span, throwable);
415-
// }
412+
// TODO this should be abstract by the end of developments
413+
/**
 * Extension hook that exposes the body stream of the given response.
 *
 * <p>This base implementation always returns {@code null}. {@code onResponse} hands the returned
 * value to the {@code responseBodyStart} callback and to {@code callIGCallbackResponseAndHeaders},
 * so consumers of those callbacks can receive a {@code null} stream.
 *
 * @param response the framework-specific response object
 * @return the response body stream; {@code null} in this default implementation
 */
protected OutputStream responseBody(RESPONSE response) {
414+
return null;
415+
}
416416

417417
private AgentSpanContext.Extracted callIGCallbackStart(AgentSpanContext.Extracted context) {
418418
AgentTracer.TracerAPI tracer = tracer();
@@ -467,7 +467,8 @@ private Flow<Void> callIGCallbackRequestHeaders(AgentSpan span, REQUEST_CARRIER
467467
IGKeyClassifier.create(
468468
requestContext,
469469
cbp.getCallback(EVENTS.requestHeader()),
470-
cbp.getCallback(EVENTS.requestHeaderDone()));
470+
cbp.getCallback(EVENTS.requestHeaderDone()),
471+
null);
471472
if (null != igKeyClassifier) {
472473
getter.forEachKey(carrier, igKeyClassifier);
473474
return igKeyClassifier.done();
@@ -496,21 +497,27 @@ private Flow<Void> callIGCallbackRequestSessionId(final AgentSpan span, final RE
496497
}
497498

498499
private Flow<Void> callIGCallbackResponseAndHeaders(
499-
AgentSpan span, RESPONSE carrier, int status) {
500-
return callIGCallbackResponseAndHeaders(span, carrier, status, responseGetter());
500+
AgentSpan span, RESPONSE carrier, int status, OutputStream responseBody) {
501+
return callIGCallbackResponseAndHeaders(span, carrier, status, responseGetter(), responseBody);
501502
}
502503

503504
public <RESP> Flow<Void> callIGCallbackResponseAndHeaders(
504505
AgentSpan span,
505506
RESP carrier,
506507
int status,
507-
AgentPropagation.ContextVisitor<RESP> contextVisitor) {
508+
AgentPropagation.ContextVisitor<RESP> contextVisitor,
509+
OutputStream responseBody) {
508510
CallbackProvider cbp = tracer().getCallbackProvider(RequestContextSlot.APPSEC);
509511
RequestContext requestContext = span.getRequestContext();
510512
if (cbp == null || requestContext == null) {
511513
return Flow.ResultFlow.empty();
512514
}
513515

516+
BiFunction<RequestContext, OutputStream, Flow<Void>> responseBodyDone =
517+
cbp.getCallback(EVENTS.responseBodyDone());
518+
if (null != responseBodyDone) {
519+
responseBodyDone.apply(requestContext, responseBody);
520+
}
514521
BiFunction<RequestContext, Integer, Flow<Void>> addrCallback =
515522
cbp.getCallback(EVENTS.responseStarted());
516523
if (null != addrCallback) {
@@ -523,7 +530,8 @@ public <RESP> Flow<Void> callIGCallbackResponseAndHeaders(
523530
IGKeyClassifier.create(
524531
requestContext,
525532
cbp.getCallback(EVENTS.responseHeader()),
526-
cbp.getCallback(EVENTS.responseHeaderDone()));
533+
cbp.getCallback(EVENTS.responseHeaderDone()),
534+
cbp.getCallback(EVENTS.responseBodyDone()));
527535
if (null != igKeyClassifier) {
528536
contextVisitor.forEachKey(carrier, igKeyClassifier);
529537
return igKeyClassifier.done();
@@ -607,7 +615,8 @@ protected static final class IGKeyClassifier implements AgentPropagation.KeyClas
607615
public static IGKeyClassifier create(
608616
RequestContext requestContext,
609617
TriConsumer<RequestContext, String, String> headerCallback,
610-
Function<RequestContext, Flow<Void>> doneCallback) {
618+
Function<RequestContext, Flow<Void>> doneCallback,
619+
BiFunction<RequestContext, OutputStream, Flow<Void>> bodyDoneCallback) {
611620
if (null == requestContext || null == headerCallback) {
612621
return null;
613622
}

dd-java-agent/appsec/src/main/java/com/datadog/appsec/event/data/KnownAddresses.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public interface KnownAddresses {
4848
Address<Object> RESPONSE_BODY_OBJECT = new Address<>("server.response.body");
4949

5050
/** First chars of HTTP response body */
51-
Address<String> RESPONSE_BODY_RAW = new Address<>("server.response.body.raw");
51+
Address<CharSequence> RESPONSE_BODY_RAW = new Address<>("server.response.body.raw");
5252

5353
/** Response headers excluding cookies */
5454
Address<Map<String, List<String>>> RESPONSE_HEADERS_NO_COOKIES =

dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/AppSecRequestContext.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import datadog.trace.api.internal.TraceSegment;
1616
import datadog.trace.util.stacktrace.StackTraceEvent;
1717
import java.io.Closeable;
18+
import java.io.OutputStream;
1819
import java.util.*;
1920
import java.util.concurrent.ConcurrentHashMap;
2021
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -98,14 +99,17 @@ public class AppSecRequestContext implements DataBundle, Closeable {
9899
private String inferredClientIp;
99100

100101
private volatile StoredBodySupplier storedRequestBodySupplier;
102+
private volatile OutputStream storedResponseBodySupplier;
101103
private String dbType;
102104

103105
private int responseStatus;
104106

105107
private boolean reqDataPublished;
106108
private boolean rawReqBodyPublished;
107109
private boolean convertedReqBodyPublished;
110+
private boolean convertedResBodyPublished;
108111
private boolean respDataPublished;
112+
private boolean rawResBodyPublished;
109113
private boolean pathParamsPublished;
110114
private volatile Map<String, String> derivatives;
111115

@@ -451,6 +455,10 @@ void setStoredRequestBodySupplier(StoredBodySupplier storedRequestBodySupplier)
451455
this.storedRequestBodySupplier = storedRequestBodySupplier;
452456
}
453457

458+
/**
 * Stores the response body stream on this context.
 *
 * @param storedResponseBodySupplier the stream to remember; assigned as-is, without a null check
 */
void setStoredResponseBodySupplier(OutputStream storedResponseBodySupplier) {
459+
this.storedResponseBodySupplier = storedResponseBodySupplier;
460+
}
461+
454462
public String getDbType() {
455463
return dbType;
456464
}
@@ -495,18 +503,34 @@ public boolean isConvertedReqBodyPublished() {
495503
return convertedReqBodyPublished;
496504
}
497505

506+
/** @return the current value of the {@code convertedResBodyPublished} flag */
public boolean isConvertedResBodyPublished() {
507+
return convertedResBodyPublished;
508+
}
509+
498510
public void setConvertedReqBodyPublished(boolean convertedReqBodyPublished) {
499511
this.convertedReqBodyPublished = convertedReqBodyPublished;
500512
}
501513

514+
/**
 * Sets the {@code convertedResBodyPublished} flag.
 *
 * @param convertedResBodyPublished the new flag value
 */
public void setConvertedResBodyPublished(boolean convertedResBodyPublished) {
515+
this.convertedResBodyPublished = convertedResBodyPublished;
516+
}
517+
502518
public boolean isRespDataPublished() {
503519
return respDataPublished;
504520
}
505521

522+
/** @return the current value of the {@code rawResBodyPublished} flag */
public boolean isRawResBodyPublished() {
523+
return rawResBodyPublished;
524+
}
525+
506526
public void setRespDataPublished(boolean respDataPublished) {
507527
this.respDataPublished = respDataPublished;
508528
}
509529

530+
/**
 * Sets the {@code rawResBodyPublished} flag.
 *
 * @param rawResBodyPublished the new flag value
 */
public void setRawResBodyPublished(boolean rawResBodyPublished) {
531+
this.rawResBodyPublished = rawResBodyPublished;
532+
}
533+
510534
/**
511535
* Updates the current used usr.id
512536
*
@@ -580,6 +604,14 @@ public CharSequence getStoredRequestBody() {
580604
return storedRequestBodySupplier.get();
581605
}
582606

607+
/** @return the contents of stream */
608+
public CharSequence getStoredResponseBody() {
609+
CharSequence storedResponseBody = null;
610+
storedResponseBody =
611+
this.storedResponseBodySupplier != null ? this.storedResponseBodySupplier.toString() : null;
612+
return storedResponseBody;
613+
}
614+
583615
public void reportEvents(Collection<AppSecEvent> appSecEvents) {
584616
for (AppSecEvent event : appSecEvents) {
585617
StandardizedLogging.attackDetected(log, event);

dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java

Lines changed: 96 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import datadog.trace.bootstrap.instrumentation.api.URIDataAdapter;
3838
import datadog.trace.util.stacktrace.StackTraceEvent;
3939
import datadog.trace.util.stacktrace.StackUtils;
40+
import java.io.OutputStream;
4041
import java.net.URI;
4142
import java.net.URISyntaxException;
4243
import java.nio.charset.Charset;
@@ -94,7 +95,9 @@ public class GatewayBridge {
9495
// subscriber cache
9596
private volatile DataSubscriberInfo initialReqDataSubInfo;
9697
private volatile DataSubscriberInfo rawRequestBodySubInfo;
98+
private volatile DataSubscriberInfo rawResponseBodySubInfo;
9799
private volatile DataSubscriberInfo requestBodySubInfo;
100+
private volatile DataSubscriberInfo responseBodySubInfo;
98101
private volatile DataSubscriberInfo pathParamsSubInfo;
99102
private volatile DataSubscriberInfo respDataSubInfo;
100103
private volatile DataSubscriberInfo grpcServerMethodSubInfo;
@@ -141,6 +144,8 @@ public void init() {
141144
subscriptionService.registerCallback(EVENTS.responseStarted(), this::onResponseStarted);
142145
subscriptionService.registerCallback(EVENTS.responseHeader(), this::onResponseHeader);
143146
subscriptionService.registerCallback(EVENTS.responseHeaderDone(), this::onResponseHeaderDone);
147+
subscriptionService.registerCallback(EVENTS.responseBodyStart(), this::onResponseBodyStart);
148+
subscriptionService.registerCallback(EVENTS.responseBodyDone(), this::onResponseBodyDone);
144149
subscriptionService.registerCallback(EVENTS.grpcServerMethod(), this::onGrpcServerMethod);
145150
subscriptionService.registerCallback(
146151
EVENTS.grpcServerRequestMessage(), this::onGrpcServerRequestMessage);
@@ -163,6 +168,10 @@ public void init() {
163168
subscriptionService.registerCallback(
164169
EVENTS.requestBodyProcessed(), this::onRequestBodyProcessed);
165170
}
171+
if (additionalIGEvents.contains(EVENTS.responseBodyProcessed())) {
172+
subscriptionService.registerCallback(
173+
EVENTS.responseBodyProcessed(), this::onResponseBodyProcessed);
174+
}
166175
}
167176

168177
/**
@@ -172,7 +181,9 @@ public void init() {
172181
public void reset() {
173182
initialReqDataSubInfo = null;
174183
rawRequestBodySubInfo = null;
184+
rawResponseBodySubInfo = null;
175185
requestBodySubInfo = null;
186+
responseBodySubInfo = null;
176187
pathParamsSubInfo = null;
177188
respDataSubInfo = null;
178189
grpcServerMethodSubInfo = null;
@@ -584,6 +595,40 @@ private Flow<Void> onRequestBodyProcessed(RequestContext ctx_, Object obj) {
584595
}
585596
}
586597

598+
private Flow<Void> onResponseBodyProcessed(RequestContext ctx_, Object obj) {
599+
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
600+
if (ctx == null) {
601+
return NoopFlow.INSTANCE;
602+
}
603+
604+
if (ctx.isConvertedResBodyPublished()) {
605+
log.debug(
606+
"Response body already published; will ignore new value of type {}", obj.getClass());
607+
return NoopFlow.INSTANCE;
608+
}
609+
ctx.setConvertedResBodyPublished(true);
610+
611+
while (true) {
612+
DataSubscriberInfo subInfo = responseBodySubInfo;
613+
if (subInfo == null) {
614+
subInfo = producerService.getDataSubscribers(KnownAddresses.RESPONSE_BODY_OBJECT);
615+
responseBodySubInfo = subInfo;
616+
}
617+
if (subInfo == null || subInfo.isEmpty()) {
618+
return NoopFlow.INSTANCE;
619+
}
620+
DataBundle bundle =
621+
new SingletonDataBundle<>(
622+
KnownAddresses.RESPONSE_BODY_OBJECT, ObjectIntrospection.convert(obj, ctx));
623+
try {
624+
GatewayContext gwCtx = new GatewayContext(false);
625+
return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx);
626+
} catch (ExpiredSubscriberInfoException e) {
627+
responseBodySubInfo = null;
628+
}
629+
}
630+
}
631+
587632
private Flow<Void> onRequestBodyDone(RequestContext ctx_, StoredBodySupplier supplier) {
588633
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
589634
if (ctx == null || ctx.isRawReqBodyPublished()) {
@@ -602,7 +647,7 @@ private Flow<Void> onRequestBodyDone(RequestContext ctx_, StoredBodySupplier sup
602647
}
603648

604649
CharSequence bodyContent = supplier.get();
605-
if (bodyContent == null || bodyContent.length() == 0) {
650+
if (bodyContent.length() == 0) {
606651
return NoopFlow.INSTANCE;
607652
}
608653
DataBundle bundle = new SingletonDataBundle<>(KnownAddresses.REQUEST_BODY_RAW, bodyContent);
@@ -615,6 +660,38 @@ private Flow<Void> onRequestBodyDone(RequestContext ctx_, StoredBodySupplier sup
615660
}
616661
}
617662

663+
private Flow<Void> onResponseBodyDone(RequestContext ctx_, OutputStream supplier) {
664+
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
665+
if (ctx == null || ctx.isRawResBodyPublished()) {
666+
return NoopFlow.INSTANCE;
667+
}
668+
ctx.setRawResBodyPublished(true);
669+
670+
while (true) {
671+
DataSubscriberInfo subInfo = responseBodySubInfo;
672+
if (subInfo == null) {
673+
subInfo = producerService.getDataSubscribers(KnownAddresses.RESPONSE_BODY_OBJECT);
674+
responseBodySubInfo = subInfo;
675+
}
676+
if (subInfo == null || subInfo.isEmpty()) {
677+
return NoopFlow.INSTANCE;
678+
}
679+
680+
CharSequence bodyContent = supplier.toString();
681+
if (bodyContent.length() == 0) {
682+
return NoopFlow.INSTANCE;
683+
}
684+
DataBundle bundle =
685+
new SingletonDataBundle<>(KnownAddresses.RESPONSE_BODY_OBJECT, bodyContent);
686+
try {
687+
GatewayContext gwCtx = new GatewayContext(false);
688+
return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx);
689+
} catch (ExpiredSubscriberInfoException e) {
690+
responseBodySubInfo = null;
691+
}
692+
}
693+
}
694+
618695
private Flow<Void> onRequestPathParams(RequestContext ctx_, Map<String, ?> data) {
619696
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
620697
if (ctx == null || ctx.isPathParamsPublished()) {
@@ -651,6 +728,16 @@ private Void onRequestBodyStart(RequestContext ctx_, StoredBodySupplier supplier
651728
return null;
652729
}
653730

731+
private Void onResponseBodyStart(RequestContext ctx_, OutputStream supplier) {
732+
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
733+
if (ctx == null) {
734+
return null;
735+
}
736+
737+
ctx.setStoredResponseBodySupplier(supplier);
738+
return null;
739+
}
740+
654741
private Flow<AppSecRequestContext> onRequestStarted() {
655742
if (!AppSecSystem.isActive()) {
656743
return RequestContextSupplier.EMPTY;
@@ -1014,8 +1101,10 @@ private Flow<Void> maybePublishResponseData(AppSecRequestContext ctx) {
10141101

10151102
MapDataBundle bundle =
10161103
MapDataBundle.of(
1017-
KnownAddresses.RESPONSE_STATUS, String.valueOf(ctx.getResponseStatus()),
1018-
KnownAddresses.RESPONSE_HEADERS_NO_COOKIES, ctx.getResponseHeaders());
1104+
KnownAddresses.RESPONSE_STATUS,
1105+
String.valueOf(ctx.getResponseStatus()),
1106+
KnownAddresses.RESPONSE_HEADERS_NO_COOKIES,
1107+
ctx.getResponseHeaders());
10191108

10201109
while (true) {
10211110
DataSubscriberInfo subInfo = respDataSubInfo;
@@ -1110,6 +1199,10 @@ private static class IGAppSecEventDependencies {
11101199
KnownAddresses.REQUEST_BODY_RAW, l(EVENTS.requestBodyStart(), EVENTS.requestBodyDone()));
11111200
DATA_DEPENDENCIES.put(KnownAddresses.REQUEST_PATH_PARAMS, l(EVENTS.requestPathParams()));
11121201
DATA_DEPENDENCIES.put(KnownAddresses.REQUEST_BODY_OBJECT, l(EVENTS.requestBodyProcessed()));
1202+
DATA_DEPENDENCIES.put(
1203+
KnownAddresses.RESPONSE_BODY_RAW,
1204+
l(EVENTS.responseBodyStart(), EVENTS.responseBodyDone()));
1205+
DATA_DEPENDENCIES.put(KnownAddresses.RESPONSE_BODY_OBJECT, l(EVENTS.responseBodyProcessed()));
11131206
}
11141207

11151208
private static Collection<datadog.trace.api.gateway.EventType<?>> l(

0 commit comments

Comments
 (0)