Skip to content

Commit 767540d

Browse files
mobuchowski authored and sezen-datadog committed
Use OpenLineage root parent information to generate trace id (#8726)
* spark instrumentation: use OpenLineage trace id if it can be generated out of root parent information

  Signed-off-by: Maciej Obuchowski <[email protected]>

* spark: use OpenLineage context to generate trace/span id if present

  Signed-off-by: Maciej Obuchowski <[email protected]>

* code review fixes, add version tag

  Signed-off-by: Maciej Obuchowski <[email protected]>

---------

Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent 7f268f3 commit 767540d

File tree

6 files changed

+158
-72
lines changed

6 files changed

+158
-72
lines changed
Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
import datadog.trace.instrumentation.spark.AbstractSparkListenerTest
22
import datadog.trace.instrumentation.spark.DatadogSpark212Listener
33
import org.apache.spark.SparkConf
4-
import org.apache.spark.scheduler.SparkListener
54

65
class SparkListenerTest extends AbstractSparkListenerTest {
76
@Override
8-
protected SparkListener getTestDatadogSparkListener() {
7+
protected DatadogSpark212Listener getTestDatadogSparkListener() {
98
def conf = new SparkConf()
109
return new DatadogSpark212Listener(conf, "some_app_id", "some_version")
1110
}
11+
12+
@Override
13+
protected DatadogSpark212Listener getTestDatadogSparkListener(SparkConf conf) {
14+
return new DatadogSpark212Listener(conf, "some_app_id", "some_version")
15+
}
1216
}
Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
import datadog.trace.instrumentation.spark.AbstractSparkListenerTest
22
import datadog.trace.instrumentation.spark.DatadogSpark213Listener
33
import org.apache.spark.SparkConf
4-
import org.apache.spark.scheduler.SparkListener
54

65
class SparkListenerTest extends AbstractSparkListenerTest {
76
@Override
8-
protected SparkListener getTestDatadogSparkListener() {
7+
protected DatadogSpark213Listener getTestDatadogSparkListener() {
98
def conf = new SparkConf()
109
return new DatadogSpark213Listener(conf, "some_app_id", "some_version")
1110
}
11+
12+
@Override
13+
protected DatadogSpark213Listener getTestDatadogSparkListener(SparkConf conf) {
14+
return new DatadogSpark213Listener(conf, "some_app_id", "some_version")
15+
}
1216
}

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
168168
}
169169

170170
public void setupOpenLineage(DDTraceId traceId) {
171-
log.debug("Setting up OpenLineage configuration");
171+
log.debug("Setting up OpenLineage configuration with trace id {}", traceId);
172172
if (openLineageSparkListener != null) {
173173
openLineageSparkConf.set("spark.openlineage.transport.type", "composite");
174174
openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
@@ -273,17 +273,14 @@ private void captureOpenlineageContextIfPresent(
273273
AgentTracer.SpanBuilder builder, OpenlineageParentContext context) {
274274
builder.asChildOf(context);
275275

276-
builder.withSpanId(context.getChildRootSpanId());
277-
278-
log.debug(
279-
"Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}",
280-
context,
281-
context.getTraceId(),
282-
context.getChildRootSpanId());
276+
log.debug("Captured Openlineage context: {}, with trace_id: {}", context, context.getTraceId());
283277

284278
builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace());
285279
builder.withTag("openlineage_parent_job_name", context.getParentJobName());
286280
builder.withTag("openlineage_parent_run_id", context.getParentRunId());
281+
builder.withTag("openlineage_root_parent_job_namespace", context.getRootParentJobNamespace());
282+
builder.withTag("openlineage_root_parent_job_name", context.getRootParentJobName());
283+
builder.withTag("openlineage_root_parent_run_id", context.getRootParentRunId());
287284
}
288285

289286
@Override

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java

Lines changed: 69 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,7 @@
77
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
88
import datadog.trace.bootstrap.instrumentation.api.AgentTraceCollector;
99
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
10-
import java.nio.ByteBuffer;
11-
import java.nio.charset.StandardCharsets;
12-
import java.security.MessageDigest;
13-
import java.security.NoSuchAlgorithmException;
10+
import datadog.trace.util.FNV64Hash;
1411
import java.util.Collections;
1512
import java.util.Map;
1613
import java.util.Optional;
@@ -27,16 +24,23 @@ public class OpenlineageParentContext implements AgentSpanContext {
2724

2825
private final DDTraceId traceId;
2926
private final long spanId;
30-
private final long childRootSpanId;
3127

3228
private final String parentJobNamespace;
3329
private final String parentJobName;
3430
private final String parentRunId;
31+
private final String rootParentJobNamespace;
32+
private final String rootParentJobName;
33+
private final String rootParentRunId;
3534

3635
public static final String OPENLINEAGE_PARENT_JOB_NAMESPACE =
3736
"spark.openlineage.parentJobNamespace";
3837
public static final String OPENLINEAGE_PARENT_JOB_NAME = "spark.openlineage.parentJobName";
3938
public static final String OPENLINEAGE_PARENT_RUN_ID = "spark.openlineage.parentRunId";
39+
public static final String OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE =
40+
"spark.openlineage.rootParentJobNamespace";
41+
public static final String OPENLINEAGE_ROOT_PARENT_JOB_NAME =
42+
"spark.openlineage.rootParentJobName";
43+
public static final String OPENLINEAGE_ROOT_PARENT_RUN_ID = "spark.openlineage.rootParentRunId";
4044

4145
public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
4246
if (!sparkConf.contains(OPENLINEAGE_PARENT_JOB_NAMESPACE)
@@ -45,68 +49,84 @@ public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
4549
return Optional.empty();
4650
}
4751

52+
if (!sparkConf.contains(OPENLINEAGE_ROOT_PARENT_RUN_ID)) {
53+
log.debug("Found parent info, but not root parent info. Can't construct valid trace id.");
54+
return Optional.empty();
55+
}
56+
4857
String parentJobNamespace = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAMESPACE);
4958
String parentJobName = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAME);
5059
String parentRunId = sparkConf.get(OPENLINEAGE_PARENT_RUN_ID);
5160

5261
if (!UUID.matcher(parentRunId).matches()) {
62+
log.debug("OpenLineage parent run id is not a valid UUID: {}", parentRunId);
63+
return Optional.empty();
64+
}
65+
66+
String rootParentJobNamespace = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE);
67+
String rootParentJobName = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAME);
68+
String rootParentRunId = sparkConf.get(OPENLINEAGE_ROOT_PARENT_RUN_ID);
69+
70+
if (!UUID.matcher(rootParentRunId).matches()) {
71+
log.debug("OpenLineage root parent run id is not a valid UUID: {}", parentRunId);
5372
return Optional.empty();
5473
}
5574

