I am working on PySpark code that connects to a BigQuery table and imports that source table as a df. The process requires renaming the df's columns. To do so I define a dictionary, basically hardcoding it:
cols_new_to_original = {'colA_new': 'colA_original', 'colB_new': 'colB_Original', ...}
This has around 3,000+ key:value pairs. I then use the following step to rename the columns of the df using cols_new_to_original.
Code:
# Replace column names using the cols_new_to_original
df = df.repartition(30)
for new_name, original_name in cols_new_to_original.items():
    df = df.withColumnRenamed(new_name, original_name)
While doing so, I get the following error:
Traceback (most recent call last):
File "/tmp/5642d3d6-77f7-4615-aae9-dcd4e1c9bbdb/scorer.py", line 132, in <module>
score()
File "/tmp/5642d3d6-77f7-4615-aae9-dcd4e1c9bbdb/scorer.py", line 90, in score
df = df.withColumnRenamed(new_name, original_name)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 2475, in withColumnRenamed
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o118.withColumnRenamed.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.lang.reflect.Array.newInstance(Array.java:75)
Below is my cluster configuration:
cluster_config = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n2-standard-2",
        "disk_config": {
            "boot_disk_size_gb": 500
        }
    },
    "worker_config": {
        "num_instances": 8,
        "machine_type_uri": "n2-standard-8",
        "disk_config": {
            "boot_disk_size_gb": 1000
        }
    },
    "secondary_worker_config": {
        "num_instances": 1,
        "machine_type_uri": "n2-standard-8",
        "disk_config": {
            "boot_disk_size_gb": 1000
        },
        "preemptibility": "NON_PREEMPTIBLE"
    },
    "software_config": {
        "image_version": "2.0.27-centos8",
        "optional_components": [
            "JUPYTER"
        ],
        "properties": {
            "spark:spark.dynamicAllocation.enabled": "true",
            "spark:spark.dynamicAllocation.minExecutors": "1",
            "spark:spark.dynamicAllocation.maxExecutors": "10",
            "spark:spark.shuffle.service.enabled": "true"
        }
    }, ...
Initially I also tried "spark:spark.executor.cores": "2" and "spark:spark.executor.memory": "16g", but I got the same issue.
Thanks @Dagang for the suggestion. The OOM was in the driver rather than the executors, and tuning spark.driver.memory helped. I also had to change how I was renaming the columns.
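For reference, driver memory can be raised through the same Dataproc software_config properties block used above; the sketch below shows where such a setting would go (the "8g" value is an illustrative assumption, not from the post):

```
"software_config": {
    "properties": {
        # Illustrative value; size this to your driver node's RAM.
        "spark:spark.driver.memory": "8g"
    }
}
```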
New Code:
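The actual replacement code was not included in the post. A common fix for this pattern, shown here only as a sketch, is to rename every column in a single toDF pass instead of looping over withColumnRenamed: each withColumnRenamed call adds another projection to the query plan held on the driver, so ~3,000 of them can exhaust driver memory. The abbreviated mapping below stands in for the real 3,000-entry dictionary:

```python
# Abbreviated stand-in for the real 3000-entry mapping from the question.
cols_new_to_original = {'colA_new': 'colA_original', 'colB_new': 'colB_Original'}

def rename_all(df, mapping):
    # One toDF call renames every column at once; columns missing from the
    # mapping keep their original names. This avoids building ~3000 nested
    # projections on the driver, one per withColumnRenamed call.
    return df.toDF(*[mapping.get(c, c) for c in df.columns])
```

Usage would then be a single line, e.g. df = rename_all(df, cols_new_to_original).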
This worked.