I am using Spark/GraphFrames from Python and from R. When I call PageRank on a small graph from Python, it is a lot slower than with R. Why is it so much slower with Python, considering that both Python and R are calling the same libraries?
I'll try to demonstrate the problem below.
Spark/GraphFrames includes example graphs, such as friends, as described at this link. This is a very small directed graph with 6 nodes and 8 edges (note that this example differs between GraphFrames versions).
When I run the following piece of code in R, it takes almost no time to calculate PageRank:
library(graphframes)
library(sparklyr)
library(dplyr)
nodes <- read.csv('nodes.csv')
edges <- read.csv('edges.csv')
sc <- spark_connect(master = "local", version = "2.1.1")
nodes_tbl <- copy_to(sc, nodes)
edges_tbl <- copy_to(sc, edges)
graph <- gf_graphframe(nodes_tbl, edges_tbl)
ranks <- gf_pagerank(graph, reset_probability = 0.15, tol = 0.01)
print(ranks$vertices)
results <- as.data.frame(ranks$vertices)
results <- arrange(results, id)
results$pagerank <- results$pagerank / sum(results$pagerank)
print(results)
When I run the equivalent with PySpark, it takes 10 to 30 minutes:
from pyspark.sql import SparkSession
from graphframes.examples import Graphs
if __name__ == '__main__':
    sc = SparkSession.builder.master("local").getOrCreate()
    g = Graphs(sc).friends()
    results = g.pageRank(resetProbability=0.15, tol=0.01)
    results.vertices.select("id", "pagerank").show()
    results.edges.select("src", "dst", "weight").show()
I tried different versions of Spark and GraphFrames for Python to match the R setup.
In general, when you see such significant runtime differences between pieces of code that are apparently equivalent in different backends, you have to consider two possibilities: either the code is not actually equivalent, or the configuration is not the same.
In this particular case the first and most obvious suspect is how you load the data.
In sparklyr, copy_to.spark_connection uses only a single partition by default. With such small data this can often be beneficial, as the parallelization / distribution overhead can be much higher than the computation cost, but it can also lead to miserable failures.
In PySpark, the friends loader uses standard parallelize, which means that the number of partitions will be defaultParallelism. Depending on the master configuration the value is at least 1, but it can be affected by configuration options not visible here (like spark.default.parallelism).
However, as far as I can tell, these options shouldn't affect the runtime in this particular case. Moreover, the code path before the call reaches the JVM backend doesn't seem to differ enough between the two cases to explain the difference.
This suggests that the problem lies somewhere in the configuration. In general there are at least two configuration options which can significantly affect data distribution, and therefore the execution time:
spark.default.parallelism - used with the RDD API to determine the number of partitions in different cases, including the default post-shuffle distribution. For possible implications see, for example, Spark iteration time increasing exponentially when using join. It doesn't look like it affects your code here.
spark.sql.shuffle.partitions - used with the Dataset API to determine the number of partitions after a shuffle (groupBy, join, etc.). While the PageRank code uses the old GraphX API, where this parameter is not directly applicable, the code that runs before the data is passed to the older API involves indexing edges and vertices with the Dataset API. If you check the source, you'll see that both indexedEdges and indexVertices use joins, and therefore depend on spark.sql.shuffle.partitions. Furthermore, the number of partitions set by the aforementioned methods will be inherited by the GraphX Graph object, significantly affecting execution time.
If you set spark.sql.shuffle.partitions to its minimum value (1), the execution time on such small data should be negligible.
Conclusion:
Your environments are likely to use different values of spark.sql.shuffle.partitions.
General Directions:
If you see behavior like this and want to roughly narrow down the problem, take a look at the Spark UI and see where things diverge. In this case you're likely to see significantly different numbers of tasks.