Skip to content

Handle state evolution without breaking state compatibility #245

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import org.apache.flinkx.api.SerializerSnapshotTest.{
SimpleClassMap2,
TraitMap
}
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
Expand Down Expand Up @@ -90,11 +89,11 @@ class SerializerSnapshotTest extends AnyFlatSpec with Matchers {
val snap = ser.snapshotConfiguration()
val buffer = new ByteArrayOutputStream()
val output = new DataOutputViewStreamWrapper(buffer)
snap.writeSnapshot(output)
TypeSerializerSnapshot.writeVersionedSnapshot(output, snap)
output.close()
val input = new DataInputViewStreamWrapper(new ByteArrayInputStream(buffer.toByteArray))
snap.readSnapshot(ser.snapshotConfiguration().getCurrentVersion, input, cl)
snap.restoreSerializer()
val deserSnap = TypeSerializerSnapshot.readVersionedSnapshot[T](input, cl)
deserSnap.restoreSerializer()
}

def assertRoundtripSerializer[T](ser: TypeSerializer[T]): Assertion = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.apache.flinkx.api

import org.apache.flinkx.api.serializer.ScalaCaseClassSerializer
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
import org.scalatest.{Assertion, Inspectors}
Expand All @@ -14,14 +14,10 @@ trait TestUtils extends Matchers with Inspectors {
val out = new ByteArrayOutputStream()
ser.serialize(in, new DataOutputViewStreamWrapper(out))
val snapBytes = new ByteArrayOutputStream()
ser.snapshotConfiguration().writeSnapshot(new DataOutputViewStreamWrapper(snapBytes))
val restoredSnapshot = ser.snapshotConfiguration()
restoredSnapshot
.readSnapshot(
restoredSnapshot.getCurrentVersion,
new DataInputViewStreamWrapper(new ByteArrayInputStream(snapBytes.toByteArray)),
ser.getClass.getClassLoader
)
TypeSerializerSnapshot.writeVersionedSnapshot(new DataOutputViewStreamWrapper(snapBytes), ser.snapshotConfiguration())
val restoredSnapshot = TypeSerializerSnapshot.readVersionedSnapshot[T](
new DataInputViewStreamWrapper(new ByteArrayInputStream(snapBytes.toByteArray)),
ser.getClass.getClassLoader)
val restoredSerializer = restoredSnapshot.restoreSerializer()
val copy = restoredSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray)))
in shouldBe copy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,36 @@ import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSche
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.util.InstantiationUtil

class CollectionSerializerSnapshot[F[_], T, S <: TypeSerializer[F[T]]] extends TypeSerializerSnapshot[F[T]] {
def this(ser: TypeSerializer[T], serClass: Class[S], valueClass: Class[T]) = {
this()
nestedSerializer = ser
clazz = serClass
vclazz = valueClass
}
/** Generic serializer snapshot for collection.
*
* @param nestedSerializer
* the serializer of `T`
* @param clazz
* the class of `S`
* @param vclazz
* the class of `T`
* @tparam F
* the type of the serialized collection
* @tparam T
* the type of the collection's elements
* @tparam S
* the type of the collection serializer
*/
class CollectionSerializerSnapshot[F[_], T, S <: TypeSerializer[F[T]]](
var nestedSerializer: TypeSerializer[T],
var clazz: Class[S],
var vclazz: Class[T]
) extends TypeSerializerSnapshot[F[T]] {

var nestedSerializer: TypeSerializer[T] = _
var clazz: Class[S] = _
var vclazz: Class[T] = _
// Empty constructor is required to instantiate this class during deserialization.
def this() = this(null, null, null)

override def getCurrentVersion: Int = 1
override def getCurrentVersion: Int = 2

override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
clazz = InstantiationUtil.resolveClassByName[S](in, userCodeClassLoader)
vclazz = InstantiationUtil.resolveClassByName[T](in, userCodeClassLoader)
val snapClass = InstantiationUtil.resolveClassByName[TypeSerializerSnapshot[T]](in, userCodeClassLoader)
val nestedSnapshot = InstantiationUtil.instantiate(snapClass)
nestedSnapshot.readSnapshot(nestedSnapshot.getCurrentVersion, in, userCodeClassLoader)
nestedSerializer = nestedSnapshot.restoreSerializer()
nestedSerializer = TypeSerializerSnapshot.readVersionedSnapshot[T](in, userCodeClassLoader).restoreSerializer()
}

override def writeSnapshot(out: DataOutputView): Unit = {
Expand All @@ -40,17 +49,16 @@ class CollectionSerializerSnapshot[F[_], T, S <: TypeSerializer[F[T]]] extends T
case "boolean" => out.writeUTF("java.lang.Boolean")
case other => out.writeUTF(other)
}

out.writeUTF(nestedSerializer.snapshotConfiguration().getClass.getName)
nestedSerializer.snapshotConfiguration().writeSnapshot(out)
TypeSerializerSnapshot.writeVersionedSnapshot(out, nestedSerializer.snapshotConfiguration())
}

