Skip to content

Commit cc7cdde

Browse files
committed
spark: use OpenLineage context to generate trace/span id if present
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent 42fc779 commit cc7cdde

File tree

3 files changed

+72
-44
lines changed

3 files changed

+72
-44
lines changed

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -273,13 +273,7 @@ private void captureOpenlineageContextIfPresent(
273273
AgentTracer.SpanBuilder builder, OpenlineageParentContext context) {
274274
builder.asChildOf(context);
275275

276-
builder.withSpanId(context.getChildRootSpanId());
277-
278-
log.debug(
279-
"Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}",
280-
context,
281-
context.getTraceId(),
282-
context.getChildRootSpanId());
276+
log.debug("Captured Openlineage context: {}, with trace_id: {}", context, context.getTraceId());
283277

284278
builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace());
285279
builder.withTag("openlineage_parent_job_name", context.getParentJobName());

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ public class OpenlineageParentContext implements AgentSpanContext {
2424

2525
private final DDTraceId traceId;
2626
private final long spanId;
27-
private final long childRootSpanId;
2827

2928
private final String parentJobNamespace;
3029
private final String parentJobName;
@@ -50,23 +49,29 @@ public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
5049
return Optional.empty();
5150
}
5251

52+
if (!sparkConf.contains(OPENLINEAGE_ROOT_PARENT_RUN_ID)) {
53+
log.error("Have parent info, but not root parent info. Can't construct valid trace id.");
54+
return Optional.empty();
55+
}
56+
5357
String parentJobNamespace = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAMESPACE);
5458
String parentJobName = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAME);
5559
String parentRunId = sparkConf.get(OPENLINEAGE_PARENT_RUN_ID);
5660

5761
if (!UUID.matcher(parentRunId).matches()) {
62+
log.error("OpenLineage parent run id is not a valid UUID: {}", parentRunId);
5863
return Optional.empty();
5964
}
6065

61-
if (!sparkConf.contains(OPENLINEAGE_ROOT_PARENT_RUN_ID)) {
62-
log.error("Have parent info, but not root parent info. Can't construct valid trace id.");
66+
String rootParentJobNamespace = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE);
67+
String rootParentJobName = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAME);
68+
String rootParentRunId = sparkConf.get(OPENLINEAGE_ROOT_PARENT_RUN_ID);
69+
70+
if (!UUID.matcher(rootParentRunId).matches()) {
71+
log.error("OpenLineage root parent run id is not a valid UUID: {}", parentRunId);
6372
return Optional.empty();
6473
}
6574

