I have a question regarding Spark Structured Streaming on a Kafka stream.
I have a schema of type:
StructType schema = new StructType()
.add("field1", StringType)
.add("field2", StringType)
.add("field3", StringType)
.add("field4", StringType)
.add("field5", StringType);
I bootstrap my stream from a Kafka topic like this:
Dataset<Row> ds1 = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "brokerlist")
.option("zookeeper.connect", "zk_url")
.option("subscribe", "topic")
.option("startingOffsets", "earliest")
.option("max.poll.records", 10)
.option("failOnDataLoss", false)
.load();
Next, I cast the key and value to strings:
Dataset<Row> df1 = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
Now I would like to parse the value field (which is JSON) against the previously defined schema, which should make SQL queries easier:
Dataset<Row> df2 = df1.select(from_json(col("value"), schema).as("data")).select("data.single_column_field");
However, Spark 2.3.1 doesn't seem to recognize the from_json function?
These are my imports:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
Any idea how to solve this? Please note that I'm not looking for a Scala solution, but a pure Java-based solution!
This code works for me. Hope it helps.
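The root cause is a missing import: from_json and col are static methods on org.apache.spark.sql.functions, which is absent from your import list, so the compiler cannot resolve the symbol. Below is a minimal self-contained sketch of the whole pipeline in pure Java. The class name KafkaJsonStream, the console sink, and the choice of data.field1 as the selected column are illustrative placeholders, as are brokerlist and topic from your question.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class KafkaJsonStream {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("kafka-json-stream")
                .getOrCreate();

        // Schema describing the JSON payload carried in the Kafka value field
        StructType schema = new StructType()
                .add("field1", DataTypes.StringType)
                .add("field2", DataTypes.StringType)
                .add("field3", DataTypes.StringType)
                .add("field4", DataTypes.StringType)
                .add("field5", DataTypes.StringType);

        Dataset<Row> ds1 = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "brokerlist")
                .option("subscribe", "topic")
                .option("startingOffsets", "earliest")
                .option("failOnDataLoss", false)
                .load();

        // Kafka delivers key/value as binary; cast both to strings
        Dataset<Row> df1 = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

        // Parse the JSON value against the schema; from_json and col are
        // static methods on org.apache.spark.sql.functions (see imports above)
        Dataset<Row> df2 = df1
                .select(from_json(col("value"), schema).as("data"))
                .select("data.field1");

        // Write the parsed column to the console sink for inspection
        df2.writeStream()
                .format("console")
                .outputMode("append")
                .start()
                .awaitTermination();
    }
}

Two side notes: the Structured Streaming Kafka source doesn't use ZooKeeper, so zookeeper.connect can be dropped, and Kafka consumer settings such as max.poll.records must be prefixed with kafka. to be passed through to the consumer.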