JSON aggregation using s3-dist-cp for Spark application consumption

My Spark application running on AWS EMR loads data from a JSON array stored in S3. The DataFrame created from it is then processed by the Spark engine.

My source JSON data is spread across multiple S3 objects. I need to compact them into a single JSON array to reduce the number of S3 objects my Spark application has to read. I tried "s3-dist-cp --groupBy", but the result is concatenated JSON data that is not itself a valid JSON file, so I cannot create a DataFrame from it.

Here is a simplified example to illustrate this further.

Source data :

S3 Object Record1.json : {"Name" : "John", "City" : "London"}

S3 Object Record2.json : {"Name" : "Mary" , "City" : "Paris"}

s3-dist-cp --src s3://source/ --dest s3://dest/ --groupBy='.*Record.*(\w+)'

Aggregated output

{"Name" : "Mary" , "City" : "Paris"}{"Name" : "John", "City" : "London"}

What I need :

[{"Name" : "John", "City" : "London"},{"Name" : "Mary" , "City" : "Paris"}]

Application code for reference

import org.apache.spark.sql.types.{StringType, StructType}

val schema = new StructType()
  .add("Name", StringType, true)
  .add("City", StringType, true)

// multiLine is needed because the whole file would be a single JSON array
val df = spark.read.option("multiLine", "true").schema(schema).json("test.json")
df.show()

Expected output

+----+------+
|Name|  City|
+----+------+
|John|London|
|Mary| Paris|
+----+------+

Is s3-dist-cp the right tool for my need? Is there any other suggestion for aggregating JSON data so that it can be loaded by a Spark application as a DataFrame?

1 Answer

Alternatively, you can read the concatenated output as plain text and use regexp_replace to put each JSON object on its own line, then parse every object into a row before building the DataFrame.

See the sample below:

import org.apache.spark.sql.functions.{col, explode, from_json, regexp_replace, split}
// Put each object on its own line, explode into one row per object, then parse with the schema.
val df = spark.read.text("test.json")
  .withColumn("json", explode(split(regexp_replace(col("value"), "\\}\\{", "}\n{"), "\n")))
  .select(from_json(col("json"), schema).alias("parsed"))
  .select("parsed.*")

df.show()

About regexp_replace: Pyspark replace strings in Spark dataframe column
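
As a side note on the original question: if the only goal is to end up with one DataFrame, Spark can read many small S3 objects in a single call via a glob path, so compacting them first may not be necessary at all. The following is just a minimal sketch, assuming the Record*.json naming from the question and the schema defined above.

// Minimal sketch, assuming the Record*.json naming from the question.
// Each object holds a single JSON document on one line, so the default
// (JSON Lines) reader parses it without the multiLine option.
val direct = spark.read.schema(schema).json("s3://source/Record*.json")
direct.show()

If the number of tiny objects is the real concern, compacting them into a few newline-delimited JSON files (rather than one JSON array) also keeps the data directly readable by spark.read.json without the multiLine option.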