The problem is that the default CollectionSerializer
of Kryo can not deserialize the collection again, because its not modifiable (the .add()
call fails).
To resolve the issue, we can use the UnmodifiableCollectionsSerializer
from the kryo-serializers project. Flink transitively depends on the project, so there is no need to add it as a dependency.
Next, we have to register the serializer with Flink's Kryo instances.
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
see.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);
Usually, we don't have to call Class.forName()
for registering a serializer, but in this case, java.util.Collections$UnmodifiableCollection
is package visible, so we can not directly access the class.
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…