How do I keep a SQLContext instance alive for the whole life cycle of a Spark Streaming application?


I used a SQLContext in a Spark Streaming application as below:

import kafka.serializer.StringDecoder
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

case class topic_name(f1: Int, f2: Int)

val sqlContext = new SQLContext(sc)
@transient val ssc = new StreamingContext(sc, new Duration(5 * 1000))
ssc.checkpoint(".")
val theDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set("topic_name"))

theDStream.map(x => x._2).foreachRDD { rdd =>
  sqlContext.jsonRDD(rdd).registerTempTable("topic_name")
  sqlContext.sql("select count(*) from topic_name").foreach { x =>
    WriteToFile("file_path", x(0).toString)
  }
}

ssc.start()
ssc.awaitTermination()

I found I could only get the message count of each 5-second batch. Because "The lifetime of this temporary table is tied to the SQLContext that was used to create this DataFrame", I guess that every 5 seconds a new temporary table is registered and lives for only that 5-second batch. I want the SQLContext and the temporary table to stay alive for the whole life cycle of the streaming application. How can I do that?

Thanks~

1 Answer

Best Answer:

You are right. A SQLContext only remembers the tables registered for the lifetime of that object, and registerTempTable only creates an in-memory alias, not durable storage. So, instead of using registerTempTable, you should probably write to persistent storage, for example a Hive table via the saveAsTable command.
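A minimal sketch of that approach, assuming a Hive metastore is available to the cluster and using a hypothetical table name `topic_name_events` (Spark 1.x API; `sc`, `theDStream`, and the streaming setup are as in the question):

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

// A HiveContext (rather than a plain SQLContext) is needed so that
// saveAsTable creates a persistent table in the Hive metastore.
val hiveContext = new HiveContext(sc)

theDStream.map(_._2).foreachRDD { rdd =>
  val df = hiveContext.jsonRDD(rdd)
  // Append each micro-batch to a persistent table instead of re-registering
  // a temp table; the table survives across batches and applications.
  df.write.mode(SaveMode.Append).saveAsTable("topic_name_events")
}

// Later (even from another application sharing the same metastore) the full
// accumulated history is queryable, not just the last 5-second batch:
hiveContext.sql("select count(*) from topic_name_events").show()
```

Note that `DataFrame.write` with `SaveMode.Append` requires Spark 1.4 or later; on 1.3 the older `df.saveAsTable("topic_name_events", SaveMode.Append)` form applies instead.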