I am currently testing the benefits of using RocksDB as a state store for a (Scala) Spark Structured Streaming application I maintain, running on a Kubernetes cluster. (https://spark.apache.org/docs/3.2.0/structured-streaming-programming-guide.html#rocksdb-state-store-implementation)
When running the application, I encountered ephemeral-storage issues. Looking at the Spark executor logs, RocksDB doesn't have enough space to create its files:
{"log_timestamp": "2023-11-15T23:20:27.964+0000", "log_level": "ERROR", "process_id": 1, "process_name": "kqi", "thread_id": 1, "thread_name": "Executor task launch worker for task 33.0 in stage 6.1 (TID 464)", "action_name": "org.apache.spark.executor.Executor", "log_message": "Exception in task 33.0 in stage 6.1 (TID 464) org.rocksdb.RocksDBException: While open a file for appending: /var/data/spark-1c317e23-92e3-41b6-9543-7a98db12faf3/spark-dc0e0bee-e012-4227-83cf-75024f80f8fa/StateStoreId(opId=0,partId=90,name=left-keyToNumValues)-ed47b686-9148-4d08-9ba0-5af507cc5c26/workingDir-f5f1bc0f-afa9-46c9-a139-efd65077e1b4/000004.dbtmp: No space left on device\n at org.rocksdb.RocksDB.open(Native Method)\n at org.rocksdb.RocksDB.open(RocksDB.java:251)\n at org.apache.spark.sql.execution.streaming.state.RocksDB.openDB(RocksDB.scala:426)\n at org.apache.spark.sql.execution.streaming.state.RocksDB.load(RocksDB.scala:114)\n at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.getStore(RocksDBStateStoreProvider.scala:192)\n at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:496)\n at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$StateStoreHandler.getStateStore(SymmetricHashJoinStateManager.scala:414)\n at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyToNumValuesStore.(SymmetricHashJoinStateManager.scala:438)\n at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.(SymmetricHashJoinStateManager.scala:382)\n at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner.(StreamingSymmetricHashJoinExec.scala:508)\n at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.processPartitions(StreamingSymmetricHashJoinExec.scala:257)\n at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$doExecute$1(StreamingSymmetricHashJoinExec.scala:228)\n at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$doExecute$1$adapted(StreamingSymmetricHashJoinExec.scala:228)\n at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper$StateStoreAwareZipPartitionsRDD.compute(StreamingSymmetricHashJoinHelper.scala:235)\n at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55)\n at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)\n
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)\n at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)\n at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)\n at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)\n at org.apache.spark.scheduler.Task.run(Task.scala:131)\n at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)\n at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)\n at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)\n at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n at java.base/java.lang.Thread.run(Thread.java:829)\n"}
I then allocated a PVC as the Spark local dir, with quite generous storage (250Gi) per executor (3 executors are running):
- --conf
- spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand
- --conf
- spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=vsphere-thin
- --conf
- spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=250Gi
- --conf
- spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data/spark/
- --conf
- spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false
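As a sanity check that the PVC is actually mounted and that Spark writes into it, I run something like the following from inside an executor pod (via `kubectl exec`; the mount path is the one from the conf above):

```shell
# Verify the PVC-backed local dir is mounted and see what Spark put there.
# MOUNT defaults to the mount.path from the conf above; override if needed.
MOUNT="${MOUNT:-/data/spark}"
df -h "$MOUNT" 2>/dev/null || echo "mount $MOUNT not found"
ls "$MOUNT" 2>/dev/null || true
```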
Looking at the space consumed by the RocksDB state store, it fills up rather quickly during Spark's bootstrap, reaching up to ~200Gi per executor. It is not even processing data yet:
bash-4.4$ du -h --max-depth=1
32G ./spark-bc3a977f-d380-4adc-bfb2-229e8d02d650
16K ./lost+found
5.4M ./blockmgr-f5de30e5-683c-443a-a0b4-6360ee516cb3
32G .
bash-4.4$ du -h --max-depth=1
35G ./spark-bc3a977f-d380-4adc-bfb2-229e8d02d650
16K ./lost+found
5.8M ./blockmgr-f5de30e5-683c-443a-a0b4-6360ee516cb3
35G .
bash-4.4$ du -h --max-depth=1
39G ./spark-bc3a977f-d380-4adc-bfb2-229e8d02d650
16K ./lost+found
6.3M ./blockmgr-f5de30e5-683c-443a-a0b4-6360ee516cb3
39G .
bash-4.4$ du -h --max-depth=1
41G ./spark-bc3a977f-d380-4adc-bfb2-229e8d02d650
16K ./lost+found
6.7M ./blockmgr-f5de30e5-683c-443a-a0b4-6360ee516cb3
41G .
bash-4.4$ du -h --max-depth=1
43G ./spark-bc3a977f-d380-4adc-bfb2-229e8d02d650
16K ./lost+found
7.1M ./blockmgr-f5de30e5-683c-443a-a0b4-6360ee516cb3
43G .
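To see which state store directories dominate, rather than the top-level totals above, the per-store directories can be ranked with `find` + `du` (a sketch; `LOCAL_DIR` is the Spark local dir mount, `/data/spark` in my setup, and the `StateStoreId*` directory name pattern is taken from the error path above):

```shell
# Rank state store directories by size (KB) inside the Spark local dir.
# LOCAL_DIR is assumed to be the PVC mount used as the spark local dir.
LOCAL_DIR="${LOCAL_DIR:-/data/spark}"
find "$LOCAL_DIR" -maxdepth 4 -type d -name 'StateStoreId*' \
  -exec du -sk {} + 2>/dev/null | sort -rn | head -20
```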
Here are some other Spark settings:
spark:
  submit:
    extraConf:
      "spark.sql.streaming.stateStore.providerClass": "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
      "spark.sql.streaming.stateStore.rocksdb.compactOnCommit": true
      "spark.sql.streaming.stateStore.rocksdb.blockSizeKB": 4
      "spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB": 8
      "spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs": 60000
      "spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad": true
  driver:
    memory: 8g
    memoryOverhead: 2g
    request:
      cores: 5800m
    limit:
      cores: 6
    extraJavaOptions: -XX:+UseG1GC
  executor:
    instances: 3
    cores: 10
    memory: 9g
    memoryOverhead: 3g
    request:
      cores: 5800m
    limit:
      cores: 6
    extraJavaOptions: -XX:MaxMetaspaceSize=2100m -XX:+UseG1GC
  properties:
    sqlShufflePartitions: "200"
    sparkUiEnabled: "true"
    spark:
      streaming:
        maxOffsetsPerTrigger: "1000000"
I would like to understand whether it is normal that the RocksDB state store takes up so much space, and what parameters could impact the initial size of the store. My guess is that RocksDB is doing some kind of space pre-allocation.
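For context on why the store count is large: to my understanding, a stream-stream join keeps four state stores per shuffle partition (`keyToNumValues` and `keyWithIndexToValue` for each join side; the error path above shows `left-keyToNumValues`, which is one of them). Under that assumption, a back-of-the-envelope count of RocksDB instances per executor:

```shell
# Rough count of RocksDB instances each executor hosts.
# Assumes a single stream-stream join: 4 state stores per shuffle partition.
PARTITIONS=200          # spark.sql.shuffle.partitions
EXECUTORS=3
STORES_PER_PARTITION=4  # keyToNumValues + keyWithIndexToValue, per side
echo $(( PARTITIONS * STORES_PER_PARTITION / EXECUTORS ))  # → 266
```

Each of those instances has its own SST files, write buffers, and working directory, which would multiply whatever the per-instance footprint is.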
Thanks for your help.