I launched a Dataproc cluster in GCP with one master node and 3 worker nodes. Every node has 8 vCPUs and 30 GB of memory.
I wrote a PySpark program that reads one CSV file from GCS. The CSV file is about 30 GB in size.
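df_raw = (
    spark
    .read
    .schema(schema)                # use the predefined schema
    .option('header', 'true')
    .option('quote', '"')
    .option('multiline', 'true')   # allow quoted fields to span several lines
    .csv(infile)
)
# Redistribute the rows into 20 partitions, hashed on the Product column
df_raw = df_raw.repartition(20, "Product")
print(df_raw.rdd.getNumPartitions())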
Here is how I submitted the PySpark job to Dataproc:
gcloud dataproc jobs submit pyspark gs://<my-gcs-bucket>/<my-program>.py \
    --cluster=${CLUSTER} \
    --region=${REGION}
The printed partition count was only 1.
I have attached a node usage image here for reference.
It seems that only one vCore on one worker node was used.
How can I make this run in parallel, with multiple partitions, using all nodes and more vCores?
I tried repartitioning to 20, but it still used only one vCore on one worker node.
PySpark's default number of shuffle partitions is 200, so I was surprised to see that Dataproc didn't use all available resources for this kind of task.
This isn't a Dataproc issue, but a pure Spark/PySpark one.
To parallelize the work, your data needs to be split into multiple partitions, ideally more than the total number of executor cores you have across the workers (e.g. roughly 2x or 3x that number).
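As a rough illustration of that sizing rule for the cluster in the question (3 workers with 8 vCPUs each), the arithmetic can be sketched as follows. The helper name `target_partitions` and the default factor of 2 are my own choices for illustration, and the sketch assumes every vCPU is available to Spark executors, which YARN and executor settings may reduce in practice.

```python
def target_partitions(num_workers: int, cores_per_worker: int, factor: int = 2) -> int:
    """Return a partition count that is `factor` times the total worker cores."""
    if num_workers < 1 or cores_per_worker < 1 or factor < 1:
        raise ValueError("all arguments must be positive integers")
    total_cores = num_workers * cores_per_worker
    return total_cores * factor


# Cluster from the question: 3 workers x 8 vCPUs each.
print(target_partitions(3, 8))            # 48
print(target_partitions(3, 8, factor=3))  # 72
```

So the 20 partitions tried in the question would still leave some of the 24 worker cores idle even if the data were evenly spread.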
There are various ways to do this, e.g.:
Split the data into multiple files or folders, parallelize the list of files/folders, and work on each one (or use a database that already partitions the data and whose partitioning Spark keeps on read).
Repartition your data after you get a Spark DataFrame, e.g. read the number of executors, multiply it by N, and repartition to that many partitions. When you do this, you must choose columns that divide your data well, i.e. into many parts rather than only a few: for example by day or by customer ID, not by a status ID.
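The repartition approach might look like the sketch below. It assumes `spark` is an active SparkSession and `df` an existing DataFrame; the helper names `repartition_evenly` and `nonempty_partitions` are hypothetical. `spark.sparkContext.defaultParallelism` is used here as an approximation of the available executor cores, which may not match your cluster exactly. The second helper is plain Python and only imitates hash partitioning with a modulo to show why a low-cardinality key leaves most partitions empty; Spark's real hash function differs.

```python
def repartition_evenly(spark, df, column, factor=2):
    """Repartition `df` by `column` into about factor x (available cores) partitions."""
    # defaultParallelism approximates the cores Spark can currently use.
    n = spark.sparkContext.defaultParallelism * factor
    return df.repartition(n, column)


def nonempty_partitions(keys, num_partitions):
    """Count partitions that receive at least one key under modulo hashing.

    Illustration only: Spark uses a different hash function, but the bound
    (non-empty partitions <= distinct keys) holds either way.
    """
    return len({hash(k) % num_partitions for k in keys})


# A status-like column with 3 distinct values can fill at most 3 partitions,
# while an ID-like column with many distinct values can fill all 20.
status_ids = [0, 1, 2] * 1000
customer_ids = range(3000)
print(nonempty_partitions(status_ids, 20))    # 3
print(nonempty_partitions(customer_ids, 20))  # 20
```

In the question's code this would be called as `repartition_evenly(spark, df_raw, "Product")`, provided `Product` has many distinct values. It is also worth checking the read itself: as far as I know, a CSV file read with the `multiline` option cannot be split, so a single file is read by a single task before any repartitioning happens, and splitting the input into several files sidesteps that.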
Your driver code runs on the master node, and the parallel stages are distributed among the worker nodes.
Since Spark transformations are lazy, they only run when you reach an action such as write or collect, which causes the DataFrame to be evaluated.
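A minimal sketch of that split between driver-side planning and distributed execution is shown below. The function name `build_and_run` and its default sizes are made up for illustration, and `spark` is assumed to be an active SparkSession (for example one obtained with `SparkSession.builder.getOrCreate()`).

```python
def build_and_run(spark, num_rows=1_000_000, num_partitions=48):
    """Build a lazy plan on the driver, then trigger it with an action."""
    # Transformations: these lines only describe the work; no Spark job starts yet.
    df = spark.range(num_rows).repartition(num_partitions)
    evens = df.filter(df["id"] % 2 == 0)

    # Action: count() triggers execution, and the resulting stages run as
    # parallel tasks on the executors.
    return evens.count()
```

This is why CPU usage on the workers only shows up once an action is reached: until then, the driver has merely built a plan.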