Wrapping StructField to use it in from_avro()

Viewed 282 times.

I'm running a test where I create a DataFrame, encode a field using to_avro() and then decode it using from_avro().

The initial (not-yet-encoded) DataFrame's schema contains the StructField instance for the field I encode in Avro.

My goal is to use the Avro-schema extracted from this StructField as an argument for from_avro().

However, decoding using from_avro() only works if I wrap a StructField into an extra nesting layer:

// Convert the Spark schema of the "key" column to an Avro schema string.
// NOTE(review): SchemaConverters.toAvroType takes a DataType, so the lone
// StructField is wrapped in a one-field StructType; this produces a
// top-level Avro record containing "key" — which is why the decoded
// column later needs unwrapping. TODO: confirm whether passing
// keySparkSchema.dataType directly would avoid the extra nesting.
val keyAvroSchema = SchemaConverters.toAvroType(
    StructType( // wraps the single field into a record schema
        Array( // one-element array: just the "key" StructField
            keySparkSchema
        )
    )
).toString()

... and then I need to unwrap the decoded column in order to dig into the values:

// Deserialize the Avro binary column. Because keyAvroSchema is a record
// that CONTAINS "key" (due to the wrapping above), from_avro yields a
// struct column holding the real key struct, so an extra flattening
// step is required.
val decoded = encoded
    .select(
      from_avro($"key", keyAvroSchema).as("wrapped_key")
    )
    .selectExpr("wrapped_key.*") // <== want to avoid this step

My question is: is there a way NOT to wrap the schema?

Full code in Jupyter:

// Spark<->Avro helpers: schema conversion plus the to_avro/from_avro
// column functions (spark-avro module).
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.avro.to_avro
import org.apache.spark.sql.avro.from_avro

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
// Two sample records, each with a nested "key" struct and "value" struct.
val jsons = Seq(
  """{"key": {"id": "8166"}, "value": {"timestamp": 1596789012}}""",
  """{"key": {"id": "8167"}, "value": {"timestamp": 1596789123}}"""
)
val rdd = spark.sparkContext.parallelize(jsons)

// Schema is inferred from the JSON strings (see output below):
// key: struct<id: string>, value: struct<timestamp: bigint>
val df = spark.read.json(rdd)
jsons = List({"key": {"id": "8166"}, "value": {"timestamp": 1596789012}}, {"key": {"id": "8167"}, "value": {"timestamp": 1596789123}})
rdd = ParallelCollectionRDD[0] at parallelize at <console>:35
df = [key: struct<id: string>, value: struct<timestamp: bigint>]


[key: struct<id: string>, value: struct<timestamp: bigint>]
// Serialize the nested "key" struct to Avro; the resulting column is
// binary (see the echoed schema below).
val encoded = df
  .select(
    to_avro($"key").as("key")
  )
encoded = [key: binary]


[key: binary]
val keySparkSchema: StructField = df.schema("key")
keySparkSchema = StructField(key,StructType(StructField(id,StringType,true)),true)


StructField(key,StructType(StructField(id,StringType,true)),true)
// Convert the "key" StructField to an Avro schema string.
// NOTE(review): toAvroType expects a DataType, hence the wrapping of the
// single StructField into a one-field StructType; the echoed output below
// shows the resulting record ("topLevelRecord") nests "key" one level
// deeper than wanted. TODO: check if keySparkSchema.dataType could be
// converted directly instead.
val keyAvroSchema = SchemaConverters.toAvroType(
    StructType( // wraps the single field into a record schema
        Array( // one-element array: just the "key" StructField
            keySparkSchema
        )
    )
).toString()
keyAvroSchema = {"type":"record","name":"topLevelRecord","fields":[{"name":"key","type":[{"type":"record","name":"key","namespace":"topLevelRecord","fields":[{"name":"id","type":["string","null"]}]},"null"]}]}


{"type":"record","name":"topLevelRecord","fields":[{"name":"key","type":[{"type":"record","name":"key","namespace":"topLevelRecord","fields":[{"name":"id","type":["string","null"]}]},"null"]}]}
// Decode the Avro binary back into a struct. The wrapper record from
// keyAvroSchema means from_avro returns struct<key: struct<id: string>>,
// so one selectExpr is needed to strip the artificial outer layer.
val decoded = encoded
    .select(
      from_avro($"key", keyAvroSchema).as("wrapped_key")
    )
    .selectExpr("wrapped_key.*") // <== want to avoid this step
decoded = [key: struct<id: string>]


[key: struct<id: string>]
decoded.printSchema
root
 |-- key: struct (nullable = true)
 |    |-- id: string (nullable = true)
decoded.show()
+------+
|   key|
+------+
|[8166]|
|[8167]|
+------+
decoded.selectExpr("key.*").show
+----+
|  id|
+----+
|8166|
|8167|
+----+
0

There are no answers yet.