pyspark - How to split the string inside an array column and make it into json?


My df schema is:

 |-- students: array (nullable = true)
 |    |-- element: string (containsNull = true)

and the actual data is as follows:

+--------------------------------------------------+
|                                          students|
+--------------------------------------------------+
|                            [(Alice:20), (Bob:13)]|
|[(James:39), (Robert:29), (Kevin:31), (Andrew:48)]|
|                                    [(Richard:88)]|
+--------------------------------------------------+

I want to convert this column to json format.

+-----------------------------------------------------------------------------------------------------------+
|                                                                                               json_student|
+-----------------------------------------------------------------------------------------------------------+
|[{"name":"Alice","age":20},{"name":"Bob","age":13}]                                                        |
|[{"name":"James","age":39},{"name":"Robert","age":29},{"name":"Kevin","age":31},{"name":"Andrew","age":48}]|
|[{"name":"Richard","age":88}]                                                                              |
+-----------------------------------------------------------------------------------------------------------+

What makes this hard is that the length of the array differs from row to row.
Is there a simple solution to create the format I want?
I'm using Spark 2.4.4.


There are 2 answers below.

user238607

You can solve this with a udf, as follows.

import json
import sys
from pyspark import SparkContext, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import udf


sc = SparkContext('local')
sqlContext = SQLContext(sc)

data1 = [
    [["(Alice:20)", "(Bob:13)"]],
    [["(James:39)", "(Robert:29)", "(Kevin:31)", "(Andrew:48)"]],
    [["(Richard:88)"]],
]

df1Columns = ["students"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)

def list_of_dicts(col):
    # strip the parentheses and split each entry on ':' into [name, age]
    split_string = [x[1:-1].split(":") for x in col]
    # note: the age stays a string here
    dict_format = [{"name": x[0], "age": x[1]} for x in split_string]
    req_json = json.dumps(dict_format)
    return req_json

my_udf = F.udf(list_of_dicts)

new_df = df1.withColumn("json_students", my_udf(F.col("students")))

print("Result")
new_df.select(["json_students"]).show(n=100, truncate=False)

Output:

Result

+----------------------------------------------------------------------------------------------------------------------------------+
|json_students                                                                                                                     |
+----------------------------------------------------------------------------------------------------------------------------------+
|[{"name": "Alice", "age": "20"}, {"name": "Bob", "age": "13"}]                                                                    |
|[{"name": "James", "age": "39"}, {"name": "Robert", "age": "29"}, {"name": "Kevin", "age": "31"}, {"name": "Andrew", "age": "48"}]|
|[{"name": "Richard", "age": "88"}]                                                                                                |
+----------------------------------------------------------------------------------------------------------------------------------+
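
Note that this keeps the ages as strings in the JSON, whereas the desired output has them as numbers. If that matters, a minimal tweak to the udf above (assuming every age parses cleanly as an integer) would be:

def list_of_dicts(col):
    split_string = [x[1:-1].split(":") for x in col]
    # cast the age to int so json.dumps emits it without quotes
    dict_format = [{"name": x[0], "age": int(x[1])} for x in split_string]
    return json.dumps(dict_format)
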
Bernhard Stadler

First of all, your problem is a bit harder to solve with the pure DataFrame API (without SQL) because you specified Spark 2.4.4. From Spark 3.1.0 onwards, the transform function, which makes this much easier, is available in the Python API and not only in SQL.

Building on user238607's answer, the following would work in Spark 3.1.0 and above:

from pyspark import SparkContext
from pyspark.sql import functions as F, SQLContext

sc = SparkContext('local')
sqlContext = SQLContext(sc)

data1 = [
    [["(Alice:20)", "(Bob:13)"]],
    [["(James:39)", "(Robert:29)", "(Kevin:31)", "(Andrew:48)"]],
    [["(Richard:88)"]],
]

df1Columns = ["students"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)

student_string_pattern = r'^\(([^:]+):([^)]+)\)$'

out_df = df1.withColumn("students_json", F.transform(
    "students",
    lambda student: F.to_json(F.struct(
        F.regexp_extract(student, student_string_pattern, 1).alias("name"),
        F.regexp_extract(student, student_string_pattern, 2).astype('int').alias("age")
    ))
))
out_df.show(truncate=False)

If you are stuck with Spark 2.4.4, there are still several options. The limitation above only concerns the Python interface; the transform function already exists in Spark SQL, so you can call it from a SQL query:

from pyspark import SparkContext
from pyspark.sql import functions as F, SQLContext

sc = SparkContext('local')
sqlContext = SQLContext(sc)

data1 = [
    [["(Alice:20)", "(Bob:13)"]],
    [["(James:39)", "(Robert:29)", "(Kevin:31)", "(Andrew:48)"]],
    [["(Richard:88)"]],
]

df1Columns = ["students"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)

sqlContext.registerDataFrameAsTable(df1, "raw_students")

student_string_pattern = r'^\\(([^:]+):([^)]+)\\)$'
# noinspection SqlNoDataSourceInspection
out_df = sqlContext.sql(f"""
    SELECT
        transform(students, student -> to_json(named_struct(
            'name', regexp_extract(student, '{student_string_pattern}', 1),
            'age', CAST(regexp_extract(student, '{student_string_pattern}', 2) AS int)
        ))) AS json_students
    FROM raw_students
""")

out_df.show(truncate=False)
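
Another possibility that avoids registering a temporary table is to call the SQL transform function through F.expr directly from the DataFrame API; this should also work on Spark 2.4.x. The following is only a sketch built on the same pattern and test data as above:

from pyspark import SparkContext
from pyspark.sql import functions as F, SQLContext

sc = SparkContext('local')
sqlContext = SQLContext(sc)

data1 = [
    [["(Alice:20)", "(Bob:13)"]],
    [["(James:39)", "(Robert:29)", "(Kevin:31)", "(Andrew:48)"]],
    [["(Richard:88)"]],
]

df1Columns = ["students"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)

# backslashes are doubled because the pattern is parsed again by the SQL parser inside expr()
student_string_pattern = r'^\\(([^:]+):([^)]+)\\)$'

out_df = df1.withColumn("json_students", F.expr(f"""
    transform(students, student -> to_json(named_struct(
        'name', regexp_extract(student, '{student_string_pattern}', 1),
        'age', CAST(regexp_extract(student, '{student_string_pattern}', 2) AS int)
    )))
"""))

out_df.show(truncate=False)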

If using SQL is not an option, you can still use explode to flatten the records. Since you want to keep the arrays, you will have to collect the parsed student JSON strings back into arrays afterwards.

If the order of the students within each array doesn't matter, the following should work:

from pyspark import SparkContext
from pyspark.sql import functions as F, SQLContext

sc = SparkContext('local')
sqlContext = SQLContext(sc)

data1 = [
    [["(Alice:20)", "(Bob:13)"]],
    [["(James:39)", "(Robert:29)", "(Kevin:31)", "(Andrew:48)"]],
    [["(Richard:88)"]],
]

df1Columns = ["students"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)

student_string_pattern = r'^\(([^:]+):([^)]+)\)$'

out_df = df1 \
    .withColumn("id", F.monotonically_increasing_id()) \
    .select("id", F.explode("students").alias("student")) \
    .withColumn("student_json", F.to_json(F.struct(
        F.regexp_extract("student", student_string_pattern, 1).alias("name"),
        F.regexp_extract("student", student_string_pattern, 2).astype('int').alias("age")
    ))) \
    .groupby("id") \
    .agg(F.collect_list("student_json").alias("students_json")) \
    .select("students_json")

out_df.show(truncate=False)

If you need to preserve the original order of the students within each array, you have to carry each element's position along and sort the collected array by it, e.g. with posexplode and sort_array (sort_array on an array of structs sorts by the first struct field, here the original index):

from pyspark import SparkContext
from pyspark.sql import functions as F, SQLContext

sc = SparkContext('local')
sqlContext = SQLContext(sc)

data1 = [
    [["(Alice:20)", "(Bob:13)"]],
    [["(James:39)", "(Robert:29)", "(Kevin:31)", "(Andrew:48)"]],
    [["(Richard:88)"]],
]

df1Columns = ["students"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)

student_string_pattern = r'^\(([^:]+):([^)]+)\)$'

out_df = df1 \
    .withColumn("id", F.monotonically_increasing_id()) \
    .select("id", F.posexplode("students").alias("arr_idx", "student")) \
    .withColumn("student_json",
                F.to_json(F.struct(
                    F.regexp_extract("student", student_string_pattern, 1).alias("name"),
                    F.regexp_extract("student", student_string_pattern, 2).astype('int').alias("age")
                ))) \
    .groupby("id") \
    .agg(F.sort_array(F.collect_list(F.struct("arr_idx", "student_json"))).alias("pos_students_json")) \
    .select(F.col("pos_students_json.student_json").alias("students_json"))

out_df.show(truncate=False)