Skip to content

Fix memory leak caused by incorrect context deactivation #896

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,13 @@
import com.uber.cadence.internal.tracing.TracingPropagator;
import com.uber.cadence.serviceclient.ClientOptions;
import com.uber.cadence.serviceclient.auth.IAuthorizationProvider;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.Deadline;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.*;
import io.grpc.stub.MetadataUtils;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -228,10 +218,23 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
Span span =
tracingPropagator.activateSpanByServiceMethod(
tracingPropagator.spanByServiceMethod(
String.format(OPERATIONFORMAT, method.getBareMethodName()));
super.start(responseListener, headers);
span.finish();
Scope scope = tracer.activateSpan(span);
super.start(
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
try {
super.onClose(status, trailers);
} finally {
span.finish();
scope.close();
}
}
},
headers);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ final class SyncDecisionContext implements WorkflowInterceptor {
private final byte[] lastCompletionResult;
private final WorkflowImplementationOptions workflowImplementationOptions;
private final TracingPropagator tracingPropagator;
private final Tracer tracer;

public SyncDecisionContext(
DecisionContext context,
Expand Down Expand Up @@ -133,6 +134,7 @@ public SyncDecisionContext(
this.lastCompletionResult = lastCompletionResult;
this.workflowImplementationOptions = workflowImplementationOptions;
this.tracingPropagator = new TracingPropagator(tracer);
this.tracer = tracer;
}

/**
Expand All @@ -154,8 +156,8 @@ public WorkflowInterceptor getWorkflowInterceptor() {
@Override
public byte[] executeWorkflow(
SyncWorkflowDefinition workflowDefinition, WorkflowExecuteInput input) {
Span span = tracingPropagator.activateSpanForExecuteWorkflow(context);
try {
Span span = tracingPropagator.spanForExecuteWorkflow(context);
try (io.opentracing.Scope scope = tracer.activateSpan(span)) {
return workflowDefinition.execute(input.getInput());
} finally {
span.finish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,57 +53,45 @@ public TracingPropagator(Tracer tracer) {
this.tracer = tracer;
}

public Span activateSpanByServiceMethod(String serviceMethod) {
Span span = tracer.buildSpan(serviceMethod).asChildOf(tracer.activeSpan()).start();
tracer.activateSpan(span);
return span;
public Span spanByServiceMethod(String serviceMethod) {
return tracer.buildSpan(serviceMethod).asChildOf(tracer.activeSpan()).start();
}

public Span activateSpanForExecuteWorkflow(DecisionContext context) {
public Span spanForExecuteWorkflow(DecisionContext context) {
WorkflowExecutionStartedEventAttributes attributes =
context.getWorkflowExecutionStartedEventAttributes();
SpanContext parent = extract(attributes.getHeader());

Span span =
tracer
.buildSpan(EXECUTE_WORKFLOW)
.addReference(
References.FOLLOWS_FROM, parent != NoopSpan.INSTANCE.context() ? parent : null)
.withTag(TAG_WORKFLOW_TYPE, context.getWorkflowType().getName())
.withTag(TAG_WORKFLOW_ID, context.getWorkflowId())
.withTag(TAG_WORKFLOW_RUN_ID, context.getRunId())
.start();
tracer.activateSpan(span);
return span;
return tracer
.buildSpan(EXECUTE_WORKFLOW)
.addReference(
References.FOLLOWS_FROM, parent != NoopSpan.INSTANCE.context() ? parent : null)
.withTag(TAG_WORKFLOW_TYPE, context.getWorkflowType().getName())
.withTag(TAG_WORKFLOW_ID, context.getWorkflowId())
.withTag(TAG_WORKFLOW_RUN_ID, context.getRunId())
.start();
}

public Span activateSpanForExecuteActivity(PollForActivityTaskResponse task) {
public Span spanForExecuteActivity(PollForActivityTaskResponse task) {
SpanContext parent = extract(task.getHeader());
Span span =
tracer
.buildSpan(EXECUTE_ACTIVITY)
.addReference(
References.FOLLOWS_FROM, parent != NoopSpan.INSTANCE.context() ? parent : null)
.withTag(
TAG_WORKFLOW_TYPE,
task.isSetWorkflowType() ? task.getWorkflowType().getName() : "null")
.withTag(
TAG_WORKFLOW_ID,
task.isSetWorkflowExecution()
? task.getWorkflowExecution().getWorkflowId()
: "null")
.withTag(
TAG_WORKFLOW_RUN_ID,
task.isSetWorkflowExecution() ? task.getWorkflowExecution().getRunId() : "null")
.withTag(
TAG_ACTIVITY_TYPE,
task.isSetActivityType() ? task.getActivityType().getName() : "null")
.start();
tracer.activateSpan(span);
return span;
return tracer
.buildSpan(EXECUTE_ACTIVITY)
.addReference(
References.FOLLOWS_FROM, parent != NoopSpan.INSTANCE.context() ? parent : null)
.withTag(
TAG_WORKFLOW_TYPE, task.isSetWorkflowType() ? task.getWorkflowType().getName() : "null")
.withTag(
TAG_WORKFLOW_ID,
task.isSetWorkflowExecution() ? task.getWorkflowExecution().getWorkflowId() : "null")
.withTag(
TAG_WORKFLOW_RUN_ID,
task.isSetWorkflowExecution() ? task.getWorkflowExecution().getRunId() : "null")
.withTag(
TAG_ACTIVITY_TYPE, task.isSetActivityType() ? task.getActivityType().getName() : "null")
.start();
}

public Span activateSpanForExecuteLocalActivity(Task task) {
public Span spanForExecuteLocalActivity(Task task) {
ExecuteLocalActivityParameters params = task.getExecuteLocalActivityParameters();

// retrieve spancontext from params
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.uber.m3.util.Duration;
import com.uber.m3.util.ImmutableMap;
import io.opentracing.Span;
import io.opentracing.Tracer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -53,6 +54,7 @@ public class ActivityWorker extends SuspendableWorkerBase {
private final IWorkflowService service;
private final String domain;
private final String taskList;
private final Tracer tracer;
private final TracingPropagator spanFactory;

public ActivityWorker(
Expand All @@ -75,6 +77,7 @@ public ActivityWorker(
this.domain = Objects.requireNonNull(domain);
this.taskList = Objects.requireNonNull(taskList);
this.handler = handler;
this.tracer = options.getTracer();
this.spanFactory = new TracingPropagator(options.getTracer());

PollerOptions pollerOptions = options.getPollerOptions();
Expand Down Expand Up @@ -142,9 +145,8 @@ public void handle(PollForActivityTaskResponse task) throws Exception {
MDC.put(LoggerTag.RUN_ID, task.getWorkflowExecution().getRunId());

propagateContext(task);
Span span = spanFactory.activateSpanForExecuteActivity(task);

try {
Span span = spanFactory.spanForExecuteActivity(task);
try (io.opentracing.Scope scope = tracer.activateSpan(span)) {
Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_EXEC_LATENCY).start();
ActivityTaskHandler.Result response = handler.handle(task, metricsScope, false);
sw.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.uber.m3.tally.Stopwatch;
import com.uber.m3.util.ImmutableMap;
import io.opentracing.Span;
import io.opentracing.Tracer;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
Expand All @@ -50,6 +51,7 @@ public final class LocalActivityWorker extends SuspendableWorkerBase {
private final SingleWorkerOptions options;
private final LocalActivityPollTask laPollTask;
private final TracingPropagator spanFactory;
private final Tracer tracer;

public LocalActivityWorker(
String domain, String taskList, SingleWorkerOptions options, ActivityTaskHandler handler) {
Expand All @@ -58,6 +60,7 @@ public LocalActivityWorker(
this.handler = handler;
this.laPollTask = new LocalActivityPollTask();
this.spanFactory = new TracingPropagator(options.getTracer());
this.tracer = options.getTracer();

PollerOptions pollerOptions = options.getPollerOptions();
if (pollerOptions.getPollThreadNamePrefix() == null) {
Expand Down Expand Up @@ -129,42 +132,43 @@ public void handle(Task task) throws Exception {
propagateContext(task.params);

// start and activate span for local activities
Span span = spanFactory.activateSpanForExecuteLocalActivity(task);

task.taskStartTime = System.currentTimeMillis();
ActivityTaskHandler.Result result = handleLocalActivity(task);

LocalActivityMarkerData.Builder markerBuilder = new LocalActivityMarkerData.Builder();
markerBuilder.setActivityId(task.params.getActivityId());
markerBuilder.setActivityType(task.params.getActivityType());
long replayTimeMillis =
task.currentTimeMillis.getAsLong()
+ (System.currentTimeMillis() - task.replayTimeUpdatedAtMillis.getAsLong());
markerBuilder.setReplayTimeMillis(replayTimeMillis);

if (result.getTaskCompleted() != null) {
markerBuilder.setResult(result.getTaskCompleted().getResult());
} else if (result.getTaskFailedResult() != null) {
markerBuilder.setTaskFailedRequest(result.getTaskFailedResult().getTaskFailedRequest());
markerBuilder.setAttempt(result.getAttempt());
markerBuilder.setBackoff(result.getBackoff());
} else {
markerBuilder.setTaskCancelledRequest(result.getTaskCancelled());
Span span = spanFactory.spanForExecuteLocalActivity(task);
try (io.opentracing.Scope scope = tracer.activateSpan(span)) {
task.taskStartTime = System.currentTimeMillis();
ActivityTaskHandler.Result result = handleLocalActivity(task);

LocalActivityMarkerData.Builder markerBuilder = new LocalActivityMarkerData.Builder();
markerBuilder.setActivityId(task.params.getActivityId());
markerBuilder.setActivityType(task.params.getActivityType());
long replayTimeMillis =
task.currentTimeMillis.getAsLong()
+ (System.currentTimeMillis() - task.replayTimeUpdatedAtMillis.getAsLong());
markerBuilder.setReplayTimeMillis(replayTimeMillis);

if (result.getTaskCompleted() != null) {
markerBuilder.setResult(result.getTaskCompleted().getResult());
} else if (result.getTaskFailedResult() != null) {
markerBuilder.setTaskFailedRequest(result.getTaskFailedResult().getTaskFailedRequest());
markerBuilder.setAttempt(result.getAttempt());
markerBuilder.setBackoff(result.getBackoff());
} else {
markerBuilder.setTaskCancelledRequest(result.getTaskCancelled());
}

LocalActivityMarkerData marker = markerBuilder.build();

HistoryEvent event = new HistoryEvent();
event.setEventType(EventType.MarkerRecorded);
MarkerRecordedEventAttributes attributes =
new MarkerRecordedEventAttributes()
.setMarkerName(ClockDecisionContext.LOCAL_ACTIVITY_MARKER_NAME)
.setHeader(marker.getHeader(options.getDataConverter()))
.setDetails(marker.getResult());
event.setMarkerRecordedEventAttributes(attributes);
task.eventConsumer.accept(event);
} finally {
span.finish();
}

LocalActivityMarkerData marker = markerBuilder.build();

HistoryEvent event = new HistoryEvent();
event.setEventType(EventType.MarkerRecorded);
MarkerRecordedEventAttributes attributes =
new MarkerRecordedEventAttributes()
.setMarkerName(ClockDecisionContext.LOCAL_ACTIVITY_MARKER_NAME)
.setHeader(marker.getHeader(options.getDataConverter()))
.setDetails(marker.getResult());
event.setMarkerRecordedEventAttributes(attributes);
task.eventConsumer.accept(event);

span.finish();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentracing.Span;
import io.opentracing.Tracer;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
Expand All @@ -74,6 +75,7 @@ public class WorkflowServiceTChannel implements IWorkflowService {
private final Map<String, String> thriftHeaders;
private final TChannel tChannel;
private final TracingPropagator tracingPropagator;
private final Tracer tracer;
private final SubChannel subChannel;

/**
Expand All @@ -86,6 +88,7 @@ public WorkflowServiceTChannel(ClientOptions options) {
this.thriftHeaders = getThriftHeaders(options);
this.tChannel = new TChannel.Builder(options.getClientAppName()).build();
this.tracingPropagator = new TracingPropagator(options.getTracer());
this.tracer = options.getTracer();

InetAddress address;
try {
Expand Down Expand Up @@ -126,6 +129,7 @@ public WorkflowServiceTChannel(SubChannel subChannel, ClientOptions options) {
this.tChannel = null;
this.subChannel = subChannel;
this.tracingPropagator = new TracingPropagator(options.getTracer());
this.tracer = options.getTracer();
}

private static Map<String, String> getThriftHeaders(ClientOptions options) {
Expand Down Expand Up @@ -326,14 +330,15 @@ private <T> T measureRemoteCall(String scopeName, RemoteCall<T> call) throws TEx

private <T> T measureRemoteCallWithTags(
String scopeName, RemoteCall<T> call, Map<String, String> tags) throws TException {
Span span = tracingPropagator.activateSpanByServiceMethod(scopeName);
Scope scope = options.getMetricsScope().subScope(scopeName);
if (tags != null) {
scope = scope.tagged(tags);
}
scope.counter(MetricsType.CADENCE_REQUEST).inc(1);
Stopwatch sw = scope.timer(MetricsType.CADENCE_LATENCY).start();
try {

Span span = tracingPropagator.spanByServiceMethod(scopeName);
try (io.opentracing.Scope tracingScope = tracer.activateSpan(span)) {
T resp = call.apply();
sw.stop();
return resp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ private void testStartWorkflowHelper(
}

// assert workflow spans
MockSpan spanExecuteWF = getLinkedSpans(spans, spanStartWorkflow.context()).get(1);
MockSpan spanExecuteWF = getLinkedSpans(spans, spanStartWorkflow.context()).get(0);
assertEquals(spanExecuteWF.operationName(), "cadence-ExecuteWorkflow");
assertSpanReferences(spanExecuteWF, "follows_from", spanStartWorkflow);

Expand Down Expand Up @@ -382,7 +382,7 @@ private void testSignalWithStartWorkflowHelper(
// assert workflow spans
List<MockSpan> workflowSpans =
getSpansByTraceID(spans, spanStartWorkflow.context().toTraceId());
MockSpan spanExecuteWF = getLinkedSpans(spans, spanStartWorkflow.context()).get(1);
MockSpan spanExecuteWF = getLinkedSpans(spans, spanStartWorkflow.context()).get(0);
assertEquals(spanExecuteWF.operationName(), "cadence-ExecuteWorkflow");
assertSpanReferences(spanExecuteWF, "follows_from", spanStartWorkflow);

Expand Down