The RocksDB state store of Spark Executors takes up a lot of space in Kubernetes


I am currently testing the benefits of using RocksDB as the state store for a (Scala) Spark application I maintain, running on a Kubernetes cluster. (https://spark.apache.org/docs/3.2.0/structured-streaming-programming-guide.html#rocksdb-state-store-implementation)

When running the application, I encountered ephemeral-storage shortage issues on the executor pods.

Looking at the Spark executor logs, RocksDB does not have enough space to create its files:

{"log_timestamp": "2023-11-15T23:20:27.964+0000", "log_level": "ERROR", "process_id": 1, "process_name": "kqi", "thread_id": 1, "thread_name": "Executor task launch worker for task 33.0 in stage 6.1 (TID 464)", "action_name": "org.apache.spark.executor.Executor", "log_message": "Exception in task 33.0 in stage 6.1 (TID 464) org.rocksdb.RocksDBException: While open a file for appending: /var/data/spark-1c317e23-92e3-41b6-9543-7a98db12faf3/spark-dc0e0bee-e012-4227-83cf-75024f80f8fa/StateStoreId(opId=0,partId=90,name=left-keyToNumValues)-ed47b686-9148-4d08-9ba0-5af507cc5c26/workingDir-f5f1bc0f-afa9-46c9-a139-efd65077e1b4/000004.dbtmp: No space left on device\n at org.rocksdb.RocksDB.open(Native Method)\n at org.rocksdb.RocksDB.open(RocksDB.java:251)\n at org.apache.spark.sql.execution.streaming.state.RocksDB.openDB(RocksDB.scala:426)\n at org.apache.spark.sql.execution.streaming.state.RocksDB.load(RocksDB.scala:114)\n at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.getStore(RocksDBStateStoreProvider.scala:192)\n at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:496)\n at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$StateStoreHandler.getStateStore(SymmetricHashJoinStateManager.scala:414)\n at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyToNumValuesStore.(SymmetricHashJoinStateManager.scala:438)\n at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.(SymmetricHashJoinStateManager.scala:382)\n at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner.(StreamingSymmetricHashJoinExec.scala:508)\n at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.processPartitions(StreamingSymmetricHashJoinExec.scala:257)\n at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$doExecute$1(StreamingSymmetricHashJoinExec.scala:228)\n at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$doExecute$1$adapted(StreamingSymmetricHashJoinExec.scala:228)\n at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper$StateStoreAwareZipPartitionsRDD.compute(StreamingSymmetricHashJoinHelper.scala:235)\n at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55)\n at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)\n at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)\n at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)\n at org.apache.spark.scheduler.Task.run(Task.scala:131)\n at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)\n at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)\n at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)\n at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n at java.base/java.lang.Thread.run(Thread.java:829)\n"}
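For anyone debugging a similar failure, the operator/partition that ran out of space can be extracted from the JSON executor logs. This is a minimal sketch; the `failing_stores` helper name is mine, not Spark's, and the log file name is an assumption:

```shell
# Count occurrences of each StateStoreId in an executor log, to see which
# operator/partition hit "No space left on device".
# The function name and the log file path are hypothetical.
failing_stores() {
  grep -o 'StateStoreId([^)]*)' "$1" | sort | uniq -c | sort -rn
}

# Usage (assumed log file name):
# failing_stores executor.log
```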

I then allocated a PersistentVolumeClaim (PVC) as the Spark local dir, with quite generous storage (250 Gi) per executor (3 executors are running):

        - --conf
        - spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand
        - --conf
        - spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=vsphere-thin
        - --conf
        - spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=250Gi
        - --conf
        - spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data/spark/
        - --conf
        - spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false

Then, taking a look at the space consumed by the RocksDB state store, it fills up rather quickly during Spark's bootstrap, reaching up to ~200 Gi per executor, before it has even processed any data:

bash-4.4$ du -h --max-depth=1
32G     ./spark-bc3a977f-d380-4adc-bfb2-229e8d02d650
16K     ./lost+found
5.4M    ./blockmgr-f5de30e5-683c-443a-a0b4-6360ee516cb3
32G     .
bash-4.4$ du -h --max-depth=1
35G     ./spark-bc3a977f-d380-4adc-bfb2-229e8d02d650
16K     ./lost+found
5.8M    ./blockmgr-f5de30e5-683c-443a-a0b4-6360ee516cb3
35G     .
bash-4.4$ du -h --max-depth=1
39G     ./spark-bc3a977f-d380-4adc-bfb2-229e8d02d650
16K     ./lost+found
6.3M    ./blockmgr-f5de30e5-683c-443a-a0b4-6360ee516cb3
39G     .
bash-4.4$ du -h --max-depth=1
41G     ./spark-bc3a977f-d380-4adc-bfb2-229e8d02d650
16K     ./lost+found
6.7M    ./blockmgr-f5de30e5-683c-443a-a0b4-6360ee516cb3
41G     .
bash-4.4$ du -h --max-depth=1
43G     ./spark-bc3a977f-d380-4adc-bfb2-229e8d02d650
16K     ./lost+found
7.1M    ./blockmgr-f5de30e5-683c-443a-a0b4-6360ee516cb3
43G     .
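The repeated `du` calls above can be scripted to correlate growth with micro-batches. A minimal sketch, where the function name, the 30-second interval, and the `/data/spark` path (taken from the mount configured above) are my assumptions:

```shell
# Sample disk usage of a directory N times at a fixed interval,
# to watch the state store grow over time.
sample_usage() {
  dir="$1"; interval="$2"; samples="$3"
  i=0
  while [ "$i" -lt "$samples" ]; do
    date -u
    du -h --max-depth=1 "$dir"
    sleep "$interval"
    i=$((i + 1))
  done
}

# On an executor pod this would be e.g.: sample_usage /data/spark 30 10
```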

Here are some other settings set for Spark:

spark:
  submit:
    extraConf:
      "spark.sql.streaming.stateStore.providerClass": "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
      "spark.sql.streaming.stateStore.rocksdb.compactOnCommit": true
      "spark.sql.streaming.stateStore.rocksdb.blockSizeKB": 4
      "spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB": 8
      "spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs": 60000
      "spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad": true
  driver:
    memory: 8g
    memoryOverhead: 2g
    request:
      cores: 5800m
    limit:
      cores: 6
    extraJavaOptions: -XX:+UseG1GC
  executor:
    instances: 3
    cores: 10
    memory: 9g
    memoryOverhead: 3g
    request:
      cores: 5800m
    limit:
      cores: 6
    extraJavaOptions: -XX:MaxMetaspaceSize=2100m -XX:+UseG1GC
properties:
  sqlShufflePartitions: "200"
  sparkUiEnabled: "true"
  spark:
    streaming:
      maxOffsetsPerTrigger: "1000000"
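For a sense of scale, here is a back-of-envelope sketch of how many RocksDB instances each executor might host. The factor of 4 is an assumption inferred from the error above: a stream-stream join appears to keep a keyToNumValues and a keyWithIndexToValue store for each join side, per shuffle partition.

```shell
# Rough estimate of RocksDB state-store instances per executor.
# Assumption: 4 stores per shuffle partition for a stream-stream join
# (keyToNumValues + keyWithIndexToValue, for each of the two sides).
PARTITIONS=200        # sqlShufflePartitions from the config above
EXECUTORS=3
STORES_PER_PARTITION=4
echo "stores per executor ~ $(( PARTITIONS * STORES_PER_PARTITION / EXECUTORS ))"
# → stores per executor ~ 266
```

If that assumption holds, each instance carries its own memtable/SST/log files under the local dir, so even near-empty stores have a fixed on-disk footprint; multiplied by hundreds of instances, this may partly explain a large baseline before any data is processed.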

I would like to understand whether it is normal for the RocksDB state store to take up this much space, and which parameters could impact the initial size of the store. My guess is that RocksDB is doing some kind of space pre-allocation.

Thanks for your help.
