From 2ec759f3dfba9d06779843e9784763d0eb2df9ca Mon Sep 17 00:00:00 2001 From: adaroussin Date: Fri, 25 Apr 2025 18:41:17 +0200 Subject: [PATCH 1/2] Handle state evolution without breaking state compatibility --- .../flinkx/api/SerializerSnapshotTest.scala | 36 ++-- .../org/apache/flinkx/api/TestUtils.scala | 14 +- .../CollectionSerializerSnapshot.scala | 46 +++-- .../api/serializer/CoproductSerializer.scala | 25 +-- .../flinkx/api/serializer/MapSerializer.scala | 29 +-- .../api/serializer/MappedSerializer.scala | 37 ++-- .../ScalaCaseObjectSerializer.scala | 6 +- .../CollectionSerializerSnapshotTest.scala | 33 ++++ .../MapSerializerSnapshotTest.scala | 178 ++++++++++++++++++ 9 files changed, 293 insertions(+), 111 deletions(-) create mode 100644 modules/flink-2-api/src/test/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshotTest.scala create mode 100644 modules/flink-2-api/src/test/scala/org/apache/flinkx/api/serializer/MapSerializerSnapshotTest.scala diff --git a/modules/flink-1-api/src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala b/modules/flink-1-api/src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala index 0f0205a..c606db7 100644 --- a/modules/flink-1-api/src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala +++ b/modules/flink-1-api/src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala @@ -1,33 +1,23 @@ package org.apache.flinkx.api -import java.io.{ByteArrayInputStream, ByteArrayOutputStream} -import org.apache.flinkx.api.SerializerSnapshotTest.{ - ADT2, - OuterTrait, - SimpleClass1, - SimpleClassArray, - SimpleClassList, - SimpleClassMap1, - SimpleClassMap2, - TraitMap -} -import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers -import 
org.apache.flinkx.api.serializers._ +import org.apache.flink.api.common.serialization.SerializerConfigImpl import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} import org.apache.flink.util.ChildFirstClassLoader +import org.apache.flinkx.api.SerializerSnapshotTest.* +import org.apache.flinkx.api.serializers.* import org.scalatest.Assertion +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import java.net.URLClassLoader class SerializerSnapshotTest extends AnyFlatSpec with Matchers { - def createSerializer[T: TypeInformation] = - implicitly[TypeInformation[T]].createSerializer(new ExecutionConfig()) + def createSerializer[T: TypeInformation]: TypeSerializer[T] = + implicitly[TypeInformation[T]].createSerializer(new SerializerConfigImpl()) it should "roundtrip product serializer snapshot" in { val ser = createSerializer[SimpleClass1] @@ -90,11 +80,11 @@ class SerializerSnapshotTest extends AnyFlatSpec with Matchers { val snap = ser.snapshotConfiguration() val buffer = new ByteArrayOutputStream() val output = new DataOutputViewStreamWrapper(buffer) - snap.writeSnapshot(output) + TypeSerializerSnapshot.writeVersionedSnapshot(output, snap) output.close() val input = new DataInputViewStreamWrapper(new ByteArrayInputStream(buffer.toByteArray)) - snap.readSnapshot(ser.snapshotConfiguration().getCurrentVersion, input, cl) - snap.restoreSerializer() + val deserSnap = TypeSerializerSnapshot.readVersionedSnapshot[T](input, cl) + deserSnap.restoreSerializer() } def assertRoundtripSerializer[T](ser: TypeSerializer[T]): Assertion = { diff --git a/modules/flink-1-api/src/test/scala/org/apache/flinkx/api/TestUtils.scala 
b/modules/flink-1-api/src/test/scala/org/apache/flinkx/api/TestUtils.scala index cccd011..b7ce857 100644 --- a/modules/flink-1-api/src/test/scala/org/apache/flinkx/api/TestUtils.scala +++ b/modules/flink-1-api/src/test/scala/org/apache/flinkx/api/TestUtils.scala @@ -1,7 +1,7 @@ package org.apache.flinkx.api import org.apache.flinkx.api.serializer.ScalaCaseClassSerializer -import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} import org.scalatest.{Assertion, Inspectors} @@ -14,14 +14,10 @@ trait TestUtils extends Matchers with Inspectors { val out = new ByteArrayOutputStream() ser.serialize(in, new DataOutputViewStreamWrapper(out)) val snapBytes = new ByteArrayOutputStream() - ser.snapshotConfiguration().writeSnapshot(new DataOutputViewStreamWrapper(snapBytes)) - val restoredSnapshot = ser.snapshotConfiguration() - restoredSnapshot - .readSnapshot( - restoredSnapshot.getCurrentVersion, - new DataInputViewStreamWrapper(new ByteArrayInputStream(snapBytes.toByteArray)), - ser.getClass.getClassLoader - ) + TypeSerializerSnapshot.writeVersionedSnapshot(new DataOutputViewStreamWrapper(snapBytes), ser.snapshotConfiguration()) + val restoredSnapshot = TypeSerializerSnapshot.readVersionedSnapshot[T]( + new DataInputViewStreamWrapper(new ByteArrayInputStream(snapBytes.toByteArray)), + ser.getClass.getClassLoader) val restoredSerializer = restoredSnapshot.restoreSerializer() val copy = restoredSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray))) in shouldBe copy diff --git a/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshot.scala b/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshot.scala 
index 066d9cc..c00d1d1 100644 --- a/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshot.scala +++ b/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshot.scala @@ -4,27 +4,36 @@ import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSche import org.apache.flink.core.memory.{DataInputView, DataOutputView} import org.apache.flink.util.InstantiationUtil -class CollectionSerializerSnapshot[F[_], T, S <: TypeSerializer[F[T]]] extends TypeSerializerSnapshot[F[T]] { - def this(ser: TypeSerializer[T], serClass: Class[S], valueClass: Class[T]) = { - this() - nestedSerializer = ser - clazz = serClass - vclazz = valueClass - } +/** Generic serializer snapshot for collection. + * + * @param nestedSerializer + * the serializer of `T` + * @param clazz + * the class of `S` + * @param vclazz + * the class of `T` + * @tparam F + * the type of the serialized collection + * @tparam T + * the type of the collection's elements + * @tparam S + * the type of the collection serializer + */ +class CollectionSerializerSnapshot[F[_], T, S <: TypeSerializer[F[T]]]( + var nestedSerializer: TypeSerializer[T], + var clazz: Class[S], + var vclazz: Class[T] +) extends TypeSerializerSnapshot[F[T]] { - var nestedSerializer: TypeSerializer[T] = _ - var clazz: Class[S] = _ - var vclazz: Class[T] = _ + // Empty constructor is required to instantiate this class during deserialization. 
+ def this() = this(null, null, null) - override def getCurrentVersion: Int = 1 + override def getCurrentVersion: Int = 2 override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = { clazz = InstantiationUtil.resolveClassByName[S](in, userCodeClassLoader) vclazz = InstantiationUtil.resolveClassByName[T](in, userCodeClassLoader) - val snapClass = InstantiationUtil.resolveClassByName[TypeSerializerSnapshot[T]](in, userCodeClassLoader) - val nestedSnapshot = InstantiationUtil.instantiate(snapClass) - nestedSnapshot.readSnapshot(nestedSnapshot.getCurrentVersion, in, userCodeClassLoader) - nestedSerializer = nestedSnapshot.restoreSerializer() + nestedSerializer = TypeSerializerSnapshot.readVersionedSnapshot[T](in, userCodeClassLoader).restoreSerializer() } override def writeSnapshot(out: DataOutputView): Unit = { @@ -40,17 +49,16 @@ class CollectionSerializerSnapshot[F[_], T, S <: TypeSerializer[F[T]]] extends T case "boolean" => out.writeUTF("java.lang.Boolean") case other => out.writeUTF(other) } - - out.writeUTF(nestedSerializer.snapshotConfiguration().getClass.getName) - nestedSerializer.snapshotConfiguration().writeSnapshot(out) + TypeSerializerSnapshot.writeVersionedSnapshot(out, nestedSerializer.snapshotConfiguration()) } override def resolveSchemaCompatibility( - newSerializer: TypeSerializerSnapshot[F[T]] + oldSerializerSnapshot: TypeSerializerSnapshot[F[T]] ): TypeSerializerSchemaCompatibility[F[T]] = TypeSerializerSchemaCompatibility.compatibleAsIs() override def restoreSerializer(): TypeSerializer[F[T]] = { val constructor = clazz.getConstructors()(0) constructor.newInstance(nestedSerializer, vclazz).asInstanceOf[TypeSerializer[F[T]]] } + } diff --git a/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/CoproductSerializer.scala b/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/CoproductSerializer.scala index a07648b..21dcb99 100644 --- 
a/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/CoproductSerializer.scala +++ b/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/CoproductSerializer.scala @@ -1,12 +1,10 @@ package org.apache.flinkx.api.serializer -import org.apache.flinkx.api.serializer.CoproductSerializer.CoproductSerializerSnapshot import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot} import org.apache.flink.core.memory.{DataInputView, DataOutputView} import org.apache.flink.util.InstantiationUtil - -import scala.annotation.nowarn +import org.apache.flinkx.api.serializer.CoproductSerializer.CoproductSerializerSnapshot class CoproductSerializer[T](subtypeClasses: Array[Class[_]], subtypeSerializers: Array[TypeSerializer[_]]) extends TypeSerializerSingleton[T] { @@ -51,9 +49,10 @@ object CoproductSerializer { var subtypeClasses: Array[Class[_]], var subtypeSerializers: Array[TypeSerializer[_]] ) extends TypeSerializerSnapshot[T] { + + // Empty constructor is required to instantiate this class during deserialization. 
def this() = this(Array.empty[Class[_]], Array.empty[TypeSerializer[_]]) - @nowarn("msg=dead code") override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = { val len = in.readInt() @@ -61,32 +60,28 @@ object CoproductSerializer { .map(_ => InstantiationUtil.resolveClassByName(in, userCodeClassLoader)) .toArray - subtypeSerializers = (0 until len).map { _ => - val clazz = InstantiationUtil.resolveClassByName(in, userCodeClassLoader) - val serializer = InstantiationUtil.instantiate(clazz).asInstanceOf[TypeSerializerSnapshot[_]] - serializer.readSnapshot(serializer.getCurrentVersion, in, userCodeClassLoader) - serializer.restoreSerializer() - }.toArray + subtypeSerializers = (0 until len) + .map(_ => TypeSerializerSnapshot.readVersionedSnapshot(in, userCodeClassLoader).restoreSerializer()) + .toArray } - override def getCurrentVersion: Int = 1 + override def getCurrentVersion: Int = 2 override def writeSnapshot(out: DataOutputView): Unit = { out.writeInt(subtypeClasses.length) subtypeClasses.foreach(c => out.writeUTF(c.getName)) subtypeSerializers.foreach(s => { - val snap = s.snapshotConfiguration() - out.writeUTF(snap.getClass.getName) - snap.writeSnapshot(out) + TypeSerializerSnapshot.writeVersionedSnapshot(out, s.snapshotConfiguration()) }) } override def resolveSchemaCompatibility( - newSerializer: TypeSerializerSnapshot[T] + oldSerializerSnapshot: TypeSerializerSnapshot[T] ): TypeSerializerSchemaCompatibility[T] = TypeSerializerSchemaCompatibility.compatibleAsIs() override def restoreSerializer(): TypeSerializer[T] = new CoproductSerializer[T](subtypeClasses, subtypeSerializers) } + } diff --git a/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/MapSerializer.scala b/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/MapSerializer.scala index d72d3e4..d14e03c 100644 --- a/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/MapSerializer.scala +++ 
b/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/MapSerializer.scala @@ -2,8 +2,7 @@ package org.apache.flinkx.api.serializer import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot} import org.apache.flink.core.memory.{DataInputView, DataOutputView} -import org.apache.flink.util.InstantiationUtil -import org.apache.flinkx.api.serializer.MapSerializer._ +import org.apache.flinkx.api.serializer.MapSerializer.* class MapSerializer[K, V](ks: TypeSerializer[K], vs: TypeSerializer[V]) extends SimpleSerializer[Map[K, V]] { override def createInstance(): Map[K, V] = Map.empty[K, V] @@ -32,33 +31,23 @@ class MapSerializer[K, V](ks: TypeSerializer[K], vs: TypeSerializer[V]) extends object MapSerializer { case class MapSerializerSnapshot[K, V](var keySerializer: TypeSerializer[K], var valueSerializer: TypeSerializer[V]) extends TypeSerializerSnapshot[Map[K, V]] { + def this() = this(null, null) - override def getCurrentVersion: Int = 1 - override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = { - keySerializer = readSerializer[K](in, userCodeClassLoader) - valueSerializer = readSerializer[V](in, userCodeClassLoader) - } + override def getCurrentVersion: Int = 2 - def readSerializer[T](in: DataInputView, userCodeClassLoader: ClassLoader): TypeSerializer[T] = { - val snapClass = InstantiationUtil.resolveClassByName[TypeSerializerSnapshot[T]](in, userCodeClassLoader) - val nestedSnapshot = InstantiationUtil.instantiate(snapClass) - nestedSnapshot.readSnapshot(nestedSnapshot.getCurrentVersion, in, userCodeClassLoader) - nestedSnapshot.restoreSerializer() + override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = { + keySerializer = TypeSerializerSnapshot.readVersionedSnapshot[K](in, userCodeClassLoader).restoreSerializer() + valueSerializer = TypeSerializerSnapshot.readVersionedSnapshot[V](in, 
userCodeClassLoader).restoreSerializer() } override def writeSnapshot(out: DataOutputView): Unit = { - writeSerializer[K](keySerializer, out) - writeSerializer[V](valueSerializer, out) - } - - def writeSerializer[T](nestedSerializer: TypeSerializer[T], out: DataOutputView) = { - out.writeUTF(nestedSerializer.snapshotConfiguration().getClass.getName) - nestedSerializer.snapshotConfiguration().writeSnapshot(out) + TypeSerializerSnapshot.writeVersionedSnapshot(out, keySerializer.snapshotConfiguration()) + TypeSerializerSnapshot.writeVersionedSnapshot(out, valueSerializer.snapshotConfiguration()) } override def resolveSchemaCompatibility( - newSerializer: TypeSerializerSnapshot[Map[K, V]] + oldSerializerSnapshot: TypeSerializerSnapshot[Map[K, V]] ): TypeSerializerSchemaCompatibility[Map[K, V]] = TypeSerializerSchemaCompatibility.compatibleAsIs() override def restoreSerializer(): TypeSerializer[Map[K, V]] = new MapSerializer(keySerializer, valueSerializer) diff --git a/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/MappedSerializer.scala b/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/MappedSerializer.scala index 68554a0..fed9cef 100644 --- a/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/MappedSerializer.scala +++ b/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/MappedSerializer.scala @@ -1,17 +1,9 @@ package org.apache.flinkx.api.serializer -import org.apache.flinkx.api.serializer.MappedSerializer.{MappedSerializerSnapshot, TypeMapper} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.{ - CompositeTypeSerializerSnapshot, - GenericTypeSerializerSnapshot, - SimpleTypeSerializerSnapshot, - TypeSerializer, - TypeSerializerSchemaCompatibility, - TypeSerializerSnapshot -} +import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot} import 
org.apache.flink.core.memory.{DataInputView, DataOutputView} import org.apache.flink.util.InstantiationUtil +import org.apache.flinkx.api.serializer.MappedSerializer.{MappedSerializerSnapshot, TypeMapper} case class MappedSerializer[A, B](mapper: TypeMapper[A, B], ser: TypeSerializer[B]) extends SimpleSerializer[A] { override def equals(obj: Any): Boolean = ser.equals(obj) @@ -41,34 +33,34 @@ object MappedSerializer { def map(a: A): B def contramap(b: B): A } - class MappedSerializerSnapshot[A, B]() extends TypeSerializerSnapshot[A] { - var mapper: TypeMapper[A, B] = _ - var ser: TypeSerializer[B] = _ - def this(xmapper: TypeMapper[A, B], xser: TypeSerializer[B]) = { - this() - mapper = xmapper - ser = xser - } + + class MappedSerializerSnapshot[A, B]( + var mapper: TypeMapper[A, B], + var ser: TypeSerializer[B] + ) extends TypeSerializerSnapshot[A] { + + // Empty constructor is required to instantiate this class during deserialization. + def this() = this(null, null) override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = { val mapperClazz = InstantiationUtil.resolveClassByName[TypeMapper[A, B]](in, userCodeClassLoader) mapper = InstantiationUtil.instantiate(mapperClazz) - val serClazz = InstantiationUtil.resolveClassByName[TypeSerializer[B]](in, userCodeClassLoader) - ser = InstantiationUtil.instantiate(serClazz) + ser = TypeSerializerSnapshot.readVersionedSnapshot[B](in, userCodeClassLoader).restoreSerializer() } override def resolveSchemaCompatibility( - newSerializer: TypeSerializerSnapshot[A] + oldSerializerSnapshot: TypeSerializerSnapshot[A] ): TypeSerializerSchemaCompatibility[A] = TypeSerializerSchemaCompatibility.compatibleAsIs() override def writeSnapshot(out: DataOutputView): Unit = { out.writeUTF(mapper.getClass.getName) - out.writeUTF(ser.getClass.getName) + TypeSerializerSnapshot.writeVersionedSnapshot(out, ser.snapshotConfiguration()) } override def restoreSerializer(): TypeSerializer[A] = new
MappedSerializer[A, B](mapper, ser) - override def getCurrentVersion: Int = 1 + override def getCurrentVersion: Int = 2 } + } diff --git a/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/ScalaCaseObjectSerializer.scala b/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/ScalaCaseObjectSerializer.scala index 4094a1c..45052c8 100644 --- a/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/ScalaCaseObjectSerializer.scala +++ b/modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/ScalaCaseObjectSerializer.scala @@ -1,10 +1,10 @@ package org.apache.flinkx.api.serializer -import org.apache.flinkx.api.serializer.ScalaCaseObjectSerializer.ScalaCaseObjectSerializerSnapshot -import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot} import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton +import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot} import org.apache.flink.core.memory.{DataInputView, DataOutputView} import org.apache.flink.util.InstantiationUtil +import org.apache.flinkx.api.serializer.ScalaCaseObjectSerializer.ScalaCaseObjectSerializerSnapshot class ScalaCaseObjectSerializer[T](clazz: Class[T]) extends TypeSerializerSingleton[T] { override def isImmutableType: Boolean = true @@ -37,7 +37,7 @@ object ScalaCaseObjectSerializer { override def getCurrentVersion: Int = 1 override def resolveSchemaCompatibility( - newSerializer: TypeSerializerSnapshot[T] + oldSerializerSnapshot: TypeSerializerSnapshot[T] ): TypeSerializerSchemaCompatibility[T] = TypeSerializerSchemaCompatibility.compatibleAsIs() diff --git a/modules/flink-2-api/src/test/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshotTest.scala b/modules/flink-2-api/src/test/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshotTest.scala new file mode 100644 index 
0000000..6480c73 --- /dev/null +++ b/modules/flink-2-api/src/test/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshotTest.scala @@ -0,0 +1,33 @@ +package org.apache.flinkx.api.serializer + +import org.apache.flink.api.common.serialization.SerializerConfigImpl +import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} +import org.apache.flink.core.memory.{DataInputDeserializer, DataOutputSerializer} +import org.apache.flinkx.api.serializers.* +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class CollectionSerializerSnapshotTest extends AnyFlatSpec with Matchers { + + it should "serialize then deserialize" in { + val serializerConfig = new SerializerConfigImpl() + // Create SerializerSnapshot + val tSerializer = implicitly[TypeSerializer[String]] + val serializerSnapshot: CollectionSerializerSnapshot[Set, String, SetSerializer[String]] = + new CollectionSerializerSnapshot(tSerializer, classOf[SetSerializer[String]], classOf[String]) + + val expectedSerializer = serializerSnapshot.restoreSerializer() + + // Serialize SerializerSnapshot + val snapshotOutput = new DataOutputSerializer(1024 * 1024) + TypeSerializerSnapshot.writeVersionedSnapshot(snapshotOutput, serializerSnapshot) + val snapshotInput = new DataInputDeserializer(snapshotOutput.getSharedBuffer) + + // Deserialize SerializerSnapshot + val deserializedSnapshot = TypeSerializerSnapshot + .readVersionedSnapshot[Set[String]](snapshotInput, getClass.getClassLoader) + + deserializedSnapshot.restoreSerializer() should be(expectedSerializer) + } + +} diff --git a/modules/flink-2-api/src/test/scala/org/apache/flinkx/api/serializer/MapSerializerSnapshotTest.scala b/modules/flink-2-api/src/test/scala/org/apache/flinkx/api/serializer/MapSerializerSnapshotTest.scala new file mode 100644 index 0000000..ec03395 --- /dev/null +++
b/modules/flink-2-api/src/test/scala/org/apache/flinkx/api/serializer/MapSerializerSnapshotTest.scala @@ -0,0 +1,178 @@ +package org.apache.flinkx.api.serializer + +import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot} +import org.apache.flink.core.memory.{DataInputDeserializer, DataInputView, DataOutputSerializer, DataOutputView} +import org.apache.flinkx.api.serializer.MapSerializer.MapSerializerSnapshot +import org.apache.flinkx.api.serializer.MapSerializerSnapshotTest.* +import org.apache.flinkx.api.serializers.* +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class MapSerializerSnapshotTest extends AnyFlatSpec with Matchers { + + it should "serialize the old data and deserialize them to the new data" in { + // Old data of the type Map[String, OldClass] + val oldData = Map("1" -> OldClass(1)) + + val keySerializer = implicitly[TypeSerializer[String]] + val oldValueSerializer = new OldOuterTraitSerializer() + + // Create MapSerializerSnapshot + val mapSerializerSnapshot: MapSerializerSnapshot[String, OuterTrait] = + MapSerializerSnapshot(keySerializer, oldValueSerializer) + + val mapSerializer: TypeSerializer[Map[String, OuterTrait]] = mapSerializerSnapshot.restoreSerializer() + + // Serialize MapSerializerSnapshot + val oldOutput = new DataOutputSerializer(1024 * 1024) + TypeSerializerSnapshot.writeVersionedSnapshot(oldOutput, mapSerializerSnapshot) + + // Serialize the old data + mapSerializer.serialize(oldData, oldOutput) + + // Switch to a new version of the code, now the data are expected to be of the type Map[String, NewClass] + MapSerializerSnapshotTest.VersionOfTheCode = 1 + + val oldInput = new DataInputDeserializer(oldOutput.getSharedBuffer) + + // Deserialize MapSerializerSnapshot + val reconfiguredSnapshot: TypeSerializerSnapshot[Map[String, OuterTrait]] = TypeSerializerSnapshot + .readVersionedSnapshot[Map[String, OuterTrait]](oldInput, 
getClass.getClassLoader) + val reconfiguredMapSnapshot = reconfiguredSnapshot.asInstanceOf[MapSerializerSnapshot[String, OuterTrait]] + reconfiguredMapSnapshot.valueSerializer should be(a[ReconfiguredOuterTraitSerializer]) + val reconfiguredSerializer = reconfiguredSnapshot.restoreSerializer() + + // Deserialize the old data but convert them to the new data + val reconfiguredData = reconfiguredSerializer.deserialize(oldInput) + reconfiguredData should be(Map("1" -> NewClass(1L))) + + // Serialize MapSerializerSnapshot + val newOutput = new DataOutputSerializer(1024 * 1024) + TypeSerializerSnapshot.writeVersionedSnapshot(newOutput, reconfiguredSnapshot) + + // Serialize the new data + reconfiguredSerializer.serialize(reconfiguredData, newOutput) + + val newInput = new DataInputDeserializer(newOutput.getSharedBuffer) + + // Deserialize MapSerializerSnapshot + val newSnapshot: TypeSerializerSnapshot[Map[String, OuterTrait]] = TypeSerializerSnapshot + .readVersionedSnapshot[Map[String, OuterTrait]](newInput, getClass.getClassLoader) + val newMapSnapshot = newSnapshot.asInstanceOf[MapSerializerSnapshot[String, OuterTrait]] + newMapSnapshot.valueSerializer should be(a[NewOuterTraitSerializer]) + + val newSerializer = newSnapshot.restoreSerializer() + + // Deserialize the new data + val newData = newSerializer.deserialize(newInput) + newData should be(Map("1" -> NewClass(1L))) + } + +} + +object MapSerializerSnapshotTest { + + var VersionOfTheCode = 0 + + sealed trait OuterTrait + case class OldClass(a: Int) extends OuterTrait + case class NewClass(a: Long) extends OuterTrait + + class OldOuterTraitSerializer extends SimpleSerializer[OuterTrait] { + + private val intSer = implicitly[TypeSerializer[Int]] + + override def createInstance(): OuterTrait = OldClass(0) + + override def getLength: Int = 4 + + override def serialize(record: OuterTrait, target: DataOutputView): Unit = { + val oc = record.asInstanceOf[OldClass] + intSer.serialize(oc.a, target) + } + + override def 
deserialize(source: DataInputView): OuterTrait = { + val a = intSer.deserialize(source) + OldClass(a) + } + + override def snapshotConfiguration(): TypeSerializerSnapshot[OuterTrait] = new OuterTraitSerializerSnapshot() + + } + + class ReconfiguredOuterTraitSerializer extends SimpleSerializer[OuterTrait] { + + private val intSer = implicitly[TypeSerializer[Int]] + private val longSer = implicitly[TypeSerializer[Long]] + + override def createInstance(): OuterTrait = NewClass(0L) + + override def getLength: Int = -1 + + override def serialize(record: OuterTrait, target: DataOutputView): Unit = { + val nc = record.asInstanceOf[NewClass] + longSer.serialize(nc.a, target) + } + + override def deserialize(source: DataInputView): OuterTrait = { + val a = intSer.deserialize(source) + NewClass(a.toLong) + } + + override def snapshotConfiguration(): TypeSerializerSnapshot[OuterTrait] = new OuterTraitSerializerSnapshot() + + } + + class NewOuterTraitSerializer extends SimpleSerializer[OuterTrait] { + + private val longSer = implicitly[TypeSerializer[Long]] + + override def createInstance(): OuterTrait = NewClass(0L) + + override def getLength: Int = 8 + + override def serialize(record: OuterTrait, target: DataOutputView): Unit = { + val nc = record.asInstanceOf[NewClass] + longSer.serialize(nc.a, target) + } + + override def deserialize(source: DataInputView): OuterTrait = { + val a = longSer.deserialize(source) + NewClass(a) + } + + override def snapshotConfiguration(): TypeSerializerSnapshot[OuterTrait] = new OuterTraitSerializerSnapshot() + + } + + class OuterTraitSerializerSnapshot extends TypeSerializerSnapshot[OuterTrait] { + + private var ser: TypeSerializer[OuterTrait] = new OldOuterTraitSerializer() + + override def getCurrentVersion: Int = VersionOfTheCode + + override def writeSnapshot(out: DataOutputView): Unit = {} + + override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = { + if (readVersion == 0) { + ser = new 
ReconfiguredOuterTraitSerializer() + } else { + ser = new NewOuterTraitSerializer() + } + } + + override def restoreSerializer(): TypeSerializer[OuterTrait] = ser + + override def resolveSchemaCompatibility( + oldSerializerSnapshot: TypeSerializerSnapshot[OuterTrait] + ): TypeSerializerSchemaCompatibility[OuterTrait] = { + if (oldSerializerSnapshot.getCurrentVersion == 0) { + TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(new ReconfiguredOuterTraitSerializer()) + } else { + TypeSerializerSchemaCompatibility.compatibleAsIs() + } + } + + } + +} From 98c935723b5ee537e197d567c1bcbf9c31646edc Mon Sep 17 00:00:00 2001 From: adaroussin Date: Fri, 25 Apr 2025 19:13:57 +0200 Subject: [PATCH 2/2] fix tests --- .../flinkx/api/SerializerSnapshotTest.scala | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/modules/flink-1-api/src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala b/modules/flink-1-api/src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala index c606db7..459540b 100644 --- a/modules/flink-1-api/src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala +++ b/modules/flink-1-api/src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala @@ -1,23 +1,32 @@ package org.apache.flinkx.api -import org.apache.flink.api.common.serialization.SerializerConfigImpl -import org.apache.flink.api.common.typeinfo.TypeInformation +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import org.apache.flinkx.api.SerializerSnapshotTest.{ + ADT2, + OuterTrait, + SimpleClass1, + SimpleClassArray, + SimpleClassList, + SimpleClassMap1, + SimpleClassMap2, + TraitMap +} import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot} import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} -import org.apache.flink.util.ChildFirstClassLoader -import org.apache.flinkx.api.SerializerSnapshotTest.* -import 
org.apache.flinkx.api.serializers.* -import org.scalatest.Assertion import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import org.apache.flinkx.api.serializers._ +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.util.ChildFirstClassLoader +import org.scalatest.Assertion -import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import java.net.URLClassLoader class SerializerSnapshotTest extends AnyFlatSpec with Matchers { - def createSerializer[T: TypeInformation]: TypeSerializer[T] = - implicitly[TypeInformation[T]].createSerializer(new SerializerConfigImpl()) + def createSerializer[T: TypeInformation] = + implicitly[TypeInformation[T]].createSerializer(new ExecutionConfig()) it should "roundtrip product serializer snapshot" in { val ser = createSerializer[SimpleClass1]