parse pyspark dataframe column of varying keys into new column for one key's values


I have an input PySpark DataFrame df. The column "field1" contains values that are dictionaries, but the dictionaries do not all have the same keys. I would like to parse the "b" key into a new column "newcol". To further complicate things, field1 is of datatype string. I've tried the code below, but I'm getting the error below. Does anyone have a suggestion for how to do this?

input df:

+--+---------------------+
|id|field1               |
+--+---------------------+
| 1|{"a":1,"b":"f"}      |
+--+---------------------+
| 2|{"a":1,"b":"e","c":3}|
+--+---------------------+

output df:

+--+---------------------+------+
|id|field1               |newcol|
+--+---------------------+------+
| 1|{"a":1,"b":"f"}      |'f'   |
+--+---------------------+------+
| 2|{"a":1,"b":"e","c":3}|'e'   |
+--+---------------------+------+

code:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import ArrayType, StringType

df.select(
    col('id'),
    col('field1'),
    from_json(col("field1"), ArrayType(StringType())).getItem("b")
).show(truncate=False)

error:

An error was encountered:
An error occurred while calling o571.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 8.0 failed 4 times, most recent failure: Lost task 2.3 in stage 8.0 (TID 59, ip-10-100-190-16.us-west-2.compute.internal, executor 49): org.apache.spark.sql.execution.datasources.FileDownloadException: Failed to download file path: s3://treez-data-lake/product_api/products/part-00005-ab9a676d-e9fa-4594-a998-77e8ae0dd95b-c000.snappy.parquet, range: 0-41635218, partition values: [empty row], isDataPresent: false

...
There is 1 solution below.

Let's try the standard-library function ast.literal_eval (wrapped in a UDF) to convert the string into a map column, then use the PySpark function map_values to get the values into an array and slice by index:

from ast import literal_eval

import pyspark.sql.functions as F

# Parse the string into map<string,string>, then take the second value.
# Note: indexing by position relies on "b" always being the second key.
parse_map = F.udf(literal_eval, 'map<string,string>')
df2 = df.withColumn('newcol', F.map_values(parse_map('field1'))[1])
df2.show(truncate=False)


+---+---------------------+------+
|id |field1               |newcol|
+---+---------------------+------+
|1  |{"a":1,"b":"f"}      |f     |
|2  |{"a":1,"b":"e","c":3}|e     |
+---+---------------------+------+