Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Scala Steward: Reformat with scalafmt 3.8.3
867d1cf479642e3bfccf04d54ff54a226e07626c
2 changes: 1 addition & 1 deletion .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
style = defaultWithAlign
maxColumn = 120
version = 2.7.5
version = 3.8.3
runner.dialect = "scala213"
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ lazy val `scala 2.12` = "2.12.15"
lazy val `scala 2.13` = "2.13.8"
lazy val `scala 3` = "3.1.2"

scalaVersion := `scala 2.13`
scalaVersion := `scala 2.13`
crossScalaVersions := Seq(`scala 2.12`, `scala 2.13`, `scala 3`)

organization := "io.findify"
licenses := Seq("MIT" -> url("https://opensource.org/licenses/MIT"))
homepage := Some(url("https://github.com/findify/flink-adt"))
licenses := Seq("MIT" -> url("https://opensource.org/licenses/MIT"))
homepage := Some(url("https://github.com/findify/flink-adt"))

publishMavenStyle := true

Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/io/findify/flinkadt/api/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ package object api extends LowPrioImplicits {
implicit lazy val shortInfo: TypeInformation[Short] = BasicTypeInfo.getInfoFor(classOf[Short])

implicit lazy val bigDecMapper: TypeMapper[scala.BigDecimal, java.math.BigDecimal] = new BigDecMapper()
implicit lazy val bigDecInfo: TypeInformation[BigDecimal] = mappedTypeInfo[scala.BigDecimal, java.math.BigDecimal]
implicit lazy val bigIntMapper: TypeMapper[scala.BigInt, java.math.BigInteger] = new BigIntMapper()
implicit lazy val bigIntInfo: TypeInformation[BigInt] = mappedTypeInfo[scala.BigInt, java.math.BigInteger]
implicit lazy val bigDecInfo: TypeInformation[BigDecimal] = mappedTypeInfo[scala.BigDecimal, java.math.BigDecimal]
implicit lazy val bigIntMapper: TypeMapper[scala.BigInt, java.math.BigInteger] = new BigIntMapper()
implicit lazy val bigIntInfo: TypeInformation[BigInt] = mappedTypeInfo[scala.BigInt, java.math.BigInteger]

implicit lazy val unitInfo: TypeInformation[Unit] = new UnitTypeInformation()
implicit def mappedTypeInfo[A: ClassTag, B](implicit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.types.NullFieldException

/** Serializer for Case Classes. Creation and access is different from
* our Java Tuples so we have to treat them differently.
* Copied from Flink 1.14.
/** Serializer for Case Classes. Creation and access is different from our Java Tuples so we have to treat them
* differently. Copied from Flink 1.14.
*/
@Internal
@SerialVersionUID(7341356073446263475L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class CoproductSerializer[T](subtypeClasses: Array[Class[_]], subtypeSerializers
override def copy(from: T): T = from
override def copy(from: T, reuse: T): T = from
override def copy(source: DataInputView, target: DataOutputView): Unit = serialize(deserialize(source), target)
override def createInstance(): T =
override def createInstance(): T =
// this one may be used for later reuse, but we never reuse coproducts due to their unclear concrete type
subtypeSerializers.head.createInstance().asInstanceOf[T]
override def getLength: Int = -1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import org.apache.flink.annotation.Internal
import org.apache.flink.api.common.typeutils._
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

/** Serializer for [[Either]].
* Copied from Flink 1.14.
/** Serializer for [[Either]]. Copied from Flink 1.14.
*/
@Internal
@SerialVersionUID(9219995873023657525L)
Expand Down Expand Up @@ -101,7 +100,7 @@ class EitherSerializer[A, B](
obj match {
case eitherSerializer: EitherSerializer[_, _] =>
leftSerializer.equals(eitherSerializer.leftSerializer) &&
rightSerializer.equals(eitherSerializer.rightSerializer)
rightSerializer.equals(eitherSerializer.rightSerializer)
case _ => false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import org.apache.flink.annotation.Internal
import org.apache.flink.api.common.typeutils.{SimpleTypeSerializerSnapshot, TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

/** Serializer for cases where no serializer is required but the system still expects one. This
* happens for OptionTypeInfo when None is used, or for Either when one of the type parameters
* is Nothing.
/** Serializer for cases where no serializer is required but the system still expects one. This happens for
* OptionTypeInfo when None is used, or for Either when one of the type parameters is Nothing.
*/
@Internal
class NothingSerializer extends TypeSerializer[Any] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnap

import java.io.ObjectInputStream

/** This is a non macro-generated, concrete Scala case class serializer.
* Copied from Flink 1.14 with two changes:
* 1. Does not extend `SelfResolvingTypeSerializer`, since we're breaking compatibility anyway.
* 2. Move `lookupConstructor` to version-specific sources.
/** This is a non macro-generated, concrete Scala case class serializer. Copied from Flink 1.14 with two changes:
* 1. Does not extend `SelfResolvingTypeSerializer`, since we're breaking compatibility anyway. 2. Move
* `lookupConstructor` to version-specific sources.
*/
@SerialVersionUID(1L)
class ScalaCaseClassSerializer[T <: Product](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.util.InstantiationUtil

class ScalaCaseObjectSerializer[T](clazz: Class[T]) extends TypeSerializerSingleton[T] {
override def isImmutableType: Boolean = true
override def copy(from: T): T = from
override def copy(from: T, reuse: T): T = from
override def isImmutableType: Boolean = true
override def copy(from: T): T = from
override def copy(from: T, reuse: T): T = from
override def copy(source: DataInputView, target: DataOutputView): Unit = {}
override def createInstance(): T = clazz.getField("MODULE$").get(null).asInstanceOf[T]
override def getLength: Int = 0
override def createInstance(): T = clazz.getField("MODULE$").get(null).asInstanceOf[T]
override def getLength: Int = 0
override def serialize(record: T, target: DataOutputView): Unit = {}

override def deserialize(source: DataInputView): T = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase
import org.apache.flink.core.memory.MemorySegment
import org.apache.flink.types.{KeyFieldOutOfBoundsException, NullKeyFieldException}

/** Comparator for Case Classes. Access is different from
* our Java Tuples so we have to treat them differently.
/** Comparator for Case Classes. Access is different from our Java Tuples so we have to treat them differently.
*/
@Internal
class CaseClassComparator[T <: Product](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ import scala.annotation.{nowarn, tailrec}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

/** TypeInformation for Case Classes. Creation and access is different from
* our Java Tuples so we have to treat them differently.
/** TypeInformation for Case Classes. Creation and access is different from our Java Tuples so we have to treat them
* differently.
*/
@Public
abstract class CaseClassTypeInfo[T <: Product](
Expand Down Expand Up @@ -279,9 +279,9 @@ abstract class CaseClassTypeInfo[T <: Product](
obj match {
case caseClass: CaseClassTypeInfo[_] =>
caseClass.canEqual(this) &&
super.equals(caseClass) &&
typeParamTypeInfos.sameElements(caseClass.typeParamTypeInfos) &&
fieldNames.equals(caseClass.fieldNames)
super.equals(caseClass) &&
typeParamTypeInfos.sameElements(caseClass.typeParamTypeInfos) &&
fieldNames.equals(caseClass.fieldNames)
case _ => false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer
import scala.reflect.{ClassTag, classTag}

case class CollectionTypeInformation[T: ClassTag](serializer: TypeSerializer[T]) extends TypeInformation[T] {
val clazz = classTag[T].runtimeClass.asInstanceOf[Class[T]]
val clazz = classTag[T].runtimeClass.asInstanceOf[Class[T]]
override def createSerializer(config: ExecutionConfig): TypeSerializer[T] = serializer
override def isBasicType: Boolean = false
override def isTupleType: Boolean = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ class EitherTypeInfo[A, B, T <: Either[A, B]](
obj match {
case eitherTypeInfo: EitherTypeInfo[_, _, _] =>
eitherTypeInfo.canEqual(this) &&
clazz.equals(eitherTypeInfo.clazz) &&
leftTypeInfo.equals(eitherTypeInfo.leftTypeInfo) &&
rightTypeInfo.equals(eitherTypeInfo.rightTypeInfo)
clazz.equals(eitherTypeInfo.clazz) &&
leftTypeInfo.equals(eitherTypeInfo.leftTypeInfo) &&
rightTypeInfo.equals(eitherTypeInfo.rightTypeInfo)
case _ => false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ abstract class SimpleTypeInformation[T: ClassTag: TypeSerializer] extends TypeIn
override def isTupleType: Boolean = false
override def isKeyType: Boolean = false
override def getTotalFields: Int = 1
override def getTypeClass: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
override def getArity: Int = 1
override def getTypeClass: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
override def getArity: Int = 1
}
2 changes: 1 addition & 1 deletion src/test/scala/io/findify/flinkadt/SerializerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with Test

it should "derive recursively" in {
// recursive is broken
//val ti = implicitly[TypeInformation[Node]]
// val ti = implicitly[TypeInformation[Node]]
}

it should "derive list" in {
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/io/findify/flinkadt/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ trait TestUtils extends Matchers with Inspectors {
ser.getClass.getClassLoader
)
val restoredSerializer = restoredSnapshot.restoreSerializer()
val copy = restoredSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray)))
val copy = restoredSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray)))
in shouldBe copy
}

Expand Down