Commit cc72f34

code review fixes, add version tag
Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent cc7cdde commit cc72f34

File tree

6 files changed: +40 -11 lines changed

dd-java-agent/instrumentation/spark/build.gradle

Lines changed: 1 addition & 0 deletions
@@ -7,6 +7,7 @@ configurations.all {
   resolutionStrategy.deactivateDependencyLocking()
 }
 dependencies {
+  implementation project(':utils:version-utils')
   compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
   compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'

SparkListenerTest.groovy (Spark 2.12 suite)

Lines changed: 6 additions & 2 deletions

@@ -1,12 +1,16 @@
 import datadog.trace.instrumentation.spark.AbstractSparkListenerTest
 import datadog.trace.instrumentation.spark.DatadogSpark212Listener
 import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.SparkListener

 class SparkListenerTest extends AbstractSparkListenerTest {
   @Override
-  protected SparkListener getTestDatadogSparkListener() {
+  protected DatadogSpark212Listener getTestDatadogSparkListener() {
     def conf = new SparkConf()
     return new DatadogSpark212Listener(conf, "some_app_id", "some_version")
   }
+
+  @Override
+  protected DatadogSpark212Listener getTestDatadogSparkListener(SparkConf conf) {
+    return new DatadogSpark212Listener(conf, "some_app_id", "some_version")
+  }
 }

SparkListenerTest.groovy (Spark 2.13 suite)

Lines changed: 6 additions & 2 deletions

@@ -1,12 +1,16 @@
 import datadog.trace.instrumentation.spark.AbstractSparkListenerTest
 import datadog.trace.instrumentation.spark.DatadogSpark213Listener
 import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.SparkListener

 class SparkListenerTest extends AbstractSparkListenerTest {
   @Override
-  protected SparkListener getTestDatadogSparkListener() {
+  protected DatadogSpark213Listener getTestDatadogSparkListener() {
     def conf = new SparkConf()
     return new DatadogSpark213Listener(conf, "some_app_id", "some_version")
   }
+
+  @Override
+  protected DatadogSpark213Listener getTestDatadogSparkListener(SparkConf conf) {
+    return new DatadogSpark213Listener(conf, "some_app_id", "some_version")
+  }
 }
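
Both version-specific suites above narrow the factory's return type from SparkListener to their concrete listener class and add an overload that accepts a prepared SparkConf; the base test class (changed further down in this commit) declares these factories against AbstractDatadogSparkListener so shared tests can inspect listener fields such as openLineageSparkConf regardless of the Scala version. Below is a minimal, hypothetical Java sketch of that shape; the class and method names are illustrative, not the repository's.

// Hypothetical sketch of the test-factory pattern used above: the abstract base
// declares factories against a common listener supertype, and each concrete
// suite overrides them with a covariant (more specific) return type.
abstract class AbstractListenerSuite {
  // No-arg variant: the suite builds its own config.
  protected abstract CommonListener newListener();

  // Overload taking a prepared config, for tests that must set properties
  // before the listener is constructed.
  protected abstract CommonListener newListener(String confSummary);
}

class CommonListener {
  final String confSummary;
  CommonListener(String confSummary) { this.confSummary = confSummary; }
}

class Spark212StyleListener extends CommonListener {
  Spark212StyleListener(String confSummary) { super(confSummary); }
}

class Spark212StyleSuite extends AbstractListenerSuite {
  @Override
  protected Spark212StyleListener newListener() {
    return new Spark212StyleListener("default conf");
  }

  @Override
  protected Spark212StyleListener newListener(String confSummary) {
    return new Spark212StyleListener(confSummary);
  }
}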

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 5 additions & 2 deletions
@@ -7,6 +7,7 @@

 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import datadog.common.version.VersionInfo;
 import datadog.trace.api.Config;
 import datadog.trace.api.DDTags;
 import datadog.trace.api.DDTraceId;
@@ -168,7 +169,7 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
   }

   public void setupOpenLineage(DDTraceId traceId) {
-    log.error("Setting up OpenLineage configuration with trace id {}", traceId);
+    log.debug("Setting up OpenLineage configuration with trace id {}", traceId);
     if (openLineageSparkListener != null) {
       openLineageSparkConf.set("spark.openlineage.transport.type", "composite");
       openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
@@ -185,7 +186,9 @@ public void setupOpenLineage(DDTraceId traceId) {
               + ";_dd.ol_intake.emit_spans:false;_dd.ol_service:"
               + sparkServiceName
               + ";_dd.ol_app_id:"
-              + appId);
+              + appId
+              + ";_dd.tracer_version:"
+              + VersionInfo.VERSION);
       return;
     }
     log.debug(
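
The value written to spark.openlineage.run.tags is a semicolon-delimited list of key:value pairs; this commit appends the tracer version (VersionInfo.VERSION, provided by the new :utils:version-utils dependency added in build.gradle) after the app id. A rough Java sketch of that string shape follows; only the keys visible in the hunk are reproduced, the class and method names are hypothetical, and the version string is an example value.

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: builds and parses a semicolon-delimited key:value string
// in the same shape as the spark.openlineage.run.tags fragment shown above.
// The real setupOpenLineage() sets more keys than are visible in this hunk.
public class RunTagsSketch {
  static String buildRunTags(String serviceName, String appId, String tracerVersion) {
    return "_dd.ol_intake.emit_spans:false"
        + ";_dd.ol_service:" + serviceName
        + ";_dd.ol_app_id:" + appId
        + ";_dd.tracer_version:" + tracerVersion;
  }

  static Map<String, String> parseRunTags(String runTags) {
    Map<String, String> tags = new LinkedHashMap<>();
    for (String entry : runTags.split(";")) {
      int idx = entry.indexOf(':');
      if (idx > 0) {
        tags.put(entry.substring(0, idx), entry.substring(idx + 1));
      }
    }
    return tags;
  }

  public static void main(String[] args) {
    // "1.0.0-example" stands in for whatever VersionInfo.VERSION resolves to.
    String tags = buildRunTags("some_service", "some_app_id", "1.0.0-example");
    System.out.println(parseRunTags(tags).get("_dd.tracer_version")); // 1.0.0-example
  }
}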

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java

Lines changed: 3 additions & 3 deletions
@@ -50,7 +50,7 @@ public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
     }

     if (!sparkConf.contains(OPENLINEAGE_ROOT_PARENT_RUN_ID)) {
-      log.error("Have parent info, but not root parent info. Can't construct valid trace id.");
+      log.debug("Found parent info, but not root parent info. Can't construct valid trace id.");
       return Optional.empty();
     }

@@ -59,7 +59,7 @@ public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
     String parentRunId = sparkConf.get(OPENLINEAGE_PARENT_RUN_ID);

     if (!UUID.matcher(parentRunId).matches()) {
-      log.error("OpenLineage parent run id is not a valid UUID: {}", parentRunId);
+      log.debug("OpenLineage parent run id is not a valid UUID: {}", parentRunId);
       return Optional.empty();
     }

@@ -68,7 +68,7 @@ public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
     String rootParentRunId = sparkConf.get(OPENLINEAGE_ROOT_PARENT_RUN_ID);

     if (!UUID.matcher(rootParentRunId).matches()) {
-      log.error("OpenLineage root parent run id is not a valid UUID: {}", parentRunId);
+      log.debug("OpenLineage root parent run id is not a valid UUID: {}", parentRunId);
       return Optional.empty();
     }
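
OpenlineageParentContext.from(SparkConf) returns Optional.empty() whenever the parent or root parent run id does not look like a UUID; since a Spark job without OpenLineage parent properties is a normal situation rather than an agent fault, the commit downgrades those messages from error to debug. A hedged sketch of the validation step follows; the exact Pattern constant named UUID is not shown in this diff, so the regex here is an assumption (a standard 8-4-4-4-12 hexadecimal layout).

import java.util.Optional;
import java.util.regex.Pattern;

// Illustrative sketch of the run-id validation shown above. The actual Pattern
// used by OpenlineageParentContext is not part of this diff; a standard
// 8-4-4-4-12 hex layout is assumed here.
final class RunIdValidationSketch {
  private static final Pattern UUID =
      Pattern.compile("^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$");

  // Mirrors the early Optional.empty() returns: keep the id only if it parses as a UUID.
  static Optional<String> validRunId(String runId) {
    if (runId == null || !UUID.matcher(runId).matches()) {
      return Optional.empty();
    }
    return Optional.of(runId);
  }

  public static void main(String[] args) {
    System.out.println(validRunId("01964820-5280-7674-b04e-82fbed085f39").isPresent()); // true
    System.out.println(validRunId("not-a-uuid").isPresent()); // false
  }
}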

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy

Lines changed: 19 additions & 2 deletions
@@ -4,10 +4,10 @@ import com.datadoghq.sketch.ddsketch.DDSketchProtoBinding
 import com.datadoghq.sketch.ddsketch.proto.DDSketch
 import com.datadoghq.sketch.ddsketch.store.CollapsingLowestDenseStore
 import datadog.trace.agent.test.AgentTestRunner
+import org.apache.spark.SparkConf
 import org.apache.spark.Success$
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.JobSucceeded$
-import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.scheduler.SparkListenerApplicationEnd
 import org.apache.spark.scheduler.SparkListenerApplicationStart
 import org.apache.spark.scheduler.SparkListenerExecutorAdded
@@ -30,7 +30,8 @@ import scala.collection.JavaConverters

 abstract class AbstractSparkListenerTest extends AgentTestRunner {

-  protected abstract SparkListener getTestDatadogSparkListener()
+  protected abstract AbstractDatadogSparkListener getTestDatadogSparkListener()
+  protected abstract AbstractDatadogSparkListener getTestDatadogSparkListener(SparkConf conf)

   protected applicationStartEvent(time=0L) {
     // Constructor of SparkListenerApplicationStart changed starting spark 3.0
@@ -463,6 +464,22 @@
     }
   }

+  def "sets up OpenLineage trace id properly"() {
+    setup:
+    def conf = new SparkConf()
+    conf.set("spark.openlineage.parentRunId", "ad3b6baa-8d88-3b38-8dbe-f06232249a84")
+    conf.set("spark.openlineage.parentJobNamespace", "default")
+    conf.set("spark.openlineage.parentJobName", "dag-push-to-s3-spark.upload_to_s3")
+    conf.set("spark.openlineage.rootParentRunId", "01964820-5280-7674-b04e-82fbed085f39")
+    conf.set("spark.openlineage.rootParentJobNamespace", "default")
+    conf.set("spark.openlineage.rootParentJobName", "dag-push-to-s3-spark")
+    def listener = getTestDatadogSparkListener(conf)
+
+    expect:
+    listener.onApplicationStart(applicationStartEvent(1000L))
+    assert listener.openLineageSparkConf.get("spark.openlineage.run.tags").contains("13959090542865903119")
+  }
+
   def "test lastJobFailed is not set when job is cancelled"() {
     setup:
     def listener = getTestDatadogSparkListener()
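
The new test fixes the OpenLineage parent properties on a SparkConf before the listener is built (which is why the getTestDatadogSparkListener(SparkConf) overload was added) and asserts that spark.openlineage.run.tags ends up containing the trace id 13959090542865903119, i.e. that the id derived from the fixed rootParentRunId is deterministic. The actual derivation is done by OpenlineageParentContext and is not part of this diff; the sketch below only illustrates the general idea of hashing a UUID string down to a stable unsigned 64-bit id and is not expected to reproduce that exact value.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative only: one way to map a UUID string to a stable unsigned 64-bit id.
// The real mapping used by OpenlineageParentContext is not shown in this commit,
// so this is an assumption about the technique, not the repository's algorithm.
final class DeterministicTraceIdSketch {
  static String idFor(String rootParentRunId) throws NoSuchAlgorithmException {
    byte[] digest =
        MessageDigest.getInstance("SHA-256")
            .digest(rootParentRunId.getBytes(StandardCharsets.UTF_8));
    // Use the first 8 bytes of the digest as an unsigned decimal string,
    // matching the formatting style of the id asserted in the test above.
    long bits = ByteBuffer.wrap(digest, 0, 8).getLong();
    return Long.toUnsignedString(bits);
  }

  public static void main(String[] args) throws NoSuchAlgorithmException {
    // The same rootParentRunId always yields the same id.
    System.out.println(idFor("01964820-5280-7674-b04e-82fbed085f39"));
    System.out.println(idFor("01964820-5280-7674-b04e-82fbed085f39"));
  }
}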
