Skip to content

Commit 166f773

Browse files
Fix tracer OpenLineage instrumentation (#9487)
* Fix tracer OpenLineage instrumentation

---------

Signed-off-by: Pawel Leszczynski <[email protected]>
1 parent c68b54c commit 166f773

File tree

2 files changed

+83
-3
lines changed

2 files changed

+83
-3
lines changed

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import datadog.trace.api.Config;
88
import datadog.trace.api.DDTags;
99
import datadog.trace.api.DDTraceId;
10+
import datadog.trace.api.ProcessTags;
1011
import datadog.trace.api.datastreams.DataStreamsTags;
1112
import datadog.trace.api.sampling.PrioritySampling;
1213
import datadog.trace.api.sampling.SamplingMechanism;
@@ -180,7 +181,9 @@ public void setupOpenLineage(DDTraceId traceId) {
180181
"_dd.trace_id:"
181182
+ traceId.toString()
182183
+ ";_dd.ol_intake.emit_spans:false;_dd.ol_service:"
183-
+ sparkServiceName
184+
+ getServiceForOpenLineage(sparkConf, isRunningOnDatabricks)
185+
+ ";_dd.ol_intake.process_tags:"
186+
+ ProcessTags.getTagsForSerialization()
184187
+ ";_dd.ol_app_id:"
185188
+ appId);
186189
return;
@@ -669,6 +672,9 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
669672
}
670673
}
671674

675+
// OpenLineage call should be prior to method return statements
676+
notifyOl(x -> openLineageSparkListener.onTaskEnd(x), taskEnd);
677+
672678
// Only sending failing tasks
673679
if (!(taskEnd.reason() instanceof TaskFailedReason)) {
674680
return;
@@ -687,8 +693,6 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
687693

688694
Properties props = stageProperties.get(stageSpanKey);
689695
sendTaskSpan(stageSpan, taskEnd, props);
690-
691-
notifyOl(x -> openLineageSparkListener.onTaskEnd(x), taskEnd);
692696
}
693697

694698
public static boolean classIsLoadable(String className) {
@@ -1292,6 +1296,33 @@ private static String getSparkServiceName(SparkConf conf, boolean isRunningOnDat
12921296
return sparkAppName;
12931297
}
12941298

1299+
private static String getServiceForOpenLineage(SparkConf conf, boolean isRunningOnDatabricks) {
1300+
// Service for OpenLineage in Databricks is not supported yet
1301+
if (isRunningOnDatabricks) {
1302+
return null;
1303+
}
1304+
1305+
// Keep service set by user, except if it is only "spark" or "hadoop" that can be set by USM
1306+
String serviceName = Config.get().getServiceName();
1307+
if (Config.get().isServiceNameSetByUser()
1308+
&& !"spark".equals(serviceName)
1309+
&& !"hadoop".equals(serviceName)) {
1310+
log.debug(
1311+
"Service explicitly set by user, not using the application name. Service name: {}",
1312+
serviceName);
1313+
return serviceName;
1314+
}
1315+
1316+
String sparkAppName = conf.get("spark.app.name", null);
1317+
if (sparkAppName != null) {
1318+
log.debug(
1319+
"Using Spark application name as the Datadog service for OpenLineage. Spark application name: {}",
1320+
sparkAppName);
1321+
}
1322+
1323+
return sparkAppName;
1324+
}
1325+
12951326
private static void reportKafkaOffsets(
12961327
final String appName, final AgentSpan span, final SourceProgress progress) {
12971328
if (!traceConfig().isDataStreamsEnabled()

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import com.datadoghq.sketch.ddsketch.DDSketchProtoBinding
44
import com.datadoghq.sketch.ddsketch.proto.DDSketch
55
import com.datadoghq.sketch.ddsketch.store.CollapsingLowestDenseStore
66
import datadog.trace.agent.test.InstrumentationSpecification
7+
import datadog.trace.api.DDTraceId
8+
import datadog.trace.api.ProcessTags
79
import org.apache.spark.SparkConf
810
import org.apache.spark.Success$
911
import org.apache.spark.executor.TaskMetrics
@@ -12,6 +14,7 @@ import org.apache.spark.scheduler.SparkListenerApplicationEnd
1214
import org.apache.spark.scheduler.SparkListenerApplicationStart
1315
import org.apache.spark.scheduler.SparkListenerExecutorAdded
1416
import org.apache.spark.scheduler.SparkListenerExecutorRemoved
17+
import org.apache.spark.scheduler.SparkListenerInterface
1518
import org.apache.spark.scheduler.SparkListenerJobEnd
1619
import org.apache.spark.scheduler.SparkListenerJobStart
1720
import org.apache.spark.scheduler.SparkListenerStageCompleted
@@ -519,6 +522,52 @@ abstract class AbstractSparkListenerTest extends InstrumentationSpecification {
519522
}
520523
}
521524

525+
def "test setupOpenLineage gets service name"(boolean serviceNameSetByUser, String serviceName, String sparkAppName) {
526+
setup:
527+
SparkConf sparkConf = new SparkConf()
528+
injectSysConfig("dd.service.name.set.by.user", Boolean.toString(serviceNameSetByUser))
529+
if (serviceNameSetByUser) {
530+
injectSysConfig("dd.service.name", serviceName)
531+
}
532+
if (sparkAppName != null) {
533+
sparkConf.set("spark.app.name", sparkAppName)
534+
}
535+
536+
def listener = getTestDatadogSparkListener(sparkConf)
537+
listener.openLineageSparkListener = Mock(SparkListenerInterface)
538+
listener.openLineageSparkConf = new SparkConf()
539+
listener.setupOpenLineage(Mock(DDTraceId))
540+
541+
expect:
542+
assert listener
543+
.openLineageSparkConf
544+
.get("spark.openlineage.run.tags")
545+
.split(";")
546+
.contains("_dd.ol_service:expected-service-name")
547+
548+
where:
549+
serviceNameSetByUser | serviceName | sparkAppName
550+
true | "expected-service-name" | null
551+
false | null | "expected-service-name"
552+
true | "spark" | "expected-service-name"
553+
true | "hadoop" | "expected-service-name"
554+
}
555+
556+
def "test setupOpenLineage fills ProcessTags"() {
557+
setup:
558+
def listener = getTestDatadogSparkListener()
559+
listener.openLineageSparkListener = Mock(SparkListenerInterface)
560+
listener.openLineageSparkConf = new SparkConf()
561+
listener.setupOpenLineage(Mock(DDTraceId))
562+
563+
expect:
564+
assert listener
565+
.openLineageSparkConf
566+
.get("spark.openlineage.run.tags")
567+
.split(";")
568+
.contains("_dd.ol_intake.process_tags:" + ProcessTags.getTagsForSerialization())
569+
}
570+
522571
protected validateRelativeError(double value, double expected, double relativeAccuracy) {
523572
double relativeError = Math.abs(value - expected) / expected
524573
assert relativeError < relativeAccuracy

0 commit comments

Comments (0)