Skip to content

Commit f6e3e98

Browse files
Update sbt, scripted-plugin to 1.11.0 (#257)
* Improve reliability of TypeInformation.createSerializer() and TypeSerializer.duplicate() (#254) * Improve reliability of TypeInformation.createSerializer() and TypeSerializer.duplicate() * Update magnolia to 1.3.18 (#256) * Update sbt, scripted-plugin to 1.11.0 --------- Co-authored-by: Arnaud <[email protected]>
1 parent c9afa56 commit f6e3e98

32 files changed

+291
-45
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ lazy val commonSettings = Seq(
2929
)
3030
} else {
3131
Seq(
32-
"com.softwaremill.magnolia1_3" %% "magnolia" % "1.3.16",
32+
"com.softwaremill.magnolia1_3" %% "magnolia" % "1.3.18",
3333
"org.scala-lang" %% "scala3-compiler" % scalaVersion.value % Provided
3434
)
3535
}

modules/flink-1-api/src/main/scala/org/apache/flinkx/api/serializer/ArraySerializer.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,15 @@ class ArraySerializer[T](val child: TypeSerializer[T], clazz: Class[T]) extends
2929
}
3030
}
3131

32+
override def duplicate(): ArraySerializer[T] = {
33+
val duplicatedChild = child.duplicate()
34+
if (duplicatedChild.eq(child)) {
35+
this
36+
} else {
37+
new ArraySerializer[T](duplicatedChild, clazz)
38+
}
39+
}
40+
3241
override def getLength: Int = -1
3342