5675
return Optional.of(
57-
new OpenlineageParentContext(parentJobNamespace, parentJobName, parentRunId));
76+
new OpenlineageParentContext(
77+
parentJobNamespace,
78+
parentJobName,
79+
parentRunId,
80+
rootParentJobNamespace,
81+
rootParentJobName,
82+
rootParentRunId));
5883
}
5984

60-
OpenlineageParentContext(String parentJobNamespace, String parentJobName, String parentRunId) {
85+
OpenlineageParentContext(
86+
String parentJobNamespace,
87+
String parentJobName,
88+
String parentRunId,
89+
String rootParentJobNamespace,
90+
String rootParentJobName,
91+
String rootParentRunId) {
6192
log.debug(
62-
"Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}",
93+
"Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}, rootParentJobNamespace: {}, rootParentJobName: {}, rootParentRunId: {}",
6394
parentJobNamespace,
6495
parentJobName,
65-
parentRunId);
96+
parentRunId,
97+
rootParentJobNamespace,
98+
rootParentJobName,
99+
rootParentRunId);
66100

67101
this.parentJobNamespace = parentJobNamespace;
68102
this.parentJobName = parentJobName;
69103
this.parentRunId = parentRunId;
70104

71-
MessageDigest digest = null;
72-
try {
73-
digest = MessageDigest.getInstance("SHA-256");
74-
} catch (NoSuchAlgorithmException e) {
75-
log.debug("Unable to find SHA-256 algorithm", e);
76-
}
105+
this.rootParentJobNamespace = rootParentJobNamespace;
106+
this.rootParentJobName = rootParentJobName;
107+
this.rootParentRunId = rootParentRunId;
77108

78-
if (digest != null && parentJobNamespace != null && parentRunId != null) {
79-
traceId = computeTraceId(digest, parentJobNamespace, parentJobName, parentRunId);
80-
spanId = DDSpanId.ZERO;
81-
82-
childRootSpanId =
83-
computeChildRootSpanId(digest, parentJobNamespace, parentJobName, parentRunId);
109+
if (this.rootParentRunId != null) {
110+
traceId = computeTraceId(this.rootParentRunId);
111+
spanId = computeSpanId(this.parentRunId);
112+
} else if (this.parentRunId != null) {
113+
traceId = computeTraceId(this.parentRunId);
114+
spanId = computeSpanId(this.parentRunId);
84115
} else {
85116
traceId = DDTraceId.ZERO;
86117
spanId = DDSpanId.ZERO;
87-
88-
childRootSpanId = DDSpanId.ZERO;
89118
}
90119

91120
log.debug("Created OpenlineageParentContext with traceId: {}, spanId: {}", traceId, spanId);
92121
}
93122

94-
private long computeChildRootSpanId(
95-
MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) {
96-
byte[] inputBytes =
97-
(parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8);
98-
byte[] hash = digest.digest(inputBytes);
99-
100-
return ByteBuffer.wrap(hash).getLong();
123+
private long computeSpanId(String runId) {
124+
return FNV64Hash.generateHash(runId, FNV64Hash.Version.v1A);
101125
}
102126

103-
private DDTraceId computeTraceId(
104-
MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) {
105-
byte[] inputBytes =
106-
(parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8);
107-
byte[] hash = digest.digest(inputBytes);
108-
109-
return DDTraceId.from(ByteBuffer.wrap(hash).getLong());
127+
private DDTraceId computeTraceId(String runId) {
128+
log.debug("Generating traceID from runId: {}", runId);
129+
return DDTraceId.from(FNV64Hash.generateHash(runId, FNV64Hash.Version.v1A));
110130
}
111131

112132
@Override
@@ -119,10 +139,6 @@ public long getSpanId() {
119139
return spanId;
120140
}
121141

122-
public long getChildRootSpanId() {
123-
return childRootSpanId;
124-
}
125-
126142
@Override
127143
public AgentTraceCollector getTraceCollector() {
128144
return AgentTracer.NoopAgentTraceCollector.INSTANCE;
@@ -159,4 +175,16 @@ public String getParentJobName() {
159175
public String getParentRunId() {
160176
return parentRunId;
161177
}
178+
179+
public String getRootParentJobNamespace() {
180+
return rootParentJobNamespace;
181+
}
182+
183+
public String getRootParentJobName() {
184+
return rootParentJobName;
185+
}
186+
187+
public String getRootParentRunId() {
188+
return rootParentRunId;
189+
}
162190
}

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ import com.datadoghq.sketch.ddsketch.DDSketchProtoBinding
44
import com.datadoghq.sketch.ddsketch.proto.DDSketch
55
import com.datadoghq.sketch.ddsketch.store.CollapsingLowestDenseStore
66
import datadog.trace.agent.test.AgentTestRunner
7+
import org.apache.spark.SparkConf
78
import org.apache.spark.Success$
89
import org.apache.spark.executor.TaskMetrics
910
import org.apache.spark.scheduler.JobSucceeded$
10-
import org.apache.spark.scheduler.SparkListener
1111
import org.apache.spark.scheduler.SparkListenerApplicationEnd
1212
import org.apache.spark.scheduler.SparkListenerApplicationStart
1313
import org.apache.spark.scheduler.SparkListenerExecutorAdded
@@ -30,7 +30,8 @@ import scala.collection.JavaConverters
3030

3131
abstract class AbstractSparkListenerTest extends AgentTestRunner {
3232

33-
protected abstract SparkListener getTestDatadogSparkListener()
33+
protected abstract AbstractDatadogSparkListener getTestDatadogSparkListener()
34+
protected abstract AbstractDatadogSparkListener getTestDatadogSparkListener(SparkConf conf)
3435

3536
protected applicationStartEvent(time=0L) {
3637
// Constructor of SparkListenerApplicationStart changed starting spark 3.0
@@ -463,6 +464,22 @@ abstract class AbstractSparkListenerTest extends AgentTestRunner {
463464
}
464465
}
465466

467+
def "sets up OpenLineage trace id properly"() {
468+
setup:
469+
def conf = new SparkConf()
470+
conf.set("spark.openlineage.parentRunId", "ad3b6baa-8d88-3b38-8dbe-f06232249a84")
471+
conf.set("spark.openlineage.parentJobNamespace", "default")
472+
conf.set("spark.openlineage.parentJobName", "dag-push-to-s3-spark.upload_to_s3")
473+
conf.set("spark.openlineage.rootParentRunId", "01964820-5280-7674-b04e-82fbed085f39")
474+
conf.set("spark.openlineage.rootParentJobNamespace", "default")
475+
conf.set("spark.openlineage.rootParentJobName", "dag-push-to-s3-spark")
476+
def listener = getTestDatadogSparkListener(conf)
477+
478+
expect:
479+
listener.onApplicationStart(applicationStartEvent(1000L))
480+
assert listener.openLineageSparkConf.get("spark.openlineage.run.tags").contains("13959090542865903119")
481+
}
482+
466483
def "test lastJobFailed is not set when job is cancelled"() {
467484
setup:
468485
def listener = getTestDatadogSparkListener()

0 commit comments

Comments (0)