How to automatically update the Hive external table metadata partitions for streaming data

I am writing Spark streaming data into HDFS partitions using PySpark. Please find the code below:

  data = (spark.readStream
    .format("json")
    .schema(fileSchema)
    .load(inputDirectoryOfJsonFiles))

  output = (data.writeStream
    .format("parquet")
    .partitionBy("date")
    .option("compression", "none")
    .option("path", "/user/hdfs/stream-test")
    .option("checkpointLocation", "/user/hdfs/stream-ckp")
    .outputMode("append")
    .start()
    .awaitTermination())

After writing the data into HDFS, I am creating the Hive external partitioned table:

CREATE EXTERNAL TABLE test (id string,record string) 
PARTITIONED BY (`date` date) 
STORED AS PARQUET 
LOCATION '/user/hdfs/stream-test/'
TBLPROPERTIES ('discover.partitions' = 'true');

But the newly created partitions are not being recognized by the Hive metastore. I am updating the metastore manually with the MSCK command:

MSCK REPAIR TABLE test SYNC PARTITIONS;

For streaming data, how can I automate this task of updating the Hive metastore with the new partitions in real time?

Please suggest a solution to this problem.

There is 1 answer below.

Answered by AudioBubble:

Spark Structured Streaming doesn't natively support this, but you can use foreachBatch as a workaround:

import org.apache.spark.sql.{DataFrame, SaveMode}

// Read the stream (Kafka is just an example source here)
val yourStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "your_topic")
  .load()

// For every micro-batch, insert through the Hive metastore so that
// new partitions are registered automatically
val query = yourStream.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF
      .write
      .mode(SaveMode.Append)
      .insertInto("your_db.your_hive_table")
  }
  .start()

query.awaitTermination()
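
If you would rather keep writing plain Parquet files to the HDFS path from the question instead of going through insertInto, the same foreachBatch hook can refresh the metastore once per micro-batch. Below is a minimal, untested PySpark sketch of that idea: it reuses the spark session and the data stream defined in the question, the function name write_and_refresh is my own, and it assumes the session was created with Hive support enabled so that MSCK REPAIR TABLE can reach the metastore.

def write_and_refresh(batch_df, batch_id):
    # Write this micro-batch as date-partitioned Parquet under the table location
    (batch_df.write
        .mode("append")
        .partitionBy("date")
        .option("compression", "none")
        .parquet("/user/hdfs/stream-test"))
    # Register any partitions this batch created; plain MSCK only adds new
    # partitions. For large tables, a targeted ALTER TABLE test ADD IF NOT
    # EXISTS PARTITION for just the new date values is cheaper.
    spark.sql("MSCK REPAIR TABLE test")

query = (data.writeStream
    .option("checkpointLocation", "/user/hdfs/stream-ckp")
    .outputMode("append")
    .foreachBatch(write_and_refresh)
    .start())

query.awaitTermination()

Running MSCK on every micro-batch is a trade-off: it keeps the metastore in sync automatically, but it rescans the table location each time, so on tables with many partitions the targeted ALTER TABLE approach scales better.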

For more details, refer to https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch