Graphframes PageRank performance: PySpark vs sparklyr


I am using Spark/GraphFrames from Python and from R. When I call PageRank on a small graph from Python, it is a lot slower than with R. Why is it so much slower with Python, considering that both Python and R are calling the same libraries?

I'll try to demonstrate the problem below.

Spark/GraphFrames includes example graphs, such as friends, as described in this link. It is a very small directed graph with 6 nodes and 8 edges (note that the example is not the same as in other versions of GraphFrames).

[Figure: the friends example graph]

When I run the following piece of code with R, it takes almost no time to calculate PageRank:

library(graphframes)
library(sparklyr)
library(dplyr)

nodes <- read.csv('nodes.csv')
edges <- read.csv('edges.csv')

sc <- spark_connect(master = "local", version = "2.1.1")

nodes_tbl <- copy_to(sc, nodes)
edges_tbl <- copy_to(sc, edges)

graph <- gf_graphframe(nodes_tbl, edges_tbl)
ranks <- gf_pagerank(graph, reset_probability = 0.15, tol = 0.01)
print(ranks$vertices)

results <- as.data.frame(ranks$vertices)
results <- arrange(results, id)
results$pagerank <- results$pagerank / sum(results$pagerank)

print(results)

When I run the equivalent with PySpark, it takes 10 to 30 minutes:

from pyspark.sql import SparkSession
from graphframes.examples import Graphs

if __name__ == '__main__':

    sc = SparkSession.builder.master("local").getOrCreate()
    g = Graphs(sc).friends()
    results = g.pageRank(resetProbability=0.15, tol=0.01)
    results.vertices.select("id", "pagerank").show()
    results.edges.select("src", "dst", "weight").show()

I tried different versions of Spark and GraphFrames for Python to align them with the R setup.

Best answer:

In general, when you see such significant runtime differences between pieces of code that are apparently equivalent across different backends, you have to consider two possibilities:

  • They are not really equivalent. Despite using the same Java libraries under the hood, the paths that different languages use to interact with the JVM are not the same, and when the code reaches the JVM, it might not use the same call chain.
  • The methods are equivalent, but the configuration and/or data distribution is not the same.

In this particular case, the first and most obvious difference is how you load the data: the R code reads CSV files and copies them to Spark, while the Python code uses the built-in example graph.

However, as far as I can tell, these options shouldn't affect the runtime in this particular case. Moreover, the path the code takes before it reaches the JVM backend doesn't seem to differ enough in either case to explain the difference.
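If you want to rule out the loading path entirely, you can read the same CSV files from PySpark instead of using the built-in example. This is only a sketch: it assumes nodes.csv has an id column and edges.csv has src and dst columns, which is what GraphFrame and the sparklyr code above expect.

from pyspark.sql import SparkSession
from graphframes import GraphFrame

spark = SparkSession.builder.master("local").getOrCreate()

# Assumed layout: nodes.csv has an "id" column, edges.csv has "src" and "dst" columns.
nodes = spark.read.csv("nodes.csv", header=True, inferSchema=True)
edges = spark.read.csv("edges.csv", header=True, inferSchema=True)

g = GraphFrame(nodes, edges)
results = g.pageRank(resetProbability=0.15, tol=0.01)
results.vertices.select("id", "pagerank").show()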

All of this suggests that the problem lies somewhere in the configuration. In general, there are at least two options which can significantly affect data distribution, and therefore the execution time:

  • spark.default.parallelism - used with the RDD API to determine the number of partitions in different cases, including the default post-shuffle distribution. For possible implications see, for example, Spark iteration time increasing exponentially when using join

    It doesn't look like it affects your code here.

  • spark.sql.shuffle.partitions - used with the Dataset API to determine the number of partitions after a shuffle (groupBy, join, etc.).

    While the PageRank code uses the old GraphX API, where this parameter is not directly applicable, the data is first indexed (edges and vertices) with the Dataset API before it is passed to the older API.

    If you check the source you'll see that both indexedEdges and indexVertices use joins, and therefore depend on spark.sql.shuffle.partitions.

    Furthermore, the number of partitions set by the aforementioned methods will be inherited by the GraphX Graph object, significantly affecting execution time.

    If you set spark.sql.shuffle.partitions to a minimum value:

    # spark: an existing SparkSession
    spark.conf.set("spark.sql.shuffle.partitions", 1)
    

    the execution time on such small data should be negligible (a sketch applying this to the code from the question follows after this list).
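Applied to the snippet from the question, a sketch of the full run might look like this (the exact speedup will depend on your environment, but as noted above, on a 6-node graph the runtime should be negligible):

from pyspark.sql import SparkSession
from graphframes.examples import Graphs

spark = SparkSession.builder.master("local").getOrCreate()

# Keep shuffles to a single partition for this tiny graph; with the default of 200,
# every PageRank iteration otherwise spawns hundreds of near-empty tasks.
spark.conf.set("spark.sql.shuffle.partitions", 1)

g = Graphs(spark).friends()
results = g.pageRank(resetProbability=0.15, tol=0.01)
results.vertices.select("id", "pagerank").show()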

Conclusion:

Your environments are likely to use different values of spark.sql.shuffle.partitions.
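A quick way to confirm this from the PySpark side is to print the effective values (a sketch; 200 is the stock default for spark.sql.shuffle.partitions, so an untouched session will report that):

# Values most likely to differ between the two environments.
print(spark.conf.get("spark.sql.shuffle.partitions"))
print(spark.sparkContext.defaultParallelism)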

General Directions:

If you see behavior like this and want to roughly narrow down the problem, you should take a look at the Spark UI and see where things diverge. In this case you're likely to see significantly different numbers of tasks.
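If you'd rather check programmatically than in the UI, the partition count of the result is a rough proxy for the task count per stage (a sketch, assuming results is the pageRank output from the code above):

# Roughly corresponds to the number of tasks per stage shown in the Spark UI.
print(results.vertices.rdd.getNumPartitions())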