66-
String rootParentJobNamespace = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAMESPACE, "");
67-
String rootParentJobName = sparkConf.get(OPENLINEAGE_ROOT_PARENT_JOB_NAME, "");
68-
String rootParentRunId = sparkConf.get(OPENLINEAGE_ROOT_PARENT_RUN_ID, "");
69-
7075
return Optional.of(
7176
new OpenlineageParentContext(
7277
parentJobNamespace,
@@ -101,19 +106,15 @@ public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
101106
this.rootParentJobName = rootParentJobName;
102107
this.rootParentRunId = rootParentRunId;
103108

104-
if (this.parentRunId != null) {
109+
if (this.rootParentRunId != null) {
110+
traceId = computeTraceId(this.rootParentRunId);
111+
spanId = computeSpanId(this.parentRunId);
112+
} else if (this.parentRunId != null) {
105113
traceId = computeTraceId(this.parentRunId);
106-
spanId = DDSpanId.ZERO;
107-
108-
if (this.rootParentRunId != null) {
109-
childRootSpanId = computeSpanId(this.rootParentRunId);
110-
} else {
111-
childRootSpanId = DDSpanId.ZERO;
112-
}
114+
spanId = computeSpanId(this.parentRunId);
113115
} else {
114116
traceId = DDTraceId.ZERO;
115117
spanId = DDSpanId.ZERO;
116-
childRootSpanId = DDSpanId.ZERO;
117118
}
118119

119120
log.debug("Created OpenlineageParentContext with traceId: {}, spanId: {}", traceId, spanId);
@@ -124,6 +125,7 @@ private long computeSpanId(String runId) {
124125
}
125126

126127
private DDTraceId computeTraceId(String runId) {
128+
log.debug("Generating traceID from runId: {}", runId);
127129
return DDTraceId.from(FNV64Hash.generateHash(runId, FNV64Hash.Version.v1A));
128130
}
129131

@@ -137,10 +139,6 @@ public long getSpanId() {
137139
return spanId;
138140
}
139141

140-
public long getChildRootSpanId() {
141-
return childRootSpanId;
142-
}
143-
144142
@Override
145143
public AgentTraceCollector getTraceCollector() {
146144
return AgentTracer.NoopAgentTraceCollector.INSTANCE;
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,39 @@
11
package datadog.trace.instrumentation.spark
22

3-
import datadog.trace.api.DDSpanId
43
import org.apache.spark.SparkConf
54
import spock.lang.Specification
65

76
class OpenlineageParentContextTest extends Specification {
8-
def "should create none empty OpenLineageParentContext using SHA-256 for TraceID and root span SpanId if all required fields are present" () {
7+
def "should create OpenLineageParentContext with particular trace id based on root parent id" () {
98
given:
109
SparkConf mockSparkConf = Mock(SparkConf)
1110

1211
when:
1312
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true
1413
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true
1514
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true
15+
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> true
1616
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default"
1717
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3"
1818
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> parentRunId
19+
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> rootParentRunId
1920

2021
then:
2122
Optional<OpenlineageParentContext> parentContext = OpenlineageParentContext.from(mockSparkConf)
2223
parentContext.isPresent()
2324

2425
parentContext.get().getParentJobNamespace() == "default"
2526
parentContext.get().getParentJobName() == "dag-push-to-s3-spark.upload_to_s3"
26-
parentContext.get().getParentRunId() == expectedParentRunId
27+
parentContext.get().getRootParentRunId() == rootParentRunId
28+
parentContext.get().getParentRunId() == parentRunId
2729

28-
parentContext.get().traceId.toLong() == expectedTraceId
29-
parentContext.get().spanId == DDSpanId.ZERO
30-
parentContext.get().childRootSpanId == expectedRootSpanId
30+
parentContext.get().traceId.toString() == expectedTraceId
31+
parentContext.get().spanId.toString() == expectedSpanId
3132

3233
where:
33-
parentRunId | expectedParentRunId | expectedTraceId | expectedRootSpanId
34-
"ad3b6baa-8d88-3b38-8dbe-f06232249a84" | "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | 0xa475569dbce5e6cfL | 0xa475569dbce5e6cfL
35-
"ad3b6baa-8d88-3b38-8dbe-f06232249a85" | "ad3b6baa-8d88-3b38-8dbe-f06232249a85" | 0x31da6680bd14991bL | 0x31da6680bd14991bL
34+
rootParentRunId | parentRunId | expectedTraceId | expectedSpanId
35+
"01964820-5280-7674-b04e-82fbed085f39" | "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | "13959090542865903119" | "2903780135964948649"
36+
"1a1a1a1a-2b2b-3c3c-4d4d-5e5e5e5e5e5e" | "6f6f6f6f-5e5e-4d4d-3c3c-2b2b2b2b2b2b" | "15830118871223350489" | "8020087091656517257"
3637
}
3738

3839
def "should create empty OpenLineageParentContext if any required field is missing" () {
@@ -43,20 +44,24 @@ class OpenlineageParentContextTest extends Specification {
4344
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> jobNamespacePresent
4445
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> jobNamePresent
4546
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> runIdPresent
47+
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> rootParentIdPresent
4648

4749
then:
4850
Optional<OpenlineageParentContext> parentContext = OpenlineageParentContext.from(mockSparkConf)
4951
parentContext.isPresent() == expected
5052

5153
where:
52-
jobNamespacePresent | jobNamePresent | runIdPresent | expected
53-
true | true | false | false
54-
true | false | true | false
55-
false | true | true | false
56-
true | false | false | false
57-
false | true | false | false
58-
false | false | true | false
59-
false | false | false | false
54+
jobNamespacePresent | jobNamePresent | runIdPresent | rootParentIdPresent | expected
55+
true | true | true | false | false
56+
true | true | false | false | false
57+
true | true | true | false | false
58+
true | true | false | true | false
59+
true | false | true | false | false
60+
false | true | true | true | false
61+
true | false | false | false | false
62+
false | true | false | false | false
63+
false | false | true | true | false
64+
false | false | false | false | false
6065
}
6166

6267
def "should only generate a non-empty OpenlineageParentContext if parentRunId is a valid UUID" () {
@@ -67,9 +72,12 @@ class OpenlineageParentContextTest extends Specification {
6772
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true
6873
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true
6974
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true
75+
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> true
7076
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default"
7177
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3"
7278
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> runId
79+
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> runId
80+
7381

7482
then:
7583
Optional<OpenlineageParentContext> parentContext = OpenlineageParentContext.from(mockSparkConf)
@@ -83,5 +91,33 @@ class OpenlineageParentContextTest extends Specification {
8391
"6afeb6ee-729d-37f7-b8e6f47ca694" | false
8492
"6AFEB6EE-729D-37F7-AD73-B8E6F47CA694" | true
8593
}
94+
95+
def "should only generate a non-empty OpenlineageParentContext if rootParentRunId is a valid UUID" () {
96+
given:
97+
SparkConf mockSparkConf = Mock(SparkConf)
98+
99+
when:
100+
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true
101+
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true
102+
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true
103+
mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> true
104+
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default"
105+
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3"
106+
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> "6afeb6ee-729d-37f7-ad73-b8e6f47ca694"
107+
mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_ROOT_PARENT_RUN_ID) >> rootParentRunId
108+
109+
110+
then:
111+
Optional<OpenlineageParentContext> parentContext = OpenlineageParentContext.from(mockSparkConf)
112+
parentContext.isPresent() == expected
113+
114+
where:
115+
rootParentRunId | expected
116+
"6afeb6ee-729d-37f7-ad73-b8e6f47ca694" | true
117+
" " | false
118+
"invalid-uuid" | false
119+
"6afeb6ee-729d-37f7-b8e6f47ca694" | false
120+
"6AFEB6EE-729D-37F7-AD73-B8E6F47CA694" | true
121+
}
86122
}
87123

0 commit comments

Comments
 (0)