How to read streaming data in XML format from Kafka?


I am trying to read XML data from a Kafka topic using Spark Structured Streaming.

I tried the Databricks spark-xml package, but I got an error saying that the package does not support streamed reading. Is there any way I can extract XML data from a Kafka topic using Structured Streaming?

My current code:

df = spark \
      .readStream \
      .format("kafka") \
      .format('com.databricks.spark.xml') \
      .options(rowTag="MainElement")\
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option(subscribeType, "test") \
      .load()

The error:

py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
: java.lang.UnsupportedOperationException: Data source com.databricks.spark.xml does not support streamed reading
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)

5 Answers

Accepted Answer:
.format("kafka") \
.format('com.databricks.spark.xml') \

The last one with com.databricks.spark.xml wins and becomes the streaming source (hiding Kafka as the source).

In other words, the above is equivalent to .format('com.databricks.spark.xml') alone.

As you may have experienced, the Databricks spark-xml package does not support streamed reading, i.e. it cannot act as a streaming source. The package is batch-only.

Is there any way I can extract XML data from Kafka topic using structured streaming?

You are left with accessing and processing the XML yourself with a standard function or a UDF. There's no built-in support for streaming XML processing in Structured Streaming up to Spark 2.2.0.

That should not be a big deal anyway. The Scala code could look as follows.

import org.apache.spark.sql.functions.udf
import spark.implicits._ // for the 'value symbol syntax

val input = spark.
  readStream.
  format("kafka").
  ...
  load

// the Kafka value column is binary, so cast it to a string first
val values = input.select('value cast "string")

// plug your own XML extraction logic in here
val extractValuesFromXML = udf { (xml: String) => ??? }
val numbersFromXML = values.withColumn("number", extractValuesFromXML('value))

// print XMLs and numbers to the stdout
val q = numbersFromXML.
  writeStream.
  format("console").
  start
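
For instance, a minimal sketch of such a UDF using scala.xml, assuming the payload carries a top-level <number> element (the element name is made up for illustration):

import org.apache.spark.sql.functions.udf
import scala.xml.XML

// sketch only: parse the XML payload and pull out a hypothetical <number> element
val extractValuesFromXML = udf { (xml: String) =>
  val doc = XML.loadString(xml) // throws on malformed input; add error handling as needed
  (doc \\ "number").headOption.map(_.text)
}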

Another possible solution could be to write your own custom streaming Source that would deal with the XML format in def getBatch(start: Option[Offset], end: Offset): DataFrame. That is supposed to work.
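
For reference, this is roughly the shape of the (internal) Source API in Spark 2.x; the class name and all logic below are placeholders, not a working implementation:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.types.StructType

// skeleton of a custom streaming source; the actual Kafka reading and XML parsing are left out
class XmlKafkaSource extends Source {
  override def schema: StructType = ???        // schema of the parsed XML rows
  override def getOffset: Option[Offset] = ??? // latest offset available upstream
  override def getBatch(start: Option[Offset], end: Offset): DataFrame =
    ???                                        // fetch the offset range and parse the XML here
  override def stop(): Unit = ()
}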

Answer 2:

You cannot mix formats this way. The Kafka source loads records as Rows with a fixed set of columns, such as key, value and topic, with the value column storing the payload as a binary type:

Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:

...

value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.

Parsing this content is the user's responsibility and cannot be delegated to other data sources. See for example my answer to How to read records in JSON format from Kafka using Structured Streaming?

For XML you'll likely need a UDF (UserDefinedFunction), although you can try the Hive XPath functions first. Either way, you have to decode the binary value.
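
For instance, a minimal sketch of both steps, assuming a Kafka source DataFrame named input (the XPath expression and element names are made up for illustration):

// cast the binary Kafka value to a string, then extract a field with a Hive XPath function
val parsed = input
  .selectExpr("CAST(value AS STRING) AS xml")
  .selectExpr("xpath_string(xml, '/MainElement/id/text()') AS id") // hypothetical path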

Answer 3:
import xml.etree.ElementTree as ET

df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "test") \
      .load()

Then I wrote a Python UDF:

def parse(s):
    xml = ET.fromstring(s)
    ns = {'real_person': 'http://people.example.com',
          'role': 'http://characters.example.com'}

    # the element names here must match your actual XML payload
    actor, role = "", ""
    actor_el = xml.find('real_person:actor', ns)
    if actor_el is not None:
        actor = actor_el.text or ""
    role_el = xml.find('real_person:role', ns)
    if role_el is not None:
        role = role_el.text or ""
    return actor + "|" + role

Register this UDF

from pyspark.sql.functions import udf, col, split

extractValuesFromXML = udf(parse)

# the Kafka value column is binary, so cast it to a string before parsing
XML_DF = df.withColumn("mergedCol", extractValuesFromXML(col("value").cast("string")))

AllCol_DF = XML_DF.withColumn("actorName", split(col("mergedCol"), "\\|").getItem(0)) \
    .withColumn("Role", split(col("mergedCol"), "\\|").getItem(1))
Answer 4:

You can use the SQL built-in functions xpath and the like to extract data from a nested XML structure that comes as the value of a Kafka message.

Given a nested XML like

<CofiResults>
  <ExecutionTime>20201103153839</ExecutionTime>
  <FilterClass>S</FilterClass>
  <InputData>
    <Finance>
      <HeaderSegment>
        <Version>6</Version>
        <SequenceNb>1</SequenceNb>
      </HeaderSegment>
    </Finance>
  </InputData>
</CofiResults>

you can then just use those SQL functions in your selectExpr statement as below:

spark.readStream.format("kafka").options(...).load()
  .selectExpr("CAST(value AS STRING) as value")
  .selectExpr(
    "xpath(value, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsArryString",
    "xpath_long(value, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsLong",
    "xpath_string(value, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsString",
    "xpath_int(value, '/CofiResults/InputData/Finance/HeaderSegment/Version/text()') as VersionAsInt")

Remember that the xpath function will return an Array of Strings whereas you may find it more convenient to extract the value as String or even Long. Applying the code above in Spark 3.0.1 with a console sink stream will result in:

+-------------------------+-------------------+---------------------+------------+
|ExecutionTimeAsArryString|ExecutionTimeAsLong|ExecutionTimeAsString|VersionAsInt|
+-------------------------+-------------------+---------------------+------------+
|[20201103153839]         |20201103153839     |20201103153839       |6           |
+-------------------------+-------------------+---------------------+------------+
Answer 5:

Looks like the above approach works, but it is not using the passed schema to parse the XML document.

If you print the relation schema, it is always:

INFO  XmlToAvroConverter - .convert() : XmlRelation Schema ={} root
 |-- fields: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- nullable: boolean (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- type: string (nullable = true)

For example, I am streaming the following XML documents from a Kafka topic:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<Book>
<Author>John Doe</Author>
<Title>Test</Title>
<PublishedDate></PublishedDate>
</Book>

And here is the code I have to parse the XML into a DataFrame:

import com.databricks.spark.xml.XmlRelation
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// kafkaDF is the streaming DataFrame obtained from spark.readStream.format("kafka")...load()
val kafkaValueAsStringDF = kafkaDF.selectExpr(
  "CAST(key AS STRING) msgKey",
  "CAST(value AS STRING) xmlString")

val parameters = collection.mutable.Map.empty[String, String]
parameters.put("rowTag", "Book")

kafkaValueAsStringDF.writeStream.foreachBatch {
  (batchDF: DataFrame, batchId: Long) =>

    val xmlStringDF: DataFrame = batchDF.selectExpr("xmlString")
    xmlStringDF.printSchema()

    val rdd: RDD[String] = xmlStringDF.as[String].rdd

    // xmlSchema is the schema passed in by the caller
    val relation = XmlRelation(
      () => rdd,
      None,
      parameters.toMap,
      xmlSchema)(spark.sqlContext)

    logger.info(".convert() : XmlRelation Schema ={} " + relation.schema.treeString)
}
  .start()
  .awaitTermination()

When I read the same XML documents from the file system or S3 and use spark-xml, it parses the schema as expected.
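
For comparison, a minimal sketch of the batch read that does honor the passed schema (the path is hypothetical):

val booksDF = spark.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "Book")
  .schema(xmlSchema) // the same schema that XmlRelation ignores above is applied here
  .load("s3a://some-bucket/books/") // hypothetical location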

Thanks, Sateesh