
Atum expects path to be always available  #32

Open
@dk1844

Description


When Atum's control measure tracking is enabled, the measure tracking fails when one attempts to write the dataframe directly to Kafka, e.g.

spark.enableControlMeasuresTracking(somePath).setControlMeasuresWorkflow(someName)
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()

with the following error:

WARN util.ExecutionListenerManager: Error executing query execution listener
java.util.NoSuchElementException: key not found: path
	at scala.collection.MapLike$class.default(MapLike.scala:228)
	at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.default(CaseInsensitiveMap.scala:28)
	at scala.collection.MapLike$class.apply(MapLike.scala:141)
	at org.apache.spark.sql.catalyst.util.CaseInsensitiveMap.apply(CaseInsensitiveMap.scala:28)
	at za.co.absa.atum.utils.ExecutionPlanUtils$.inferOutputInfoFileName(ExecutionPlanUtils.scala:103)
	at za.co.absa.atum.core.SparkQueryExecutionListener.writeInfoFileForQuery(SparkQueryExecutionListener.scala:52)
	at za.co.absa.atum.core.SparkQueryExecutionListener.onSuccess(SparkQueryExecutionListener.scala:32)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:127)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:126)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:148)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:146)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
	at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
	at org.apache.spark.sql.util.ExecutionListenerManager.org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling(QueryExecutionListener.scala:146)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply$mcV$sp(QueryExecutionListener.scala:126)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:126)
	at org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:126)
	at org.apache.spark.sql.util.ExecutionListenerManager.readLock(QueryExecutionListener.scala:159)
	at org.apache.spark.sql.util.ExecutionListenerManager.onSuccess(QueryExecutionListener.scala:125)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:678)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:268)

The underlying reason is that Atum expects the parameter path to exist in the SaveIntoDataSourceCommand.options map -- which only holds when a conventional dataframe write target is used (e.g. HDFS). A Kafka sink receives no path option, so the strict map lookup throws.
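To illustrate, here is a minimal sketch of the failing lookup versus a defensive alternative. The object and method names below are hypothetical (they only mirror what the stack trace suggests inferOutputInfoFileName does internally); the kafkaWriteOptions map is an assumed shape of the options Spark passes for a Kafka write:

```scala
// Sketch: strict vs. defensive lookup of "path" in a writer's options map.
// All names here are illustrative, not Atum's actual code.
object PathLookupSketch {
  // Options a Kafka write would carry -- note there is no "path" key.
  val kafkaWriteOptions: Map[String, String] =
    Map("kafka.bootstrap.servers" -> "host1:port1,host2:port2")

  // What the listener effectively does today: options("path")
  // throws java.util.NoSuchElementException when the key is absent.
  def strictPath(options: Map[String, String]): String =
    options("path")

  // A defensive variant: returns None for path-less sinks such as Kafka,
  // letting the caller skip writing the info file instead of failing.
  def maybePath(options: Map[String, String]): Option[String] =
    options.get("path")
}
```

With these definitions, strictPath(kafkaWriteOptions) throws NoSuchElementException just like the trace above, while maybePath(kafkaWriteOptions) returns None and leaves the query execution listener free to handle the missing path gracefully.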