override def resolveSchemaCompatibility(
newSerializer: TypeSerializerSnapshot[F[T]]
oldSerializerSnapshot: TypeSerializerSnapshot[F[T]]
): TypeSerializerSchemaCompatibility[F[T]] = TypeSerializerSchemaCompatibility.compatibleAsIs()

override def restoreSerializer(): TypeSerializer[F[T]] = {
val constructor = clazz.getConstructors()(0)
constructor.newInstance(nestedSerializer, vclazz).asInstanceOf[TypeSerializer[F[T]]]
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package org.apache.flinkx.api.serializer

import org.apache.flinkx.api.serializer.CoproductSerializer.CoproductSerializerSnapshot
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.util.InstantiationUtil

import scala.annotation.nowarn
import org.apache.flinkx.api.serializer.CoproductSerializer.CoproductSerializerSnapshot

class CoproductSerializer[T](subtypeClasses: Array[Class[_]], subtypeSerializers: Array[TypeSerializer[_]])
extends TypeSerializerSingleton[T] {
Expand Down Expand Up @@ -51,42 +49,39 @@ object CoproductSerializer {
var subtypeClasses: Array[Class[_]],
var subtypeSerializers: Array[TypeSerializer[_]]
) extends TypeSerializerSnapshot[T] {

// Empty constructor is required to instantiate this class during deserialization.
def this() = this(Array.empty[Class[_]], Array.empty[TypeSerializer[_]])

@nowarn("msg=dead code")
override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
val len = in.readInt()

subtypeClasses = (0 until len)
.map(_ => InstantiationUtil.resolveClassByName(in, userCodeClassLoader))
.toArray

subtypeSerializers = (0 until len).map { _ =>
val clazz = InstantiationUtil.resolveClassByName(in, userCodeClassLoader)
val serializer = InstantiationUtil.instantiate(clazz).asInstanceOf[TypeSerializerSnapshot[_]]
serializer.readSnapshot(serializer.getCurrentVersion, in, userCodeClassLoader)
serializer.restoreSerializer()
}.toArray
subtypeSerializers = (0 until len)
.map(_ => TypeSerializerSnapshot.readVersionedSnapshot(in, userCodeClassLoader).restoreSerializer())
.toArray
}

override def getCurrentVersion: Int = 1
override def getCurrentVersion: Int = 2

override def writeSnapshot(out: DataOutputView): Unit = {
out.writeInt(subtypeClasses.length)
subtypeClasses.foreach(c => out.writeUTF(c.getName))
subtypeSerializers.foreach(s => {
val snap = s.snapshotConfiguration()
out.writeUTF(snap.getClass.getName)
snap.writeSnapshot(out)
TypeSerializerSnapshot.writeVersionedSnapshot(out, s.snapshotConfiguration())
})
}

override def resolveSchemaCompatibility(
newSerializer: TypeSerializerSnapshot[T]
oldSerializerSnapshot: TypeSerializerSnapshot[T]
): TypeSerializerSchemaCompatibility[T] =
TypeSerializerSchemaCompatibility.compatibleAsIs()

override def restoreSerializer(): TypeSerializer[T] =
new CoproductSerializer[T](subtypeClasses, subtypeSerializers)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package org.apache.flinkx.api.serializer

import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.util.InstantiationUtil
import org.apache.flinkx.api.serializer.MapSerializer._
import org.apache.flinkx.api.serializer.MapSerializer.*

class MapSerializer[K, V](ks: TypeSerializer[K], vs: TypeSerializer[V]) extends SimpleSerializer[Map[K, V]] {
override def createInstance(): Map[K, V] = Map.empty[K, V]
Expand Down Expand Up @@ -32,33 +31,23 @@ class MapSerializer[K, V](ks: TypeSerializer[K], vs: TypeSerializer[V]) extends
object MapSerializer {
case class MapSerializerSnapshot[K, V](var keySerializer: TypeSerializer[K], var valueSerializer: TypeSerializer[V])
extends TypeSerializerSnapshot[Map[K, V]] {

def this() = this(null, null)
override def getCurrentVersion: Int = 1

override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
keySerializer = readSerializer[K](in, userCodeClassLoader)
valueSerializer = readSerializer[V](in, userCodeClassLoader)
}
override def getCurrentVersion: Int = 2

def readSerializer[T](in: DataInputView, userCodeClassLoader: ClassLoader): TypeSerializer[T] = {
val snapClass = InstantiationUtil.resolveClassByName[TypeSerializerSnapshot[T]](in, userCodeClassLoader)
val nestedSnapshot = InstantiationUtil.instantiate(snapClass)
nestedSnapshot.readSnapshot(nestedSnapshot.getCurrentVersion, in, userCodeClassLoader)
nestedSnapshot.restoreSerializer()
override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
keySerializer = TypeSerializerSnapshot.readVersionedSnapshot[K](in, userCodeClassLoader).restoreSerializer()
valueSerializer = TypeSerializerSnapshot.readVersionedSnapshot[V](in, userCodeClassLoader).restoreSerializer()
}

override def writeSnapshot(out: DataOutputView): Unit = {
writeSerializer[K](keySerializer, out)
writeSerializer[V](valueSerializer, out)
}

def writeSerializer[T](nestedSerializer: TypeSerializer[T], out: DataOutputView) = {
out.writeUTF(nestedSerializer.snapshotConfiguration().getClass.getName)
nestedSerializer.snapshotConfiguration().writeSnapshot(out)
TypeSerializerSnapshot.writeVersionedSnapshot(out, keySerializer.snapshotConfiguration())
TypeSerializerSnapshot.writeVersionedSnapshot(out, valueSerializer.snapshotConfiguration())
}

override def resolveSchemaCompatibility(
newSerializer: TypeSerializerSnapshot[Map[K, V]]
oldSerializerSnapshot: TypeSerializerSnapshot[Map[K, V]]
): TypeSerializerSchemaCompatibility[Map[K, V]] = TypeSerializerSchemaCompatibility.compatibleAsIs()

override def restoreSerializer(): TypeSerializer[Map[K, V]] = new MapSerializer(keySerializer, valueSerializer)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,9 @@
package org.apache.flinkx.api.serializer

import org.apache.flinkx.api.serializer.MappedSerializer.{MappedSerializerSnapshot, TypeMapper}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.{
CompositeTypeSerializerSnapshot,
GenericTypeSerializerSnapshot,
SimpleTypeSerializerSnapshot,
TypeSerializer,
TypeSerializerSchemaCompatibility,
TypeSerializerSnapshot
}
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.util.InstantiationUtil
import org.apache.flinkx.api.serializer.MappedSerializer.{MappedSerializerSnapshot, TypeMapper}

case class MappedSerializer[A, B](mapper: TypeMapper[A, B], ser: TypeSerializer[B]) extends SimpleSerializer[A] {
override def equals(obj: Any): Boolean = ser.equals(obj)
Expand Down Expand Up @@ -41,34 +33,35 @@ object MappedSerializer {
def map(a: A): B
def contramap(b: B): A
}
class MappedSerializerSnapshot[A, B]() extends TypeSerializerSnapshot[A] {
var mapper: TypeMapper[A, B] = _
var ser: TypeSerializer[B] = _
def this(xmapper: TypeMapper[A, B], xser: TypeSerializer[B]) = {
this()
mapper = xmapper
ser = xser
}

class MappedSerializerSnapshot[A, B](
var mapper: TypeMapper[A, B],
var ser: TypeSerializer[B]
) extends TypeSerializerSnapshot[A] {

// Empty constructor is required to instantiate this class during deserialization.
def this() = this(null, null)

override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
val mapperClazz = InstantiationUtil.resolveClassByName[TypeMapper[A, B]](in, userCodeClassLoader)
mapper = InstantiationUtil.instantiate(mapperClazz)
val serClazz = InstantiationUtil.resolveClassByName[TypeSerializer[B]](in, userCodeClassLoader)
ser = InstantiationUtil.instantiate(serClazz)
ser = TypeSerializerSnapshot.readVersionedSnapshot[B](in, userCodeClassLoader).restoreSerializer()
}

override def resolveSchemaCompatibility(
newSerializer: TypeSerializerSnapshot[A]
oldSerializerSnapshot: TypeSerializerSnapshot[A]
): TypeSerializerSchemaCompatibility[A] =
TypeSerializerSchemaCompatibility.compatibleAsIs()

override def writeSnapshot(out: DataOutputView): Unit = {
out.writeUTF(mapper.getClass.getName)
out.writeUTF(ser.getClass.getName)
TypeSerializerSnapshot.writeVersionedSnapshot(out, ser.snapshotConfiguration())
}

override def restoreSerializer(): TypeSerializer[A] = new MappedSerializer[A, B](mapper, ser)

override def getCurrentVersion: Int = 1
override def getCurrentVersion: Int = 2
}

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package org.apache.flinkx.api.serializer

import org.apache.flinkx.api.serializer.ScalaCaseObjectSerializer.ScalaCaseObjectSerializerSnapshot
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.util.InstantiationUtil
import org.apache.flinkx.api.serializer.ScalaCaseObjectSerializer.ScalaCaseObjectSerializerSnapshot

class ScalaCaseObjectSerializer[T](clazz: Class[T]) extends TypeSerializerSingleton[T] {
override def isImmutableType: Boolean = true
Expand Down Expand Up @@ -37,7 +37,7 @@ object ScalaCaseObjectSerializer {

override def getCurrentVersion: Int = 1
override def resolveSchemaCompatibility(
newSerializer: TypeSerializerSnapshot[T]
oldSerializerSnapshot: TypeSerializerSnapshot[T]
): TypeSerializerSchemaCompatibility[T] =
TypeSerializerSchemaCompatibility.compatibleAsIs()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.apache.flinkx.api.serializer

import org.apache.flink.api.common.serialization.SerializerConfigImpl
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
import org.apache.flink.core.memory.{DataInputDeserializer, DataOutputSerializer}
import org.apache.flinkx.api.serializers.*
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

/** Round-trip test for [[CollectionSerializerSnapshot]].
  *
  * A snapshot describing a `SetSerializer[String]` is written with `TypeSerializerSnapshot.writeVersionedSnapshot`,
  * read back with `TypeSerializerSnapshot.readVersionedSnapshot`, and the serializer restored from the deserialized
  * snapshot is compared with the one restored from the original snapshot.
  */
class CollectionSerializerSnapshotTest extends AnyFlatSpec with Matchers {

  it should "serialize then deserialize" in {
    // NOTE(review): this config is not referenced anywhere below; confirm whether it can be dropped.
    val serializerConfig = new SerializerConfigImpl()
    // Create SerializerSnapshot
    val tSerializer = implicitly[TypeSerializer[String]]
    val serializerSnapshot: CollectionSerializerSnapshot[Set, String, SetSerializer[String]] =
      new CollectionSerializerSnapshot(tSerializer, classOf[SetSerializer[String]], classOf[String])

    val expectedSerializer = serializerSnapshot.restoreSerializer()

    // Serialize SerializerSnapshot
    val snapshotOutput = new DataOutputSerializer(1024 * 1024)
    TypeSerializerSnapshot.writeVersionedSnapshot(snapshotOutput, serializerSnapshot)
    val snapshotInput = new DataInputDeserializer(snapshotOutput.getSharedBuffer)

    // Deserialize SerializerSnapshot
    // The type argument is the serialized data type (Set[String]), not the serializer class, so that the
    // restored serializer has the same static type as expectedSerializer.
    val deserializedSnapshot = TypeSerializerSnapshot
      .readVersionedSnapshot[Set[String]](snapshotInput, getClass.getClassLoader)

    deserializedSnapshot.restoreSerializer() should be(expectedSerializer)
  }

}
Loading