Skip to content

Extract Vert.x json body response schemas #8954

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public class AppSecRequestContext implements DataBundle, Closeable {
private boolean reqDataPublished;
private boolean rawReqBodyPublished;
private boolean convertedReqBodyPublished;
private boolean responseBodyPublished;
private boolean respDataPublished;
private boolean pathParamsPublished;
private volatile Map<String, String> derivatives;
Expand Down Expand Up @@ -502,6 +503,14 @@ public void setConvertedReqBodyPublished(boolean convertedReqBodyPublished) {
this.convertedReqBodyPublished = convertedReqBodyPublished;
}

public boolean isResponseBodyPublished() {
return responseBodyPublished;
}

public void setResponseBodyPublished(final boolean responseBodyPublished) {
this.responseBodyPublished = responseBodyPublished;
}

public boolean isRespDataPublished() {
return respDataPublished;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class GatewayBridge {
private volatile DataSubscriberInfo initialReqDataSubInfo;
private volatile DataSubscriberInfo rawRequestBodySubInfo;
private volatile DataSubscriberInfo requestBodySubInfo;
private volatile DataSubscriberInfo responseBodySubInfo;
private volatile DataSubscriberInfo pathParamsSubInfo;
private volatile DataSubscriberInfo respDataSubInfo;
private volatile DataSubscriberInfo grpcServerMethodSubInfo;
Expand Down Expand Up @@ -135,6 +136,7 @@ public void init() {
subscriptionService.registerCallback(EVENTS.requestMethodUriRaw(), this::onRequestMethodUriRaw);
subscriptionService.registerCallback(EVENTS.requestBodyStart(), this::onRequestBodyStart);
subscriptionService.registerCallback(EVENTS.requestBodyDone(), this::onRequestBodyDone);
subscriptionService.registerCallback(EVENTS.responseBody(), this::onResponseBody);
subscriptionService.registerCallback(
EVENTS.requestClientSocketAddress(), this::onRequestClientSocketAddress);
subscriptionService.registerCallback(
Expand Down Expand Up @@ -175,6 +177,7 @@ public void reset() {
initialReqDataSubInfo = null;
rawRequestBodySubInfo = null;
requestBodySubInfo = null;
responseBodySubInfo = null;
pathParamsSubInfo = null;
respDataSubInfo = null;
grpcServerMethodSubInfo = null;
Expand Down Expand Up @@ -636,6 +639,40 @@ private Flow<Void> onRequestBodyDone(RequestContext ctx_, StoredBodySupplier sup
}
}

private Flow<Void> onResponseBody(RequestContext ctx_, Object obj) {
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
if (ctx == null) {
return NoopFlow.INSTANCE;
}

if (ctx.isResponseBodyPublished()) {
log.debug(
"Response body already published; will ignore new value of type {}", obj.getClass());
return NoopFlow.INSTANCE;
}
ctx.setResponseBodyPublished(true);

while (true) {
DataSubscriberInfo subInfo = responseBodySubInfo;
if (subInfo == null) {
subInfo = producerService.getDataSubscribers(KnownAddresses.RESPONSE_BODY_OBJECT);
responseBodySubInfo = subInfo;
}
if (subInfo == null || subInfo.isEmpty()) {
return NoopFlow.INSTANCE;
}
// TODO: review schema extraction limits
Object converted = ObjectIntrospection.convert(obj, ctx);
DataBundle bundle = new SingletonDataBundle<>(KnownAddresses.RESPONSE_BODY_OBJECT, converted);
try {
GatewayContext gwCtx = new GatewayContext(false);
return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx);
} catch (ExpiredSubscriberInfoException e) {
responseBodySubInfo = null;
}
}
}

private Flow<Void> onRequestPathParams(RequestContext ctx_, Map<String, ?> data) {
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
if (ctx == null || ctx.isPathParamsPublished()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class GatewayBridgeSpecification extends DDSpecification {
BiFunction<RequestContext, StoredBodySupplier, Void> requestBodyStartCB
BiFunction<RequestContext, StoredBodySupplier, Flow<Void>> requestBodyDoneCB
BiFunction<RequestContext, Object, Flow<Void>> requestBodyProcessedCB
BiFunction<RequestContext, Object, Flow<Void>> responseBodyCB
BiFunction<RequestContext, Integer, Flow<Void>> responseStartedCB
TriConsumer<RequestContext, String, String> respHeaderCB
Function<RequestContext, Flow<Void>> respHeadersDoneCB
Expand Down Expand Up @@ -450,6 +451,7 @@ class GatewayBridgeSpecification extends DDSpecification {
1 * ig.registerCallback(EVENTS.requestBodyStart(), _) >> { requestBodyStartCB = it[1]; null }
1 * ig.registerCallback(EVENTS.requestBodyDone(), _) >> { requestBodyDoneCB = it[1]; null }
1 * ig.registerCallback(EVENTS.requestBodyProcessed(), _) >> { requestBodyProcessedCB = it[1]; null }
1 * ig.registerCallback(EVENTS.responseBody(), _) >> { responseBodyCB = it[1]; null }
1 * ig.registerCallback(EVENTS.responseStarted(), _) >> { responseStartedCB = it[1]; null }
1 * ig.registerCallback(EVENTS.responseHeader(), _) >> { respHeaderCB = it[1]; null }
1 * ig.registerCallback(EVENTS.responseHeaderDone(), _) >> { respHeadersDoneCB = it[1]; null }
Expand Down Expand Up @@ -1327,4 +1329,17 @@ class GatewayBridgeSpecification extends DDSpecification {
arCtx.getRoute() == route
}

void 'test on response body callback'() {
when:
responseBodyCB.apply(ctx, [test: 'this is a test'])

then:
1 * eventDispatcher.getDataSubscribers(KnownAddresses.RESPONSE_BODY_OBJECT) >> nonEmptyDsInfo
1 * eventDispatcher.publishDataEvent(_, _, _, _) >> {
final bundle = it[2] as DataBundle
final body = bundle.get(KnownAddresses.RESPONSE_BODY_OBJECT)
assert body['test'] == 'this is a test'
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package datadog.trace.instrumentation.vertx_4_0.server;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.agent.tooling.muzzle.Reference;
import io.vertx.ext.web.impl.RoutingContextImpl;

/**
* @see RoutingContextImpl#getBodyAsJson(int)
* @see RoutingContextImpl#getBodyAsJsonArray(int)
*/
@AutoService(InstrumenterModule.class)
public class RoutingContextInstrumentation extends InstrumenterModule.AppSec
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

public RoutingContextInstrumentation() {
super("vertx", "vertx-4.0");
}

@Override
public Reference[] additionalMuzzleReferences() {
return new Reference[] {VertxVersionMatcher.HTTP_1X_SERVER_RESPONSE};
}

@Override
public String instrumentedType() {
return "io.vertx.ext.web.RoutingContext";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
named("json").and(takesArguments(1)).and(takesArgument(0, Object.class)),
packageName + ".RoutingContextJsonResponseAdvice");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package datadog.trace.instrumentation.vertx_4_0.server;

import static datadog.trace.api.gateway.Events.EVENTS;

import datadog.appsec.api.blocking.BlockingException;
import datadog.trace.advice.ActiveRequestContext;
import datadog.trace.advice.RequiresRequestContext;
import datadog.trace.api.gateway.BlockResponseFunction;
import datadog.trace.api.gateway.CallbackProvider;
import datadog.trace.api.gateway.Flow;
import datadog.trace.api.gateway.RequestContext;
import datadog.trace.api.gateway.RequestContextSlot;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import io.vertx.core.json.JsonObject;
import java.util.function.BiFunction;
import net.bytebuddy.asm.Advice;

@RequiresRequestContext(RequestContextSlot.APPSEC)
class RoutingContextJsonResponseAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
static void before(
@Advice.Argument(0) Object source, @ActiveRequestContext RequestContext reqCtx) {

if (source == null) {
return;
}

Object object = source;
if (object instanceof JsonObject) {
object = ((JsonObject) object).getMap();
}

CallbackProvider cbp = AgentTracer.get().getCallbackProvider(RequestContextSlot.APPSEC);
BiFunction<RequestContext, Object, Flow<Void>> callback =
cbp.getCallback(EVENTS.responseBody());
if (callback == null) {
return;
}

Flow<Void> flow = callback.apply(reqCtx, object);
Flow.Action action = flow.getAction();
if (action instanceof Flow.Action.RequestBlockingAction) {
BlockResponseFunction blockResponseFunction = reqCtx.getBlockResponseFunction();
if (blockResponseFunction == null) {
return;
}
Flow.Action.RequestBlockingAction rba = (Flow.Action.RequestBlockingAction) action;
blockResponseFunction.tryCommitBlockingResponse(
reqCtx.getTraceSegment(),
rba.getStatusCode(),
rba.getBlockingContentType(),
rba.getExtraHeaders());

throw new BlockingException("Blocked request (for RoutingContext/json)");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ class VertxHttpServerForkedTest extends HttpServerTest<Vertx> {
true
}

@Override
boolean testResponseBodyJson() {
true
}

@Override
boolean testBlocking() {
true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public void start(final Promise<Void> startPromise) {
BODY_JSON,
() -> {
JsonObject json = ctx.getBodyAsJson();
ctx.response().setStatusCode(BODY_JSON.getStatus()).end(json.toString());
ctx.response().setStatusCode(BODY_JSON.getStatus());
ctx.json(json);
}));
router
.route(QUERY_ENCODED_BOTH.getRawPath())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ class VertxHttpServerForkedTest extends HttpServerTest<Vertx> {
true
}

@Override
boolean testResponseBodyJson() {
true
}

@Override
boolean testBodyUrlencoded() {
true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ public void start(final Promise<Void> startPromise) {
BODY_JSON,
() -> {
JsonObject json = ctx.body().asJsonObject();
ctx.response().setStatusCode(BODY_JSON.getStatus()).end(json.toString());
ctx.response().setStatusCode(BODY_JSON.getStatus());
ctx.json(json);
}));
router
.route(QUERY_ENCODED_BOTH.getRawPath())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
ss.registerCallback(events.requestBodyStart(), callbacks.requestBodyStartCb)
ss.registerCallback(events.requestBodyDone(), callbacks.requestBodyEndCb)
ss.registerCallback(events.requestBodyProcessed(), callbacks.requestBodyObjectCb)
ss.registerCallback(events.responseBody(), callbacks.responseBodyObjectCb)
ss.registerCallback(events.responseStarted(), callbacks.responseStartedCb)
ss.registerCallback(events.responseHeader(), callbacks.responseHeaderCb)
ss.registerCallback(events.responseHeaderDone(), callbacks.responseHeaderDoneCb)
Expand Down Expand Up @@ -335,6 +336,7 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
false
}


boolean isRequestBodyNoStreaming() {
// if true, plain text request body tests expect the requestBodyProcessed
// callback to tbe called, not requestBodyStart/requestBodyDone
Expand All @@ -353,6 +355,10 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
false
}

boolean testResponseBodyJson() {
false
}

boolean testBlocking() {
false
}
Expand Down Expand Up @@ -1581,6 +1587,40 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
true | 'text/html;q=0.8, application/json;q=0.9'
}

void 'test instrumentation gateway json response body'() {
setup:
assumeTrue(testResponseBodyJson())
def request = request(
BODY_JSON, 'POST',
RequestBody.create(MediaType.get('application/json'), '{"a": "x"}'))
.header(IG_RESPONSE_BODY_TAG, 'true')
.build()
def response = client.newCall(request).execute()
if (isDataStreamsEnabled()) {
TEST_DATA_STREAMS_WRITER.waitForGroups(1)
}

expect:
response.body().charStream().text == BODY_JSON.body

when:
TEST_WRITER.waitForTraces(1)

then:
TEST_WRITER.get(0).any {
it.getTag('response.body') == '[a:[x]]'
}

and:
if (isDataStreamsEnabled()) {
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
verifyAll(first) {
edgeTags.containsAll(DSM_EDGE_TAGS)
edgeTags.size() == DSM_EDGE_TAGS.size()
}
}
}

@Flaky(value = "https://github.com/DataDog/dd-trace-java/issues/4681", suites = ["GrizzlyAsyncTest", "GrizzlyTest"])
def 'test blocking of request with json response'() {
setup:
Expand Down Expand Up @@ -2280,6 +2320,7 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
static final String IG_BODY_END_BLOCK_HEADER = "x-block-body-end"
static final String IG_BODY_CONVERTED_HEADER = "x-block-body-converted"
static final String IG_ASK_FOR_RESPONSE_HEADER_TAGS_HEADER = "x-include-response-headers-in-tags"
static final String IG_RESPONSE_BODY_TAG = "x-include-response-body-in-tags"
static final String IG_PEER_ADDRESS = "ig-peer-address"
static final String IG_PEER_PORT = "ig-peer-port"
static final String IG_RESPONSE_STATUS = "ig-response-status"
Expand All @@ -2303,6 +2344,7 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
boolean bodyEndBlock
boolean bodyConvertedBlock
boolean responseHeadersInTags
boolean responseBodyTag
}

static final String stringOrEmpty(String string) {
Expand Down Expand Up @@ -2356,6 +2398,9 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
if (IG_ASK_FOR_RESPONSE_HEADER_TAGS_HEADER.equalsIgnoreCase(key)) {
context.responseHeadersInTags = true
}
if (IG_RESPONSE_BODY_TAG.equalsIgnoreCase(key)) {
context.responseBodyTag = true
}
} as TriConsumer<RequestContext, String, String>

final Function<RequestContext, Flow<Void>> requestHeaderDoneCb =
Expand Down Expand Up @@ -2450,6 +2495,33 @@ abstract class HttpServerTest<SERVER> extends WithHttpServer<SERVER> {
}
} as BiFunction<RequestContext, Object, Flow<Void>>)

final BiFunction<RequestContext, Object, Flow<Void>> responseBodyObjectCb =
({ RequestContext rqCtxt, Object obj ->
if (obj instanceof Map) {
obj = obj.collectEntries {
[
it.key,
(it.value instanceof Iterable || it.value instanceof String[]) ? it.value : [it.value]
]
}
} else if (!(obj instanceof String) && !(obj instanceof List)) {
obj = obj.properties
.findAll { it.key != 'class' }
.collectEntries { [it.key, it.value instanceof Iterable ? it.value : [it.value]] }
}
Context context = rqCtxt.getData(RequestContextSlot.APPSEC)
if (context.responseBodyTag) {
rqCtxt.traceSegment.setTagTop('response.body', obj as String)
}
if (context.responseBlock) {
new RbaFlow(
new Flow.Action.RequestBlockingAction(413, BlockingContentType.JSON)
)
} else {
Flow.ResultFlow.empty()
}
} as BiFunction<RequestContext, Object, Flow<Void>>)

final BiFunction<RequestContext, Integer, Flow<Void>> responseStartedCb =
({ RequestContext rqCtxt, Integer resultCode ->
Context context = rqCtxt.getData(RequestContextSlot.APPSEC)
Expand Down
Loading
Loading