I'm running a test where I create a DataFrame, encode a field using `to_avro()`, and then decode it using `from_avro()`.
The schema of the initial (not yet encoded) DataFrame contains the `StructField` instance for the field I encode as Avro.
My goal is to use the Avro schema extracted from this `StructField` as an argument for `from_avro()`.
However, decoding with `from_avro()` only works if I wrap the `StructField` in an extra nesting layer:
// Builds the Avro schema (as a JSON string) that is later handed to from_avro().
// The StructField for "key" is first wrapped in a one-field StructType; the two
// lines marked "???" are the extra nesting layer this question is about.
// NOTE(review): toAvroType appears to accept a DataType plus a nullable flag, so
// passing keySparkSchema.dataType and keySparkSchema.nullable may make the wrapper
// unnecessary — TODO confirm against the Spark version in use.
val keyAvroSchema = SchemaConverters.toAvroType(
StructType( // ???
Array( // ???
keySparkSchema
)
)
).toString()
... and then I need to unwrap the decoded column in order to dig into the values:
// Decodes the Avro-encoded "key" column using the wrapped schema. The decoded
// column is aliased "wrapped_key" and then expanded with "wrapped_key.*" to get
// rid of the artificial outer layer introduced by the schema wrapper.
val decoded = encoded
.select(
from_avro($"key", keyAvroSchema).as("wrapped_key")
)
.selectExpr("wrapped_key.*") // <== want to avoid this step
My question is: is there a way NOT to wrap the schema?
Full code in Jupyter:
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.avro.to_avro
import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
// Two sample JSON documents, each with a nested "key" object and a nested "value" object.
val jsons = Seq(
"""{"key": {"id": "8166"}, "value": {"timestamp": 1596789012}}""",
"""{"key": {"id": "8167"}, "value": {"timestamp": 1596789123}}"""
)
// Turn the JSON strings into an RDD and read that RDD as JSON into a DataFrame;
// the resulting schema is shown in the notebook output that follows.
val rdd = spark.sparkContext.parallelize(jsons)
val df = spark.read.json(rdd)
jsons = List({"key": {"id": "8166"}, "value": {"timestamp": 1596789012}}, {"key": {"id": "8167"}, "value": {"timestamp": 1596789123}})
rdd = ParallelCollectionRDD[0] at parallelize at <console>:35
df = [key: struct<id: string>, value: struct<timestamp: bigint>]
[key: struct<id: string>, value: struct<timestamp: bigint>]
// Encode the "key" struct column with to_avro(), keeping the column name "key".
// Only this one column is selected; "value" is not carried over.
val encoded = df
.select(
to_avro($"key").as("key")
)
encoded = [key: binary]
[key: binary]
// Look up the StructField that describes the "key" column in the original DataFrame's schema.
val keySparkSchema: StructField = df.schema("key")
keySparkSchema = StructField(key,StructType(StructField(id,StringType,true)),true)
StructField(key,StructType(StructField(id,StringType,true)),true)
// Convert the Spark schema of "key" to an Avro schema rendered as a JSON string.
// The StructField is wrapped in a single-field StructType before conversion, which
// is why the generated Avro schema has an outer "topLevelRecord" with one "key" field.
// NOTE(review): toAvroType appears to accept a DataType plus a nullable flag, so
// passing keySparkSchema.dataType and keySparkSchema.nullable may avoid the extra
// layer — TODO confirm against the Spark version in use.
val keyAvroSchema = SchemaConverters.toAvroType(
StructType( // ???
Array( // ???
keySparkSchema
)
)
).toString()
keyAvroSchema = {"type":"record","name":"topLevelRecord","fields":[{"name":"key","type":[{"type":"record","name":"key","namespace":"topLevelRecord","fields":[{"name":"id","type":["string","null"]}]},"null"]}]}
{"type":"record","name":"topLevelRecord","fields":[{"name":"key","type":[{"type":"record","name":"key","namespace":"topLevelRecord","fields":[{"name":"id","type":["string","null"]}]},"null"]}]}
// Decode the Avro binary "key" column with the wrapped Avro schema. The result is
// aliased "wrapped_key", and "wrapped_key.*" then expands it into its fields,
// which brings the inner "key" struct back to the top level.
val decoded = encoded
.select(
from_avro($"key", keyAvroSchema).as("wrapped_key")
)
.selectExpr("wrapped_key.*") // <== want to avoid this step
decoded = [key: struct<id: string>]
[key: struct<id: string>]
decoded.printSchema
root
|-- key: struct (nullable = true)
| |-- id: string (nullable = true)
decoded.show()
+------+
| key|
+------+
|[8166]|
|[8167]|
+------+
decoded.selectExpr("key.*").show
+----+
| id|
+----+
|8166|
|8167|
+----+