Skip to content

Commit 6973146

Browse files
Handle state evolution without breaking state compatibility (#245)
* Handle state evolution without breaking state compatibility * fix tests
1 parent a6848bb commit 6973146

File tree

9 files changed

+284
-93
lines changed

9 files changed

+284
-93
lines changed

modules/flink-1-api/src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ import org.apache.flinkx.api.SerializerSnapshotTest.{
1111
SimpleClassMap2,
1212
TraitMap
1313
}
14-
import org.apache.flink.api.common.typeutils.TypeSerializer
15-
import org.apache.flink.api.common.ExecutionConfig
14+
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
1615
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
1716
import org.scalatest.flatspec.AnyFlatSpec
1817
import org.scalatest.matchers.should.Matchers
@@ -90,11 +89,11 @@ class SerializerSnapshotTest extends AnyFlatSpec with Matchers {
9089
val snap = ser.snapshotConfiguration()
9190
val buffer = new ByteArrayOutputStream()
9291
val output = new DataOutputViewStreamWrapper(buffer)
93-
snap.writeSnapshot(output)
92+
TypeSerializerSnapshot.writeVersionedSnapshot(output, snap)
9493
output.close()
9594
val input = new DataInputViewStreamWrapper(new ByteArrayInputStream(buffer.toByteArray))
96-
snap.readSnapshot(ser.snapshotConfiguration().getCurrentVersion, input, cl)
97-
snap.restoreSerializer()
95+
val deserSnap = TypeSerializerSnapshot.readVersionedSnapshot[T](input, cl)
96+
deserSnap.restoreSerializer()
9897
}
9998

10099
def assertRoundtripSerializer[T](ser: TypeSerializer[T]): Assertion = {

modules/flink-1-api/src/test/scala/org/apache/flinkx/api/TestUtils.scala

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package org.apache.flinkx.api
22

33
import org.apache.flinkx.api.serializer.ScalaCaseClassSerializer
4-
import org.apache.flink.api.common.typeutils.TypeSerializer
4+
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
55
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
66
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
77
import org.scalatest.{Assertion, Inspectors}
@@ -14,14 +14,10 @@ trait TestUtils extends Matchers with Inspectors {
1414
val out = new ByteArrayOutputStream()
1515
ser.serialize(in, new DataOutputViewStreamWrapper(out))
1616
val snapBytes = new ByteArrayOutputStream()
17-
ser.snapshotConfiguration().writeSnapshot(new DataOutputViewStreamWrapper(snapBytes))
18-
val restoredSnapshot = ser.snapshotConfiguration()
19-
restoredSnapshot
20-
.readSnapshot(
21-
restoredSnapshot.getCurrentVersion,
22-
new DataInputViewStreamWrapper(new ByteArrayInputStream(snapBytes.toByteArray)),
23-
ser.getClass.getClassLoader
24-
)
17+
TypeSerializerSnapshot.writeVersionedSnapshot(new DataOutputViewStreamWrapper(snapBytes), ser.snapshotConfiguration())
18+
val restoredSnapshot = TypeSerializerSnapshot.readVersionedSnapshot[T](
19+
new DataInputViewStreamWrapper(new ByteArrayInputStream(snapBytes.toByteArray)),
20+
ser.getClass.getClassLoader)
2521
val restoredSerializer = restoredSnapshot.restoreSerializer()
2622
val copy = restoredSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(out.toByteArray)))
2723
in shouldBe copy

modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshot.scala

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,36 @@ import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSche
44
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
55
import org.apache.flink.util.InstantiationUtil
66

7-
class CollectionSerializerSnapshot[F[_], T, S <: TypeSerializer[F[T]]] extends TypeSerializerSnapshot[F[T]] {
8-
def this(ser: TypeSerializer[T], serClass: Class[S], valueClass: Class[T]) = {
9-
this()
10-
nestedSerializer = ser
11-
clazz = serClass
12-
vclazz = valueClass
13-
}
7+
/** Generic serializer snapshot for collection.
8+
*
9+
* @param nestedSerializer
10+
* the serializer of `T`
11+
* @param clazz
12+
* the class of `S`
13+
* @param vclazz
14+
* the class of `T`
15+
* @tparam F
16+
* the type of the serialized collection
17+
* @tparam T
18+
* the type of the collection's elements
19+
* @tparam S
20+
* the type of the collection serializer
21+
*/
22+
class CollectionSerializerSnapshot[F[_], T, S <: TypeSerializer[F[T]]](
23+
var nestedSerializer: TypeSerializer[T],
24+
var clazz: Class[S],
25+
var vclazz: Class[T]
26+
) extends TypeSerializerSnapshot[F[T]] {
1427

15-
var nestedSerializer: TypeSerializer[T] = _
16-
var clazz: Class[S] = _
17-
var vclazz: Class[T] = _
28+
// Empty constructor is required to instantiate this class during deserialization.
29+
def this() = this(null, null, null)
1830

19-
override def getCurrentVersion: Int = 1
31+
override def getCurrentVersion: Int = 2
2032

2133
override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
2234
clazz = InstantiationUtil.resolveClassByName[S](in, userCodeClassLoader)
2335
vclazz = InstantiationUtil.resolveClassByName[T](in, userCodeClassLoader)
24-
val snapClass = InstantiationUtil.resolveClassByName[TypeSerializerSnapshot[T]](in, userCodeClassLoader)
25-
val nestedSnapshot = InstantiationUtil.instantiate(snapClass)
26-
nestedSnapshot.readSnapshot(nestedSnapshot.getCurrentVersion, in, userCodeClassLoader)
27-
nestedSerializer = nestedSnapshot.restoreSerializer()
36+
nestedSerializer = TypeSerializerSnapshot.readVersionedSnapshot[T](in, userCodeClassLoader).restoreSerializer()
2837
}
2938

3039
override def writeSnapshot(out: DataOutputView): Unit = {
@@ -40,17 +49,16 @@ class CollectionSerializerSnapshot[F[_], T, S <: TypeSerializer[F[T]]] extends T
4049
case "boolean" => out.writeUTF("java.lang.Boolean")
4150
case other => out.writeUTF(other)
4251
}
43-
44-
out.writeUTF(nestedSerializer.snapshotConfiguration().getClass.getName)
45-
nestedSerializer.snapshotConfiguration().writeSnapshot(out)
52+
TypeSerializerSnapshot.writeVersionedSnapshot(out, nestedSerializer.snapshotConfiguration())
4653
}
4754

4855
override def resolveSchemaCompatibility(
49-
newSerializer: TypeSerializerSnapshot[F[T]]
56+
oldSerializerSnapshot: TypeSerializerSnapshot[F[T]]
5057
): TypeSerializerSchemaCompatibility[F[T]] = TypeSerializerSchemaCompatibility.compatibleAsIs()
5158

5259
override def restoreSerializer(): TypeSerializer[F[T]] = {
5360
val constructor = clazz.getConstructors()(0)
5461
constructor.newInstance(nestedSerializer, vclazz).asInstanceOf[TypeSerializer[F[T]]]
5562
}
63+
5664
}

modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/CoproductSerializer.scala

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
package org.apache.flinkx.api.serializer
22

3-
import org.apache.flinkx.api.serializer.CoproductSerializer.CoproductSerializerSnapshot
43
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
54
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
65
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
76
import org.apache.flink.util.InstantiationUtil
8-
9-
import scala.annotation.nowarn
7+
import org.apache.flinkx.api.serializer.CoproductSerializer.CoproductSerializerSnapshot
108

119
class CoproductSerializer[T](subtypeClasses: Array[Class[_]], subtypeSerializers: Array[TypeSerializer[_]])
1210
extends TypeSerializerSingleton[T] {
@@ -51,42 +49,39 @@ object CoproductSerializer {
5149
var subtypeClasses: Array[Class[_]],
5250
var subtypeSerializers: Array[TypeSerializer[_]]
5351
) extends TypeSerializerSnapshot[T] {
52+
53+
// Empty constructor is required to instantiate this class during deserialization.
5454
def this() = this(Array.empty[Class[_]], Array.empty[TypeSerializer[_]])
5555

56-
@nowarn("msg=dead code")
5756
override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
5857
val len = in.readInt()
5958

6059
subtypeClasses = (0 until len)
6160
.map(_ => InstantiationUtil.resolveClassByName(in, userCodeClassLoader))
6261
.toArray
6362

64-
subtypeSerializers = (0 until len).map { _ =>
65-
val clazz = InstantiationUtil.resolveClassByName(in, userCodeClassLoader)
66-
val serializer = InstantiationUtil.instantiate(clazz).asInstanceOf[TypeSerializerSnapshot[_]]
67-
serializer.readSnapshot(serializer.getCurrentVersion, in, userCodeClassLoader)
68-
serializer.restoreSerializer()
69-
}.toArray
63+
subtypeSerializers = (0 until len)
64+
.map(_ => TypeSerializerSnapshot.readVersionedSnapshot(in, userCodeClassLoader).restoreSerializer())
65+
.toArray
7066
}
7167

72-
override def getCurrentVersion: Int = 1
68+
override def getCurrentVersion: Int = 2
7369

7470
override def writeSnapshot(out: DataOutputView): Unit = {
7571
out.writeInt(subtypeClasses.length)
7672
subtypeClasses.foreach(c => out.writeUTF(c.getName))
7773
subtypeSerializers.foreach(s => {
78-
val snap = s.snapshotConfiguration()
79-
out.writeUTF(snap.getClass.getName)
80-
snap.writeSnapshot(out)
74+
TypeSerializerSnapshot.writeVersionedSnapshot(out, s.snapshotConfiguration())
8175
})
8276
}
8377

8478
override def resolveSchemaCompatibility(
85-
newSerializer: TypeSerializerSnapshot[T]
79+
oldSerializerSnapshot: TypeSerializerSnapshot[T]
8680
): TypeSerializerSchemaCompatibility[T] =
8781
TypeSerializerSchemaCompatibility.compatibleAsIs()
8882

8983
override def restoreSerializer(): TypeSerializer[T] =
9084
new CoproductSerializer[T](subtypeClasses, subtypeSerializers)
9185
}
86+
9287
}

modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/MapSerializer.scala

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ package org.apache.flinkx.api.serializer
22

33
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
44
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
5-
import org.apache.flink.util.InstantiationUtil
6-
import org.apache.flinkx.api.serializer.MapSerializer._
5+
import org.apache.flinkx.api.serializer.MapSerializer.*
76

87
class MapSerializer[K, V](ks: TypeSerializer[K], vs: TypeSerializer[V]) extends SimpleSerializer[Map[K, V]] {
98
override def createInstance(): Map[K, V] = Map.empty[K, V]
@@ -32,33 +31,23 @@ class MapSerializer[K, V](ks: TypeSerializer[K], vs: TypeSerializer[V]) extends
3231
object MapSerializer {
3332
case class MapSerializerSnapshot[K, V](var keySerializer: TypeSerializer[K], var valueSerializer: TypeSerializer[V])
3433
extends TypeSerializerSnapshot[Map[K, V]] {
34+
3535
def this() = this(null, null)
36-
override def getCurrentVersion: Int = 1
3736

38-
override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
39-
keySerializer = readSerializer[K](in, userCodeClassLoader)
40-
valueSerializer = readSerializer[V](in, userCodeClassLoader)
41-
}
37+
override def getCurrentVersion: Int = 2
4238

43-
def readSerializer[T](in: DataInputView, userCodeClassLoader: ClassLoader): TypeSerializer[T] = {
44-
val snapClass = InstantiationUtil.resolveClassByName[TypeSerializerSnapshot[T]](in, userCodeClassLoader)
45-
val nestedSnapshot = InstantiationUtil.instantiate(snapClass)
46-
nestedSnapshot.readSnapshot(nestedSnapshot.getCurrentVersion, in, userCodeClassLoader)
47-
nestedSnapshot.restoreSerializer()
39+
override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
40+
keySerializer = TypeSerializerSnapshot.readVersionedSnapshot[K](in, userCodeClassLoader).restoreSerializer()
41+
valueSerializer = TypeSerializerSnapshot.readVersionedSnapshot[V](in, userCodeClassLoader).restoreSerializer()
4842
}
4943

5044
override def writeSnapshot(out: DataOutputView): Unit = {
51-
writeSerializer[K](keySerializer, out)
52-
writeSerializer[V](valueSerializer, out)
53-
}
54-
55-
def writeSerializer[T](nestedSerializer: TypeSerializer[T], out: DataOutputView) = {
56-
out.writeUTF(nestedSerializer.snapshotConfiguration().getClass.getName)
57-
nestedSerializer.snapshotConfiguration().writeSnapshot(out)
45+
TypeSerializerSnapshot.writeVersionedSnapshot(out, keySerializer.snapshotConfiguration())
46+
TypeSerializerSnapshot.writeVersionedSnapshot(out, valueSerializer.snapshotConfiguration())
5847
}
5948

6049
override def resolveSchemaCompatibility(
61-
newSerializer: TypeSerializerSnapshot[Map[K, V]]
50+
oldSerializerSnapshot: TypeSerializerSnapshot[Map[K, V]]
6251
): TypeSerializerSchemaCompatibility[Map[K, V]] = TypeSerializerSchemaCompatibility.compatibleAsIs()
6352

6453
override def restoreSerializer(): TypeSerializer[Map[K, V]] = new MapSerializer(keySerializer, valueSerializer)
Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,9 @@
11
package org.apache.flinkx.api.serializer
22

3-
import org.apache.flinkx.api.serializer.MappedSerializer.{MappedSerializerSnapshot, TypeMapper}
4-
import org.apache.flink.api.common.typeinfo.TypeInformation
5-
import org.apache.flink.api.common.typeutils.{
6-
CompositeTypeSerializerSnapshot,
7-
GenericTypeSerializerSnapshot,
8-
SimpleTypeSerializerSnapshot,
9-
TypeSerializer,
10-
TypeSerializerSchemaCompatibility,
11-
TypeSerializerSnapshot
12-
}
3+
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
134
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
145
import org.apache.flink.util.InstantiationUtil
6+
import org.apache.flinkx.api.serializer.MappedSerializer.{MappedSerializerSnapshot, TypeMapper}
157

168
case class MappedSerializer[A, B](mapper: TypeMapper[A, B], ser: TypeSerializer[B]) extends SimpleSerializer[A] {
179
override def equals(obj: Any): Boolean = ser.equals(obj)
@@ -41,34 +33,35 @@ object MappedSerializer {
4133
def map(a: A): B
4234
def contramap(b: B): A
4335
}
44-
class MappedSerializerSnapshot[A, B]() extends TypeSerializerSnapshot[A] {
45-
var mapper: TypeMapper[A, B] = _
46-
var ser: TypeSerializer[B] = _
47-
def this(xmapper: TypeMapper[A, B], xser: TypeSerializer[B]) = {
48-
this()
49-
mapper = xmapper
50-
ser = xser
51-
}
36+
37+
class MappedSerializerSnapshot[A, B](
38+
var mapper: TypeMapper[A, B],
39+
var ser: TypeSerializer[B]
40+
) extends TypeSerializerSnapshot[A] {
41+
42+
// Empty constructor is required to instantiate this class during deserialization.
43+
def this() = this(null, null)
5244

5345
override def readSnapshot(readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
5446
val mapperClazz = InstantiationUtil.resolveClassByName[TypeMapper[A, B]](in, userCodeClassLoader)
5547
mapper = InstantiationUtil.instantiate(mapperClazz)
5648
val serClazz = InstantiationUtil.resolveClassByName[TypeSerializer[B]](in, userCodeClassLoader)
57-
ser = InstantiationUtil.instantiate(serClazz)
49+
ser = TypeSerializerSnapshot.readVersionedSnapshot[B](in, userCodeClassLoader).restoreSerializer()
5850
}
5951

6052
override def resolveSchemaCompatibility(
61-
newSerializer: TypeSerializerSnapshot[A]
53+
oldSerializerSnapshot: TypeSerializerSnapshot[A]
6254
): TypeSerializerSchemaCompatibility[A] =
6355
TypeSerializerSchemaCompatibility.compatibleAsIs()
6456

6557
override def writeSnapshot(out: DataOutputView): Unit = {
6658
out.writeUTF(mapper.getClass.getName)
67-
out.writeUTF(ser.getClass.getName)
59+
TypeSerializerSnapshot.writeVersionedSnapshot(out, ser.snapshotConfiguration())
6860
}
6961

7062
override def restoreSerializer(): TypeSerializer[A] = new MappedSerializer[A, B](mapper, ser)
7163

72-
override def getCurrentVersion: Int = 1
64+
override def getCurrentVersion: Int = 2
7365
}
66+
7467
}

modules/flink-2-api/src/main/scala/org/apache/flinkx/api/serializer/ScalaCaseObjectSerializer.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package org.apache.flinkx.api.serializer
22

3-
import org.apache.flinkx.api.serializer.ScalaCaseObjectSerializer.ScalaCaseObjectSerializerSnapshot
4-
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
53
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
4+
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
65
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
76
import org.apache.flink.util.InstantiationUtil
7+
import org.apache.flinkx.api.serializer.ScalaCaseObjectSerializer.ScalaCaseObjectSerializerSnapshot
88

99
class ScalaCaseObjectSerializer[T](clazz: Class[T]) extends TypeSerializerSingleton[T] {
1010
override def isImmutableType: Boolean = true
@@ -37,7 +37,7 @@ object ScalaCaseObjectSerializer {
3737

3838
override def getCurrentVersion: Int = 1
3939
override def resolveSchemaCompatibility(
40-
newSerializer: TypeSerializerSnapshot[T]
40+
oldSerializerSnapshot: TypeSerializerSnapshot[T]
4141
): TypeSerializerSchemaCompatibility[T] =
4242
TypeSerializerSchemaCompatibility.compatibleAsIs()
4343

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package org.apache.flinkx.api.serializer
2+
3+
import org.apache.flink.api.common.serialization.SerializerConfigImpl
4+
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
5+
import org.apache.flink.core.memory.{DataInputDeserializer, DataOutputSerializer}
6+
import org.apache.flinkx.api.serializers.*
7+
import org.scalatest.flatspec.AnyFlatSpec
8+
import org.scalatest.matchers.should.Matchers
9+
10+
class CollectionSerializerSnapshotTest extends AnyFlatSpec with Matchers {
11+
12+
it should "serialize then deserialize" in {
13+
val serializerConfig = new SerializerConfigImpl()
14+
// Create SerializerSnapshot
15+
val tSerializer = implicitly[TypeSerializer[String]]
16+
val serializerSnapshot: CollectionSerializerSnapshot[Set, String, SetSerializer[String]] =
17+
new CollectionSerializerSnapshot(tSerializer, classOf[SetSerializer[String]], classOf[String])
18+
19+
val expectedSerializer = serializerSnapshot.restoreSerializer()
20+
21+
// Serialize SerializerSnapshot
22+
val snapshotOutput = new DataOutputSerializer(1024 * 1024)
23+
TypeSerializerSnapshot.writeVersionedSnapshot(snapshotOutput, serializerSnapshot)
24+
val snapshotInput = new DataInputDeserializer(snapshotOutput.getSharedBuffer)
25+
26+
// Deserialize SerializerSnapshot
27+
val deserializedSnapshot = TypeSerializerSnapshot
28+
.readVersionedSnapshot[SetSerializer[String]](snapshotInput, getClass.getClassLoader)
29+
30+
deserializedSnapshot.restoreSerializer() should be(expectedSerializer)
31+
}
32+
33+
}

0 commit comments

Comments
 (0)