3443
override def deserialize(source: DataInputView): Array[T] = {

modules/flink-1-api/src/main/scala/org/apache/flinkx/api/serializer/CoproductSerializer.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ import org.apache.flinkx.api.serializer.CoproductSerializer.CoproductSerializerS
88
class CoproductSerializer[T](subtypeClasses: Array[Class[_]], subtypeSerializers: Array[TypeSerializer[_]])
99
extends MutableSerializer[T] {
1010

11-
override val isImmutableType: Boolean = subtypeSerializers.forall(_.isImmutableType)
11+
override val isImmutableType: Boolean = subtypeSerializers.forall(Option(_).exists(_.isImmutableType))
12+
val isImmutableSerializer: Boolean = subtypeSerializers.forall(Option(_).forall(s => s.duplicate().eq(s)))
1213

1314
override def copy(from: T): T = {
1415
if (from == null || isImmutableType) {
@@ -19,6 +20,14 @@ class CoproductSerializer[T](subtypeClasses: Array[Class[_]], subtypeSerializers
1920
}
2021
}
2122

23+
override def duplicate(): CoproductSerializer[T] = {
24+
if (isImmutableSerializer) {
25+
this
26+
} else {
27+
new CoproductSerializer[T](subtypeClasses, subtypeSerializers.map(_.duplicate()))
28+
}
29+
}
30+
2231
override def createInstance(): T =
2332
// this one may be used for later reuse, but we never reuse coproducts due to their unclear concrete type
2433
subtypeSerializers.head.createInstance().asInstanceOf[T]
@@ -74,7 +83,7 @@ object CoproductSerializer {
7483

7584
subtypeSerializers = (0 until len).map { _ =>
7685
if (
77-
/* - The old code was calling getCurrentVersion() just before calling readSnapshot().
86+
/* - The old code was calling getCurrentVersion() just before calling readSnapshot().
7887
If only getCurrentVersion() is called, we know we must deserialize with old behavior.
7988
- The new code calls getCurrentVersion() only before calling writeSnapshot().
8089
getCurrentVersion() is not called before calling readSnapshot()

modules/flink-1-api/src/main/scala/org/apache/flinkx/api/serializer/ListCCSerializer.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,15 @@ class ListCCSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mut
1616
}
1717
}
1818

19+
override def duplicate(): ListCCSerializer[T] = {
20+
val duplicatedChild = child.duplicate()
21+
if (duplicatedChild.eq(child)) {
22+
this
23+
} else {
24+
new ListCCSerializer[T](duplicatedChild, clazz)
25+
}
26+
}
27+
1928
override def createInstance(): ::[T] = throw new IllegalArgumentException("cannot create instance of non-empty list")
2029
override def getLength: Int = -1
2130
override def deserialize(source: DataInputView): ::[T] = {

modules/flink-1-api/src/main/scala/org/apache/flinkx/api/serializer/ListSerializer.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,15 @@ class ListSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mutab
1515
}
1616
}
1717

18+
override def duplicate(): ListSerializer[T] = {
19+
val duplicatedChild = child.duplicate()
20+
if (duplicatedChild.eq(child)) {
21+
this
22+
} else {
23+
new ListSerializer[T](duplicatedChild, clazz)
24+
}
25+
}
26+
1827
override def createInstance(): List[T] = List.empty[T]
1928
override def getLength: Int = -1
2029
override def deserialize(source: DataInputView): List[T] = {

modules/flink-1-api/src/main/scala/org/apache/flinkx/api/serializer/MapSerializer.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,16 @@ class MapSerializer[K, V](ks: TypeSerializer[K], vs: TypeSerializer[V]) extends
1717
}
1818
}
1919

20+
override def duplicate(): MapSerializer[K, V] = {
21+
val duplicatedKS = ks.duplicate()
22+
val duplicatedVS = vs.duplicate()
23+
if (duplicatedKS.eq(ks) && duplicatedVS.eq(vs)) {
24+
this
25+
} else {
26+
new MapSerializer[K, V](duplicatedKS, duplicatedVS)
27+
}
28+
}
29+
2030
override def createInstance(): Map[K, V] = Map.empty[K, V]
2131
override def getLength: Int = -1
2232
override def deserialize(source: DataInputView): Map[K, V] = {
@@ -56,7 +66,7 @@ object MapSerializer {
5666

5767
override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
5868
if (
59-
/* - The old code was calling getCurrentVersion() just before calling readSnapshot().
69+
/* - The old code was calling getCurrentVersion() just before calling readSnapshot().
6070
If only getCurrentVersion() is called, we know we must deserialize with old behavior.
6171
- The new code calls getCurrentVersion() only before calling writeSnapshot().
6272
getCurrentVersion() is not called before calling readSnapshot()

modules/flink-1-api/src/main/scala/org/apache/flinkx/api/serializer/MappedSerializer.scala

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,27 @@ case class MappedSerializer[A, B](mapper: TypeMapper[A, B], ser: TypeSerializer[
1717
}
1818
}
1919

20-
override def equals(obj: Any): Boolean = ser.equals(obj)
20+
override def duplicate(): MappedSerializer[A, B] = {
21+
val duplicatedSer = ser.duplicate()
22+
if (duplicatedSer.eq(ser)) {
23+
this
24+
} else {
25+
MappedSerializer[A, B](mapper, duplicatedSer)
26+
}
27+
}
28+
29+
override def equals(other: Any): Boolean = other match {
30+
case that: MappedSerializer[_, _] =>
31+
mapper == that.mapper &&
32+
ser == that.ser
33+
case _ => false
34+
}
35+
36+
override def toString = s"MappedSerializer($mapper, $ser)"
2137

22-
override def toString: String = ser.toString
38+
override def hashCode(): Int = 31 * mapper.hashCode + ser.hashCode
2339

24-
override def hashCode(): Int = ser.hashCode()
25-
override def getLength: Int = ser.getLength
40+
override def getLength: Int = ser.getLength
2641

2742
override def serialize(record: A, target: DataOutputView): Unit = {
2843
ser.serialize(mapper.map(record), target)
@@ -60,15 +75,15 @@ object MappedSerializer {
6075
val mapperClazz = InstantiationUtil.resolveClassByName[TypeMapper[A, B]](in, userCodeClassLoader)
6176
mapper = InstantiationUtil.instantiate(mapperClazz)
6277
if (
63-
/* - The old code was calling getCurrentVersion() just before calling readSnapshot().
78+
/* - The old code was calling getCurrentVersion() just before calling readSnapshot().
6479
If only getCurrentVersion() is called, we know we must deserialize with old behavior.
6580
- The new code calls getCurrentVersion() only before calling writeSnapshot().
6681
getCurrentVersion() is not called before calling readSnapshot()
6782
or both getCurrentVersion() and writeSnapshot() are called,
6883
so in these cases we know the readVersion parameter is trustable to determine which behavior to apply. */
6984
(!currentVersionCalled || writeSnapshotCalled) &&
70-
// readVersion is trustable
71-
readVersion == 2
85+
// readVersion is trustable
86+
readVersion == 2
7287
) {
7388
ser = TypeSerializerSnapshot.readVersionedSnapshot[B](in, userCodeClassLoader).restoreSerializer()
7489
} else {

modules/flink-1-api/src/main/scala/org/apache/flinkx/api/serializer/SeqSerializer.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,15 @@ class SeqSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mutabl
1515
}
1616
}
1717

18+
override def duplicate(): SeqSerializer[T] = {
19+
val duplicatedChild = child.duplicate()
20+
if (duplicatedChild.eq(child)) {
21+
this
22+
} else {
23+
new SeqSerializer[T](duplicatedChild, clazz)
24+
}
25+
}
26+
1827
override def createInstance(): Seq[T] = Seq.empty[T]
1928
override def getLength: Int = -1
2029
override def deserialize(source: DataInputView): Seq[T] = {

modules/flink-1-api/src/main/scala/org/apache/flinkx/api/serializer/SetSerializer.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,15 @@ class SetSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mutabl
1515
}
1616
}
1717

18+
override def duplicate(): SetSerializer[T] = {
19+
val duplicatedChild = child.duplicate()
20+
if (duplicatedChild.eq(child)) {
21+
this
22+
} else {
23+
new SetSerializer[T](duplicatedChild, clazz)
24+
}
25+
}
26+
1827
override def createInstance(): Set[T] = Set.empty[T]
1928
override def getLength: Int = -1
2029
override def deserialize(source: DataInputView): Set[T] = {

modules/flink-1-api/src/main/scala/org/apache/flinkx/api/serializer/VectorSerializer.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,15 @@ class VectorSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends Mut
1515
}
1616
}
1717

18+
override def duplicate(): VectorSerializer[T] = {
19+
val duplicatedChild = child.duplicate()
20+
if (duplicatedChild.eq(child)) {
21+
this
22+
} else {
23+
new VectorSerializer[T](duplicatedChild, clazz)
24+
}
25+
}
26+
1827
override def createInstance(): Vector[T] = Vector.empty[T]
1928
override def getLength: Int = -1
2029
override def deserialize(source: DataInputView): Vector[T] = {

0 commit comments

Comments
 (0)