fix(hudi): fix alignment performance (#253)
lyogev authored and doronporat committed Dec 26, 2019
1 parent bf8af44 commit 2f3ce1d
Showing 1 changed file with 16 additions and 12 deletions.
@@ -2,13 +2,10 @@ package com.yotpo.metorikku.output.writers.file
 import com.yotpo.metorikku.configuration.job.output.Hudi
 import com.yotpo.metorikku.output.Writer
 import org.apache.log4j.LogManager
-import org.apache.spark
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
-import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.sql.functions.{col, lit, when, max}
 import org.apache.spark.sql._
 
-import scala.collection.mutable.ListBuffer
-
 // REQUIRED: -Dspark.serializer=org.apache.spark.serializer.KryoSerializer
 // http://hudi.incubator.apache.org/configurations.html

@@ -55,7 +52,7 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext
     }
 
     df = this.hudiOutputProperties.alignToPreviousSchema match {
-      case Some(true) => alignToPreviousSchema(supportNullableFields(df))
+      case Some(true) => alignToPreviousSchema(df)
       case _ => df
     }

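Note on the call-site change above: the old code ran supportNullableFields(df) before aligning, while the new code passes df straight through, because nullability is now applied while the schema is rebuilt inside removeNullColumns (the .copy(nullable = true) in the last hunk). A minimal sketch of that kind of nullable-schema rewrite, using an illustrative helper name that is not part of this commit:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Hypothetical helper (illustrative name): mark every field nullable by
// rebuilding the schema once over the existing rows, mirroring the
// createDataFrame(rdd, schema) pattern visible in the next hunk.
def withNullableSchema(df: DataFrame): DataFrame = {
  val nullableSchema = StructType(df.schema.fields.map(_.copy(nullable = true)))
  df.sparkSession.createDataFrame(df.rdd, nullableSchema)
}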
@@ -208,10 +205,6 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext
     dataFrame.sparkSession.createDataFrame(dataFrame.rdd, schema)
   }
 
-  def isOnlyNullColumn(df: DataFrame, name: String): Boolean = {
-    df.select(name).filter(col(name).isNotNull).limit(1).count() == 0
-  }
-
   def alignToSchemaColumns(df: DataFrame, previousSchema: Option[StructType]) : DataFrame = {
     val lowerCasedColumns = df.columns.map(f => f.toLowerCase)
     previousSchema match {
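For context on the performance problem: the removed isOnlyNullColumn helper answered the "is this column all null?" question one column at a time, so cleaning a wide table meant one Spark action per column. A hedged sketch of that access pattern (the driver-side loop is illustrative, not code from the diff):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// One job per column: each count() scans the data again.
def allNullColumnsPerColumn(df: DataFrame): Seq[String] =
  df.columns.filter(name =>
    df.select(name).filter(col(name).isNotNull).limit(1).count() == 0).toSeq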
@@ -228,16 +221,27 @@ class HudiOutputWriter(props: Map[String, Object], hudiOutput: Option[Hudi]) ext
 
   def removeNullColumns(dataFrame: DataFrame, previousSchema: Option[StructType]): DataFrame = {
     var df = dataFrame
+    val nullColumns = df
+      .select(df.schema.fields.map(
+        f =>
+          when(
+            max(col(f.name)).isNull, true)
+            .otherwise(false)):_*)
+      .collect()(0)
 
     var fieldMap = Map[String, DataType]()
 
-    val schema = StructType(df.schema.fields.flatMap(
-      field => {
+    val schema = StructType(df.schema.fields.zipWithIndex.flatMap(
+      a => {
+        val field = a._1.copy(nullable = true)
+        val index = a._2
+
+        // Add nullability, not on by default
         val fieldName = field.name
         var returnedFields = List[StructField]()
 
         // Column is detected as having only null values, we need to remove it
-        isOnlyNullColumn(df, fieldName) match {
+        nullColumns(index).asInstanceOf[Boolean] match {
           case true => {
             df = df.drop(fieldName)
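The replacement computes all of the flags in a single aggregated collect(): max(col) over an all-null column is null, so one aggregate row answers the question for every column at once, instead of one count() job per column. A minimal, self-contained sketch of the same idea with an illustrative name (allNullColumns is not part of the commit):

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, max, when}

// Single pass: one aggregate row holds a boolean "is all null" flag per column.
def allNullColumns(df: DataFrame): Seq[String] = {
  val flags: Row = df
    .select(df.schema.fields.map(f =>
      when(max(col(f.name)).isNull, true).otherwise(false)): _*)
    .collect()(0)
  df.schema.fieldNames.zipWithIndex.collect {
    case (name, index) if flags.getBoolean(index) => name
  }.toSeq
}

// Usage: drop every all-null column after a single extra pass over the data.
// val cleaned = allNullColumns(df).foldLeft(df)((acc, name) => acc.drop(name))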
