I have an inverse frequency parquet file of the wiki corpus on Google Cloud Storage (GCS). I want to load it from GCS into Dataproc Serverless (batch). However, loading the parquet with pyspark.read on a Dataproc batch is much slower than on my local MacBook (16 GB RAM, 8-core Intel CPU). On my local machine it takes less than 10 s to finish loading and persisting; on the Dataproc batch it takes 20-30 s to finish the read. I am curious what is wrong in my Dataproc batch settings.

The inverse_freq.parquet file is 148.8 MB and the bucket uses the Standard storage class. I am using version 2.0 of the Dataproc batch runtime. I also tried smaller parquet files of ~50 MB; pyspark.read on the Dataproc batch still takes 20-30 s. I think my Dataproc batch configuration or settings have a problem.

I hope someone can tell me how to shorten the time it takes to load a file from GCS on Google Cloud Dataproc batch.

Custom Docker image

# Debian 11 is recommended.
FROM debian:11-slim

# Suppress interactive prompts
ENV DEBIAN_FRONTEND=noninteractive

# (Required) Install utilities required by Spark scripts.
RUN apt update && apt install -y procps tini libjemalloc2
# RUN apt-key adv --keyserver keyserver.ubuntu.com --recv-keys B8F25A8A73EACF41

# Enable jemalloc2 as default memory allocator
ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2

# (Optional) Add extra jars.
ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
#COPY spark-bigquery-with-dependencies_2.12-0.22.2.jar "${SPARK_EXTRA_JARS_DIR}"

# (Optional) Install and configure Miniconda3.
ENV CONDA_HOME=/opt/miniconda3
ENV PYSPARK_PYTHON=${CONDA_HOME}/bin/python
ENV PATH=${CONDA_HOME}/bin:${PATH}
COPY Miniconda3-py39_4.10.3-Linux-x86_64.sh .
RUN bash Miniconda3-py39_4.10.3-Linux-x86_64.sh -b -p /opt/miniconda3 \
  && ${CONDA_HOME}/bin/conda config --system --set always_yes True \
  && ${CONDA_HOME}/bin/conda config --system --set auto_update_conda False \
  && ${CONDA_HOME}/bin/conda config --system --prepend channels conda-forge \
  && ${CONDA_HOME}/bin/conda config --system --set channel_priority strict

# (Optional) Install Conda packages.
# Use mamba to install packages quickly.
RUN ${CONDA_HOME}/bin/conda install mamba -n base -c conda-forge \
    && ${CONDA_HOME}/bin/mamba install \
      conda \
      google-cloud-logging \
      python

ENV REQUIREMENTSPATH=/opt/requirements/requirements.txt
COPY requirements.txt "${REQUIREMENTSPATH}"
RUN pip install -r "${REQUIREMENTSPATH}"

ENV NLTKDATA_PATH=${CONDA_HOME}/nltk_data/corpora
RUN bash -c 'mkdir -p $NLTKDATA_PATH/{stopwords,wordnet}'
COPY nltk_data/stopwords ${NLTKDATA_PATH}/stopwords
COPY nltk_data/wordnet ${NLTKDATA_PATH}/wordnet

# (Optional) Add extra Python modules.
ENV PYTHONPATH=/opt/python/packages
RUN mkdir -p "${PYTHONPATH}"
RUN bash -c 'mkdir -p $PYTHONPATH/{utils,GCP}'
COPY utils "$PYTHONPATH/utils"
COPY GCP "$PYTHONPATH/GCP"

# (Required) Create the 'spark' group/user.
# The GID and UID must be 1099. Home directory is required.
RUN groupadd -g 1099 spark
RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
USER spark
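
Before blaming the read path, it can help to confirm the custom image is wired up the way the Dockerfile intends. Below is a minimal, hypothetical sanity-check sketch that could be run at the top of the Spark job; the paths and environment variable names simply mirror the Dockerfile above and are not required by Dataproc.

# Hypothetical sanity check - the paths mirror the ENV lines in the Dockerfile above
import os

# Interpreter and module paths configured via ENV in the image
print("PYSPARK_PYTHON:", os.environ.get("PYSPARK_PYTHON"))
print("PYTHONPATH:", os.environ.get("PYTHONPATH"))

# NLTK corpora copied into the image
nltk_corpora = "/opt/miniconda3/nltk_data/corpora"
for corpus in ("stopwords", "wordnet"):
    path = os.path.join(nltk_corpora, corpus)
    print(path, "exists:", os.path.isdir(path))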

gcloud CLI command to submit a job to Dataproc batch

APP_NAME="context-graph"
BUCKET="context-graph"
IDF_PATH='idf_data/idf_data/inverse_freq.parquet'
DOC_PATH="articles/text.txt"

gcloud dataproc batches submit pyspark main.py \
    --version 2.0 \
    --batch test \
    --container-image "custom_image:tag1" \
    --project project_id \
    --region us-central1 \
    --deps-bucket context_graph_deps \
    --service-account [email protected] \
    --subnet default \
    --properties spark.dynamicAllocation.initialExecutors=2,spark.dynamicAllocation.minExecutors=2,spark.executor.cores=4,spark.driver.cores=8,spark.driver.memory='16g',spark.executor.heartbeatInterval=200s,spark.network.timeout=250s \
    -- --app-name=${APP_NAME} --idf-uri=gs://${BUCKET}/${IDF_PATH} \
    --bucket-name=${BUCKET} --doc-path=${DOC_PATH}
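
One way to tell whether the 20-30 s comes from GCS throughput or from Spark/Dataproc overhead is to time a raw download of the same object from inside the batch. The sketch below is only a diagnostic under assumptions: it reuses the bucket and path from the variables above, assumes google-cloud-storage is installed via requirements.txt, and assumes inverse_freq.parquet is a single object rather than a Spark-written directory of part files.

# Hypothetical diagnostic: time a raw GCS download of the parquet object
import time

from google.cloud import storage  # assumes google-cloud-storage is in requirements.txt

client = storage.Client()
blob = client.bucket("context-graph").blob("idf_data/idf_data/inverse_freq.parquet")

start = time.time()
data = blob.download_as_bytes()
print(f"raw GCS download: {len(data) / 1e6:.1f} MB in {time.time() - start:.1f}s")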

main.py, a very simple script that reads the inverse frequency parquet

import time

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

start = time.time()

df = (
    spark.read.option("inferSchema", "true")
    .option("header", "true")
    .parquet("gs://bucket/inverse_freq.parquet")
)
df.persist()

end = time.time()
print("loading time:", end - start)

Warnings and errors in the Cloud Dataproc batch log (screenshots omitted).

Solution:

I found that I can fix the problem by adding master("local[*]") when creating the SparkSession.

spark = SparkSession.builder.master("local[*]").config(conf=conf).getOrCreate()

The official examples and some online resources don't use master("local[*]"), and without it Spark's load()/read() from GCS is slow. It is not just reading parquet that is slow; loading a pyspark.ml model pipeline from GCS is also slow. So if you want any read/write against GCS, you should add master("local[*]").
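
For completeness, a minimal self-contained version of this workaround is sketched below; the conf object is not shown in the question, so an empty SparkConf is assumed here purely to make the snippet runnable.

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()  # assumption: any extra settings from the original code would go here

spark = (
    SparkSession.builder
    .master("local[*]")  # run everything locally on the driver
    .config(conf=conf)
    .getOrCreate()
)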

There is 1 answer below.

You need to benchmark your app at a bigger scale, because reading a small file may well be slower on a distributed system than locally on a laptop.

In regard to calling .master("local[*]") in your code: it makes your Spark app run in local execution mode, i.e. it executes only on the driver node and does not scale Spark app execution out to executor nodes. You should not modify the spark.master property in Dataproc Serverless for Spark - it is already set correctly by the system.
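
If the goal is just to see which execution mode a batch actually got, a hedged alternative to overriding spark.master is to read it back from the running session, for example:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# On Dataproc Serverless, spark.master is set by the system; locally with
# .master("local[*]") this would print "local[*]".
print("spark.master:", spark.sparkContext.master)
print("default parallelism:", spark.sparkContext.defaultParallelism)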