Flink Serialization Error

I'm trying to run the label propagation algorithm (Gelly's LabelPropagation) on my Apache Flink Gelly graph.
Here is my code:

        Graph<String, Long, String> ugraph = Graph.fromDataSet(vertex, edgeSet, env).getUndirected();

        // Give every vertex a unique Long to use as its initial label
        DataSet<Tuple2<String, Long>> idsWithInitialLabels = DataSetUtils
            .zipWithUniqueId(ugraph.getVertexIds())
            .map(new MapFunction<Tuple2<Long, String>, Tuple2<String, Long>>() {
                public Tuple2<String, Long> map(Tuple2<Long, String> tuple2) throws Exception {
                    // (uniqueId, vertexId) -> (vertexId, initialLabel)
                    return new Tuple2<String, Long>(tuple2.f1, tuple2.f0);
                }
            });

        // Copy the labels into the vertex values, then run label propagation (max. 10 iterations)
        DataSet<Vertex<String, Long>> verticesWithCommunity = ugraph.joinWithVertices(idsWithInitialLabels,
                new VertexJoinFunction<Long, Long>() {
                    public Long vertexJoin(Long vertexValue, Long inputValue) {
                        return inputValue;
                    }
                })
            .run(new LabelPropagation<String, Long, String>(10));

I got the following error message:

        org.apache.flink.api.common.InvalidProgramException: Object org.apache.flink.graph.Graph$ApplyCoGroupToVertexValues@4dde0543 not serializable
            at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
            at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
            at org.apache.flink.api.java.DataSet.clean(DataSet.java:186)
            at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets$CoGroupOperatorSetsPredicate$CoGroupOperatorWithoutFunction.with(CoGroupOperator.java:619)
            at org.apache.flink.graph.Graph.joinWithVertices(Graph.java:587)
            at tu.master.ConceptDetection.TextProcessor.clustering(TextProcessor.java:405)
            at tu.master.ConceptDetection.TextProcessor$4.actionPerformed(TextProcessor.java:210)

Thank you for your help :)

There is 1 answer below.

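I'm guessing that the class that contains your graph code is not Serializable. An anonymous class declared inside an instance method is an inner class: the compiler gives it a hidden reference to the enclosing instance (the containing class's this), even if the anonymous class never uses it. If the containing class is not Serializable, that reference cannot be serialized, and so neither can the anonymous class. The error names Gelly's internal ApplyCoGroupToVertexValues because joinWithVertices wraps your VertexJoinFunction in it, so the wrapper fails the serializability check along with it.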
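
A minimal plain-Java sketch of this effect, without Flink and using ordinary Java serialization as a stand-in for Flink's ClosureCleaner check. JoinFn, TextProcessorLike and serializes are hypothetical names: JoinFn plays the role of a Serializable function interface such as VertexJoinFunction, and TextProcessorLike the role of a non-Serializable class that builds the job. The same anonymous class fails to serialize when created in an instance method and succeeds when created in a static method.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class OuterCaptureDemo {
    // Hypothetical stand-in for a Serializable Flink function interface
    // such as Gelly's VertexJoinFunction<Long, Long>.
    interface JoinFn extends Serializable {
        Long vertexJoin(Long vertexValue, Long inputValue);
    }

    // Hypothetical non-Serializable class that builds the job (like TextProcessor).
    static class TextProcessorLike {
        JoinFn makeAnonymous() {
            // Declared in an instance method: the compiler adds a hidden field
            // pointing at this TextProcessorLike, even though it is never used.
            return new JoinFn() {
                @Override
                public Long vertexJoin(Long vertexValue, Long inputValue) {
                    return inputValue;
                }
            };
        }
    }

    // The same anonymous class in a static method has no enclosing instance.
    static JoinFn makeStaticAnonymous() {
        return new JoinFn() {
            @Override
            public Long vertexJoin(Long vertexValue, Long inputValue) {
                return inputValue;
            }
        };
    }

    // Returns true if obj can be written with plain Java serialization.
    static boolean serializes(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false; // some object reachable from obj is not Serializable
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new TextProcessorLike().makeAnonymous())); // false
        System.out.println(serializes(makeStaticAnonymous()));                  // true
    }
}
```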

That would also explain why switching to a lambda expression makes it serializable. Lambda expressions are not anonymous classes: they capture the enclosing this only if their body actually uses an instance field or method.
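
The lambda case can be sketched the same way, again with hypothetical names (JoinFn, TextProcessorLike, serializes) and plain Java serialization in place of Flink's check: a lambda that only uses its parameters serializes even though its enclosing class does not, while one that reads an instance field captures this and fails again.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LambdaCaptureDemo {
    // Hypothetical stand-in for Gelly's VertexJoinFunction<Long, Long>;
    // a lambda is only serializable if its target type is Serializable.
    interface JoinFn extends Serializable {
        Long vertexJoin(Long vertexValue, Long inputValue);
    }

    // Hypothetical non-Serializable class that builds the job.
    static class TextProcessorLike {
        private final Long fallback = 0L;

        JoinFn lambdaWithoutThis() {
            // Uses only its parameters, so no enclosing instance is captured.
            return (vertexValue, inputValue) -> inputValue;
        }

        JoinFn lambdaUsingField() {
            // Reading an instance field captures this, so serialization fails.
            return (vertexValue, inputValue) -> inputValue != null ? inputValue : fallback;
        }
    }

    // Returns true if obj can be written with plain Java serialization.
    static boolean serializes(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        TextProcessorLike p = new TextProcessorLike();
        System.out.println(serializes(p.lambdaWithoutThis())); // true
        System.out.println(serializes(p.lambdaUsingField()));  // false
    }
}
```

In the question's code this corresponds to passing (vertexValue, inputValue) -> inputValue to joinWithVertices instead of the anonymous VertexJoinFunction. A static nested (or top-level) class implementing VertexJoinFunction avoids the hidden reference as well.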

What it doesn't explain is why your MapFunction, which is also declared as an anonymous class in the same method, does not fail. If you still have this code, @Nesrine, I'd be curious what the whole class looks like.