
Getting zero byte files using abris #307

Open
kpr9991 opened this issue Aug 3, 2022 · 3 comments

kpr9991 commented Aug 3, 2022

I am using Abris with the Confluent Schema Registry to deserialize Avro records received from a Kafka source.
When I fetch the schema manually from the Schema Registry and pass it to Spark's built-in from_avro function, skipping the first 5 bytes of each message (the Confluent wire-format header), I am able to read the records. I want to do the same with Abris, since that is what the library does internally. But when I use Abris, 0-byte files are written. Is this an issue with Abris?

Working code without Abris:

package pruthvi.kafka.poc

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.streaming.Trigger
import io.confluent.kafka.schemaregistry.client.rest.RestService

object Important1 {

  def main(args: Array[String]): Unit = {
    println("Hello world!")

    try {
      val spark: SparkSession = SparkSession.builder
        .master("local[3]")
        .appName("Kafka testing")
        .config("spark.streaming.stopGracefullyOnShutdown", "true")
        .config("spark.sql.shuffle.partitions", 3)
        .getOrCreate()
      spark.conf.set("spark.sql.avro.compression.codec", "uncompressed")

      val topicName = "foo"

      val df = spark.readStream
        .format("kafka")
        .option("kafka.security.protocol", "SSL")
        .option("kafka.ssl.truststore.location", "certs/truststore.jks")
        .option("kafka.ssl.keystore.location", "certs/keystore.jks")
        .option("kafka.ssl.key.password", "foo")
        .option("kafka.ssl.keystore.password", "foo")
        .option("kafka.ssl.truststore.password", "foo")
        .option("kafka.bootstrap.servers", "x:16501,y:16501,z:16501")
        .option("subscribe", topicName)
        .option("kafka.group.id", "foo")
        .option("startingOffsets", "earliest")
        .load()

      // Fetch the latest schema for the topic from the Schema Registry
      val schemaRegistryURL = "url"
      val restService = new RestService(schemaRegistryURL)
      val valueRestResponseSchema = restService.getLatestVersion(topicName)
      val jsonSchema = valueRestResponseSchema.getSchema

      import spark.implicits._
      // Skip the 5-byte Confluent header (magic byte + 4-byte schema id); substring is 1-indexed
      val dsAvroRecord = df
        .selectExpr("substring(value, 6) as avro_value")
        .select(from_avro($"avro_value", jsonSchema).as("RecordValue"))

      dsAvroRecord.writeStream
        .outputMode("append")
        .format("json")
        .option("path", "output")
        .trigger(Trigger.ProcessingTime(1))
        .option("checkpointLocation", "chk_point_dir")
        .start()
        .awaitTermination()
    } catch {
      case e: Exception =>
        e.printStackTrace()
    }
  }
}

With Abris:

package pruthvi.kafka.poc
package examples

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger
import za.co.absa.abris.config.AbrisConfig

object readStreamingData {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder
      .master("local[4]")
      .appName("Kafka testing")
      .config("spark.streaming.stopGracefullyOnShutdown", "true")
      .getOrCreate()

    val topicName = "foo"
    val df = spark.readStream
      .format("kafka")
      .option("kafka.security.protocol", "SSL")
      .option("kafka.ssl.truststore.location", "certs/truststore.jks")
      .option("kafka.ssl.keystore.location", "certs/keystore.jks")
      .option("kafka.ssl.key.password", "")
      .option("kafka.ssl.keystore.password", "")
      .option("kafka.ssl.truststore.password", "")
      .option("kafka.bootstrap.servers", "x:16501,y:16501,z:16501")
      .option("kafka.group.id", "foo")
      .option("subscribe", topicName)
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "50")
      .load()

    // Abris downloads the reader schema from the Schema Registry itself
    val abrisConfig =
      AbrisConfig.fromConfluentAvro.downloadReaderSchemaByLatestVersion
        .andTopicNameStrategy(topicName)
        .usingSchemaRegistry("url")

    import za.co.absa.abris.avro.functions.from_avro
    val deserialized = df.select(from_avro(col("value"), abrisConfig) as 'data)

    deserialized.writeStream
      .outputMode("append")
      .format("parquet")
      .option("path", "output")
      .option("checkpointLocation", "chk_point_dir")
      .trigger(Trigger.ProcessingTime(1000))
      .start()
      .awaitTermination()
  }
}
@kevinwallimann (Collaborator) commented

Hi @kpr9991, thanks for your examples. I ran your code with Abris and consumed the records as expected; I don't see any errors in the code you provided. Which versions of Spark and Abris are you running? I also see that you used the same output and checkpoint directory for both examples. Is it possible that you forgot to clear the checkpoints before running the example with Abris?
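For reference, a minimal sketch (not from the thread, with hypothetical directory names) of keeping the two runs separate: each query gets its own sink path and checkpoint location, because a reused checkpoint directory makes Structured Streaming resume from the offsets committed by the previous query, so the new query can appear to write nothing.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper: give every query its own sink path and checkpoint directory.
def startQuery(df: DataFrame, sinkFormat: String, name: String): StreamingQuery =
  df.writeStream
    .outputMode("append")
    .format(sinkFormat)
    .option("path", s"output-$name")                      // e.g. output-plain vs. output-abris
    .option("checkpointLocation", s"chk_point_dir-$name") // separate checkpoints per query
    .start()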


kpr9991 commented Aug 7, 2022

I ran it for 2.5 hours and it kept filling the output directory with 0-byte files. The culprit was the trigger. I thought the trigger would process the unbounded input table every x milliseconds and write the output, but instead it created a 0-byte file every x milliseconds and only started writing real data after a few hours. The trigger is not behaving as expected, so I removed it and the query ran perfectly. However, it still fails for batch data. Can you try once with spark.read instead of spark.readStream and write instead of writeStream? I am getting 0-byte files in that case as well, and since a batch query has no trigger, there is nothing to remove from that code to test.
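For context, a batch variant of the Abris example might look roughly like the sketch below (SSL options omitted for brevity; the topic name, registry URL and output path are the same placeholders used above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

object readBatchData {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[4]")
      .appName("Kafka batch testing")
      .getOrCreate()

    val topicName = "foo"

    // Bounded read: consumes whatever is currently in the topic and then finishes.
    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "x:16501,y:16501,z:16501")
      .option("subscribe", topicName)
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()

    val abrisConfig = AbrisConfig.fromConfluentAvro.downloadReaderSchemaByLatestVersion
      .andTopicNameStrategy(topicName)
      .usingSchemaRegistry("url")

    val deserialized = df.select(from_avro(col("value"), abrisConfig) as "data")

    // Plain batch write; no trigger or checkpoint is involved here.
    deserialized.write.mode("append").parquet("output")
  }
}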

@kevinwallimann (Collaborator) commented

Hi @kpr9991, I was able to run the example using .read and .write and could ingest the records as expected, so I don't see that this is a problem in Abris. The default trigger is ProcessingTime(0), so it seems strange that changing from ProcessingTime(1000) to ProcessingTime(0) solved your problem, although it certainly sped up your query. One reason for empty Parquet files is partitions that happen to be empty. Maybe you can try running with a single executor and see if the problem persists.
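As an illustration (a sketch, not from the thread): collapsing the stream to a single partition before the sink means one writer task per micro-batch, so empty input partitions no longer each produce an empty output file. This limits write parallelism and is only meant as a diagnostic aid.

// Assuming `deserialized` from the Abris example above.
val singlePartition = deserialized.coalesce(1) // one writer task per micro-batch

singlePartition.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "output")
  .option("checkpointLocation", "chk_point_dir")
  .start()
  .awaitTermination()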
