diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala index c2cd955..0f70ec0 100755 --- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala @@ -105,10 +105,14 @@ case class CsvRelation protected[spark] ( tokenRdd(schemaFields.map(_.name)).flatMap { tokens => if (dropMalformed && schemaFields.length != tokens.size) { - logger.warn(s"Dropping malformed line: ${tokens.mkString(",")}") + logger.warn(s"Dropping malformed line " + + s"(expected ${schemaFields.length} tokens but received " + + s"${tokens.size} tokens): ${tokens.mkString(",")}") None } else if (failFast && schemaFields.length != tokens.size) { - throw new RuntimeException(s"Malformed line in FAILFAST mode: ${tokens.mkString(",")}") + throw new RuntimeException(s"Malformed line " + + s"(expected ${schemaFields.length} tokens but received " + + s"${tokens.size} tokens) in FAILFAST mode: ${tokens.mkString(",")}") } else { var index: Int = 0 val rowArray = new Array[Any](schemaFields.length) @@ -125,13 +129,16 @@ case class CsvRelation protected[spark] ( case aiob: ArrayIndexOutOfBoundsException if permissive => (index until schemaFields.length).foreach(ind => rowArray(ind) = null) Some(Row.fromSeq(rowArray)) - case _: java.lang.NumberFormatException | - _: IllegalArgumentException if dropMalformed => - logger.warn("Number format exception. " + + case e: java.lang.NumberFormatException if dropMalformed => + logger.warn(s"Number format exception (${e.getMessage})." + + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") + None + case e: IllegalArgumentException if dropMalformed => + logger.warn(s"IllegalArgument exception (${e.getMessage})." + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") None case pe: java.text.ParseException if dropMalformed => - logger.warn("Parse exception. " + + logger.warn(s"Parse exception. (${pe.getMessage})." + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") None } @@ -170,11 +177,14 @@ case class CsvRelation protected[spark] ( val requiredSize = requiredFields.length tokenRdd(schemaFields.map(_.name)).flatMap { tokens => if (dropMalformed && schemaFields.length != tokens.size) { - logger.warn(s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") + logger.warn(s"Dropping malformed line " + + s"(expected ${schemaFields.length} tokens but received " + + s"${tokens.size} tokens): ${tokens.mkString(delimiter.toString)}") None } else if (failFast && schemaFields.length != tokens.size) { - throw new RuntimeException(s"Malformed line in FAILFAST mode: " + - s"${tokens.mkString(delimiter.toString)}") + throw new RuntimeException(s"Malformed line in FAILFAST mode " + + s"(expected ${schemaFields.length} tokens but received " + + s"${tokens.size} tokens): ${tokens.mkString(delimiter.toString)}") } else { val indexSafeTokens = if (permissive && schemaFields.length != tokens.size) { tokens ++ new Array[String](schemaFields.length - tokens.size) @@ -198,13 +208,16 @@ case class CsvRelation protected[spark] ( } Some(Row.fromSeq(rowArray.take(requiredSize))) } catch { - case _: java.lang.NumberFormatException | - _: IllegalArgumentException if dropMalformed => - logger.warn("Number format exception. " + + case e: java.lang.NumberFormatException if dropMalformed => + logger.warn("Number format exception (" + e + ")." + + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") + None + case e: IllegalArgumentException if dropMalformed => + logger.warn("IllegalArgument exception (" + e + ")." + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") None case pe: java.text.ParseException if dropMalformed => - logger.warn("Parse exception. " + + logger.warn("Parse exception (" + pe + ")." + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") None } diff --git a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala index 00eb846..7c055e4 100755 --- a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala @@ -205,7 +205,8 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll { .collect() } - assert(exception.getMessage.contains("Malformed line in FAILFAST mode: 2015,Chevy,Volt")) + assert(exception.getMessage.contains("Malformed line in FAILFAST mode " + + "(expected 5 tokens but received 3 tokens): 2015,Chevy,Volt")) } test("DSL test roundtrip nulls") {