I'm reading a stream from Kafka, and I convert the value from Kafka (which is JSON) into a struct. `from_json` has a variant that takes a schema of type `String`, but I could not find a sample. Please advise what is wrong in the code below.
Error
```
Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '(' expecting {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT',

== SQL ==
STRUCT ( `firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY ( STRUCT ( `city`: STRING, `state`: STRING, `zip`: STRING ) ) )
-------^^^

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)
```
Program
```java
public static void main(String[] args) throws AnalysisException {
    String master = "local[*]";
    String brokers = "quickstart:9092";
    String topics = "simple_topic_6";

    SparkSession sparkSession = SparkSession
            .builder()
            .appName(EmployeeSchemaLoader.class.getName())
            .master(master)
            .getOrCreate();

    String employeeSchema = "STRUCT ( firstName: STRING, lastName: STRING, email: STRING, " +
            "addresses: ARRAY ( STRUCT ( city: STRING, state: STRING, zip: STRING ) ) ) ";

    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");
    SQLContext sqlCtx = sparkSession.sqlContext();

    Dataset<Row> employeeDataset = sparkSession.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", brokers)
            .option("subscribe", topics)
            .load();
    employeeDataset.printSchema();

    employeeDataset = employeeDataset.withColumn("strValue",
            employeeDataset.col("value").cast("string"));
    employeeDataset = employeeDataset.withColumn("employeeRecord",
            functions.from_json(employeeDataset.col("strValue"), employeeSchema, new HashMap<>()));

    employeeDataset.printSchema();
    employeeDataset.createOrReplaceTempView("employeeView");
    sparkSession.catalog().listTables().show();
    sqlCtx.sql("select * from employeeView").show();
}
```
Your question helped me find out that the variant of `from_json` with a `String`-based schema was only available in Java, and has only recently been added to the Spark API for Scala in the upcoming 2.3.0. I've long lived with the strong belief that the Spark API for Scala was always the most feature-rich, and your question helped me learn that this was not the case before the change in 2.3.0 (!)

Back to your question: you can define the string-based schema in either JSON or DDL format.
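Incidentally, the DDL route also explains your `ParseException`: Spark's DDL parser expects angle brackets around `STRUCT` and `ARRAY` element lists, not parentheses (that's exactly where the `^^^` marker points). A sketch of your schema in the DDL table-schema form, which I'd double-check against your exact Spark version:

```scala
// DDL-style schema string: top level is "name TYPE" pairs,
// nested types use angle brackets instead of parentheses
val employeeSchemaDDL = "firstName STRING, lastName STRING, email STRING, " +
  "addresses ARRAY<STRUCT<city: STRING, state: STRING, zip: STRING>>"
```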
Writing JSON by hand may be a bit cumbersome, so I'd take a different approach (which, given I'm a Scala developer, is fairly easy).

Let's first define the schema using the Spark API for Scala.
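A minimal sketch using the public `StructType` builder API (nullability defaults to `true`, which is also what `from_json` produces):

```scala
import org.apache.spark.sql.types._

val addressesSchema = new StructType()
  .add("city", StringType)
  .add("state", StringType)
  .add("zip", StringType)

val schema = new StructType()
  .add("firstName", StringType)
  .add("lastName", StringType)
  .add("email", StringType)
  .add("addresses", ArrayType(addressesSchema))

schema.printTreeString
// root
//  |-- firstName: string (nullable = true)
//  |-- lastName: string (nullable = true)
//  |-- email: string (nullable = true)
//  |-- addresses: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- city: string (nullable = true)
//  |    |    |-- state: string (nullable = true)
//  |    |    |-- zip: string (nullable = true)
```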
That seems to match your schema, doesn't it?
With that, converting the schema to a JSON-encoded string was a breeze with the `json` method:
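```scala
// json is defined on DataType, so it works on StructType too
val schemaAsJson = schema.json
```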
`schemaAsJson` is exactly your JSON string, which looks pretty...hmmm...complex. For display purposes I'd rather use the `prettyJson` method; what it prints is your schema in JSON (abbreviated below):
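```scala
println(schema.prettyJson)
// {
//   "type" : "struct",
//   "fields" : [ {
//     "name" : "firstName",
//     "type" : "string",
//     "nullable" : true,
//     "metadata" : { }
//   },
//   ...
// }
```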
You can use `DataType` to "validate" the JSON string (using `DataType.fromJson`, which Spark uses under the covers for `from_json`).
All seems fine. Mind if I check this out with a sample dataset?
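A sketch with a made-up record (run in spark-shell, so `spark.implicits._` is available; the field values are invented for the demo, and `from_json` with a `String` schema is the 2.3.0 Scala variant discussed above):

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val rawJsons = Seq(
  """{"firstName":"Jane","lastName":"Doe","email":"jane.doe@example.com","addresses":[{"city":"Springfield","state":"IL","zip":"62701"}]}"""
).toDF("rawjson")

val people = rawJsons
  .select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
  .select("json.*")                              // flatten the top-level struct
  .withColumn("address", explode($"addresses"))  // one row per address
  .drop("addresses")
  .select("firstName", "lastName", "email", "address.*")

people.show(truncate = false)
```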