Skip to content

Commit 42fc779

Browse files
committed
Spark instrumentation: use the OpenLineage trace ID when it can be generated from the root parent information
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent 22535c7 commit 42fc779

File tree

2 files changed

+67
-34
lines changed

2 files changed

+67
-34
lines changed

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
168168
}
169169

170170
public void setupOpenLineage(DDTraceId traceId) {
171-
log.debug("Setting up OpenLineage configuration");
171+
log.error("Setting up OpenLineage configuration with trace id {}", traceId);
172172
if (openLineageSparkListener != null) {
173173
openLineageSparkConf.set("spark.openlineage.transport.type", "composite");
174174
openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
@@ -284,6 +284,9 @@ private void captureOpenlineageContextIfPresent(
284284
builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace());
285285
builder.withTag("openlineage_parent_job_name", context.getParentJobName());
286286
builder.withTag("openlineage_parent_run_id", context.getParentRunId());
287+
builder.withTag("openlineage_root_parent_job_namespace", context.getRootParentJobNamespace());
288+
builder.withTag("openlineage_root_parent_job_name", context.getRootParentJobName());
289+
builder.withTag("openlineage_root_parent_run_id", context.getRootParentRunId());
287290
}
288291

289292
@Override

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java

Lines changed: 63 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,7 @@
77
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
88
import datadog.trace.bootstrap.instrumentation.api.AgentTraceCollector;
99
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
10-
import java.nio.ByteBuffer;
11-
import java.nio.charset.StandardCharsets;
12-
import java.security.MessageDigest;
13-
import java.security.NoSuchAlgorithmException;
10+
import datadog.trace.util.FNV64Hash;
1411
import java.util.Collections;
1512
import java.util.Map;
1613
import java.util.Optional;
@@ -32,11 +29,19 @@ public class OpenlineageParentContext implements AgentSpanContext {
3229
private final String parentJobNamespace;
3330
private final String parentJobName;
3431
private final String parentRunId;
32+
private final String rootParentJobNamespace;
33+
private final String rootParentJobName;
34+
private final String rootParentRunId;
3535

3636
public static final String OPENLINEAGE_PARENT_JOB_NAMESPACE =
3737
"spark.openlineage.parentJobNamespace";
3838
public static final String OPENLINEAGE_PARENT_JOB_NAME = "spark.openlineage.parentJobName";
3939
public static final String OPENLINEAGE_PARENT_RUN_ID = "spark.openlineage.parentRunId";
40+
public static final String OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE =
41+
"spark.openlineage.rootParentJobNamespace";
42+
public static final String OPENLINEAGE_ROOT_PARENT_JOB_NAME =
43+
"spark.openlineage.rootParentJobName";
44+
public static final String OPENLINEAGE_ROOT_PARENT_RUN_ID = "spark.openlineage.rootParentRunId";
4045

4146
public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
4247
if (!sparkConf.contains(OPENLINEAGE_PARENT_JOB_NAMESPACE)
@@ -53,60 +58,73 @@ public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
5358
return Optional.empty();
5459
}
5560

61+
if (!sparkConf.contains(OPENLINEAGE_ROOT_PARENT_RUN_ID)) {
62+
log.error("Have parent info, but not root parent info. Can't construct valid trace id.");
63+
return Optional.empty();
64+
}
65+
66+
String rootParentJobNamespace = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE, "");
67+
String rootParentJobName = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAME, "");
68+
String rootParentRunId = sparkConf.get(OPENLINEAGE_ROOT_PARENT_RUN_ID, "");
69+
5670
return Optional.of(
57-
new OpenlineageParentContext(parentJobNamespace, parentJobName, parentRunId));
71+
new OpenlineageParentContext(
72+
parentJobNamespace,
73+
parentJobName,
74+
parentRunId,
75+
rootParentJobNamespace,
76+
rootParentJobName,
77+
rootParentRunId));
5878
}
5979

60-
OpenlineageParentContext(String parentJobNamespace, String parentJobName, String parentRunId) {
80+
OpenlineageParentContext(
81+
String parentJobNamespace,
82+
String parentJobName,
83+
String parentRunId,
84+
String rootParentJobNamespace,
85+
String rootParentJobName,
86+
String rootParentRunId) {
6187
log.debug(
62-
"Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}",
88+
"Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}, rootParentJobNamespace: {}, rootParentJobName: {}, rootParentRunId: {}",
6389
parentJobNamespace,
6490
parentJobName,
65-
parentRunId);
91+
parentRunId,
92+
rootParentJobNamespace,
93+
rootParentJobName,
94+
rootParentRunId);
6695

6796
this.parentJobNamespace = parentJobNamespace;
6897
this.parentJobName = parentJobName;
6998
this.parentRunId = parentRunId;
7099

71-
MessageDigest digest = null;
72-
try {
73-
digest = MessageDigest.getInstance("SHA-256");
74-
} catch (NoSuchAlgorithmException e) {
75-
log.debug("Unable to find SHA-256 algorithm", e);
76-
}
100+
this.rootParentJobNamespace = rootParentJobNamespace;
101+
this.rootParentJobName = rootParentJobName;
102+
this.rootParentRunId = rootParentRunId;
77103

78-
if (digest != null && parentJobNamespace != null && parentRunId != null) {
79-
traceId = computeTraceId(digest, parentJobNamespace, parentJobName, parentRunId);
104+
if (this.parentRunId != null) {
105+
traceId = computeTraceId(this.parentRunId);
80106
spanId = DDSpanId.ZERO;
81107

82-
childRootSpanId =
83-
computeChildRootSpanId(digest, parentJobNamespace, parentJobName, parentRunId);
108+
if (this.rootParentRunId != null) {
109+
childRootSpanId = computeSpanId(this.rootParentRunId);
110+
} else {
111+
childRootSpanId = DDSpanId.ZERO;
112+
}
84113
} else {
85114
traceId = DDTraceId.ZERO;
86115
spanId = DDSpanId.ZERO;
87-
88116
childRootSpanId = DDSpanId.ZERO;
89117
}
90118

91119
log.debug("Created OpenlineageParentContext with traceId: {}, spanId: {}", traceId, spanId);
92120
}
93121

94-
private long computeChildRootSpanId(
95-
MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) {
96-
byte[] inputBytes =
97-
(parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8);
98-
byte[] hash = digest.digest(inputBytes);
99-
100-
return ByteBuffer.wrap(hash).getLong();
122+
private long computeSpanId(String runId) {
123+
return FNV64Hash.generateHash(runId, FNV64Hash.Version.v1A);
101124
}
102125

103-
private DDTraceId computeTraceId(
104-
MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) {
105-
byte[] inputBytes =
106-
(parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8);
107-
byte[] hash = digest.digest(inputBytes);
108-
109-
return DDTraceId.from(ByteBuffer.wrap(hash).getLong());
126+
private DDTraceId computeTraceId(String runId) {
127+
return DDTraceId.from(FNV64Hash.generateHash(runId, FNV64Hash.Version.v1A));
110128
}
111129

112130
@Override
@@ -159,4 +177,16 @@ public String getParentJobName() {
159177
public String getParentRunId() {
160178
return parentRunId;
161179
}
180+
181+
public String getRootParentJobNamespace() {
182+
return rootParentJobNamespace;
183+
}
184+
185+
public String getRootParentJobName() {
186+
return rootParentJobName;
187+
}
188+
189+
public String getRootParentRunId() {
190+
return rootParentRunId;
191+
}
162192
}

0 commit comments

Comments
 (0)