Flink: PageRank type mismatch error

I want to compute PageRank from a CSV file of edges formatted as follows:

12,13,1.0
12,14,1.0
12,15,1.0
12,16,1.0
12,17,1.0
...

My code:

val filename = "<filename>.csv"

val graph = Graph.fromCsvReader[Long, Double, Double](
  env = env,
  pathEdges = filename,
  readVertices = false,
  hasEdgeValues = true,
  vertexValueInitializer = new MapFunction[Long, Double] {
    def map(id: Long): Double = 0.0
  })

val ranks = new PageRank[Long](0.85, 20).run(graph)

I get the following error from the Flink Scala Shell:

error: type mismatch;
 found   : org.apache.flink.graph.scala.Graph[Long,_23,_24] where type _24 >: Double with _22, type _23 >: Double with _21
 required: org.apache.flink.graph.Graph[Long,Double,Double]
            val ranks = new PageRank[Long](0.85, 20).run(graph)
                                                         ^

What am I doing wrong?

(Also, are the initial values of 0.0 for every vertex and 1.0 for every edge correct?)

Best answer

The problem is that you are passing the Scala org.apache.flink.graph.scala.Graph to PageRank.run, which expects the Java org.apache.flink.graph.Graph.

To run a GraphAlgorithm on a Scala Graph object, call the run method of the Scala Graph and pass it the GraphAlgorithm:

graph.run(new PageRank[Long](0.85, 20))
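
Why the direction of the call matters can be shown with a toy model that does not use Flink at all. The sketch below assumes that the Scala Graph is a wrapper around a Java graph and not a subtype of it; JavaGraph, ScalaGraph, Algorithm and EdgeCount are hypothetical stand-ins invented for illustration, not Flink classes.

```scala
object WrapperSketch {
  // Hypothetical stand-in for the Java org.apache.flink.graph.Graph.
  final class JavaGraph(val edges: Seq[(Long, Long)])

  // Hypothetical stand-in for a GraphAlgorithm: it only accepts the Java graph type.
  trait Algorithm[T] {
    def run(graph: JavaGraph): T
  }

  // Hypothetical stand-in for the Scala Graph (assumed to be a wrapper, not a subtype).
  final class ScalaGraph(private val jgraph: JavaGraph) {
    // Unwraps the Java graph and hands it to the algorithm.
    def run[T](algorithm: Algorithm[T]): T = algorithm.run(jgraph)
  }

  // A trivial algorithm that counts edges.
  object EdgeCount extends Algorithm[Int] {
    def run(graph: JavaGraph): Int = graph.edges.size
  }

  val graph = new ScalaGraph(new JavaGraph(Seq((12L, 13L), (12L, 14L), (12L, 15L))))

  // EdgeCount.run(graph)  // does not compile: ScalaGraph is not a JavaGraph
  val count: Int = graph.run(EdgeCount) // compiles: the wrapper does the unwrapping
}
```

In this model, calling the algorithm directly with the wrapper is a type error, while calling run on the wrapper lets it supply the underlying Java object itself.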

Update

Note that the PageRank algorithm expects an instance of type Graph[K, java.lang.Double, java.lang.Double]. Because Scala's Double and Java's java.lang.Double are distinct types as far as the type checker is concerned, the graph has to be created with java.lang.Double as its vertex and edge value types.
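
The type distinction can be reproduced without Flink. In the minimal sketch below, Box and needsJavaDouble are hypothetical names standing in for a generic container such as Graph and for an API that requires java.lang.Double; they are illustration only.

```scala
object DoubleTypes {
  // Hypothetical invariant generic container, standing in for Graph[K, VV, EV].
  final case class Box[T](value: T)

  // Hypothetical API that requires the Java boxed type, as PageRank does.
  def needsJavaDouble(b: Box[java.lang.Double]): Double = b.value.doubleValue()

  val scalaBox: Box[Double] = Box(0.85)

  // needsJavaDouble(scalaBox)  // does not compile: Box[Double] is not Box[java.lang.Double]

  // Building the container with java.lang.Double from the start satisfies the API.
  val javaBox: Box[java.lang.Double] = Box(java.lang.Double.valueOf(scalaBox.value))
  val result: Double = needsJavaDouble(javaBox)
}
```

A single Scala Double value converts to java.lang.Double implicitly, but that conversion does not reach inside a type parameter, which is why the container itself has to be declared with java.lang.Double.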

For the code in the question, this means:

// Use java.lang.Double for the vertex and edge values, as PageRank expects.
val graph = Graph.fromCsvReader[Long, java.lang.Double, java.lang.Double](
  env = env,
  pathEdges = filename,
  readVertices = false,
  hasEdgeValues = true,
  vertexValueInitializer = new MapFunction[Long, java.lang.Double] {
    def map(id: Long): java.lang.Double = 0.0
  })
  .asInstanceOf[Graph[Long, java.lang.Double, java.lang.Double]]