I want to compute PageRank from a CSV file of edges formatted as follows:
12,13,1.0
12,14,1.0
12,15,1.0
12,16,1.0
12,17,1.0
...
My code:
    val filename = "<filename>.csv"
    val graph = Graph.fromCsvReader[Long, Double, Double](
      env = env,
      pathEdges = filename,
      readVertices = false,
      hasEdgeValues = true,
      vertexValueInitializer = new MapFunction[Long, Double] {
        def map(id: Long): Double = 0.0
      })
    val ranks = new PageRank[Long](0.85, 20).run(graph)
I get the following error from the Flink Scala Shell:
error: type mismatch;
found : org.apache.flink.graph.scala.Graph[Long,_23,_24] where type _24 >: Double with _22, type _23 >: Double with _21
required: org.apache.flink.graph.Graph[Long,Double,Double]
val ranks = new PageRank[Long](0.85, 20).run(graph)
^
What am I doing wrong?
(And are the initial values of 0.0 for every vertex and 1.0 for every edge correct?)
The problem is that you're passing the Scala org.apache.flink.graph.scala.Graph to PageRank.run, which expects the Java org.apache.flink.graph.Graph. In order to run a GraphAlgorithm on a Scala Graph object, you have to call the run method of the Scala Graph and pass it the GraphAlgorithm.
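In other words, the call has to be inverted: the Scala Graph drives the algorithm. A minimal sketch of that pattern, reusing the graph value from the question (the value types still need adjusting, see the update below):

    // The Scala Graph's run method accepts the GraphAlgorithm as its argument.
    val ranks = graph.run(new PageRank[Long](0.85, 20))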
Update

In the case of the PageRank algorithm it is important to note that it expects an instance of type Graph[K, java.lang.Double, java.lang.Double]. Since Java's Double is a different type from Scala's Double as far as the type checker is concerned, this has to be accounted for. For the example code this means building the graph with java.lang.Double vertex and edge values and then running PageRank through the Scala Graph's run method.
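A sketch of the adjusted code under those assumptions; the env, placeholder file name, and PageRank parameters are taken from the question, and the import paths assume the Gelly package layout of that Flink version:

    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.graph.library.PageRank
    import org.apache.flink.graph.scala.Graph

    val filename = "<filename>.csv"

    // Build the graph with java.lang.Double values so it matches the
    // Graph[K, java.lang.Double, java.lang.Double] that PageRank expects.
    val graph = Graph.fromCsvReader[Long, java.lang.Double, java.lang.Double](
      env = env,
      pathEdges = filename,
      readVertices = false,
      hasEdgeValues = true,
      vertexValueInitializer = new MapFunction[Long, java.lang.Double] {
        def map(id: Long): java.lang.Double = java.lang.Double.valueOf(0.0)
      })

    // Run the algorithm through the Scala Graph instead of calling
    // PageRank.run(graph) directly.
    val ranks = graph.run(new PageRank[Long](0.85, 20))

With the vertex and edge value types aligned to java.lang.Double, the type-mismatch error from the Scala Shell should disappear.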