Is it possible to write two disjoint Datasets to data sinks using foreachBatch in Spark Structured Streaming?


I'm trying to write data from a single source to multiple data sinks (Mongo and Postgres DBs). Incoming data:

Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "topic1")
        .load();

Dataset<Row> personalDetails = df.selectExpr("name", "id", "age");


personalDetails.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch((dataframe, batchId) -> {
            dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "PI").save();
    }).start();

Dataset<Row> salDetails = df.selectExpr("basicSal", "bonus");
salDetails.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch((dataframe, batchId) -> {
            dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "SAL").save();
    }).start();

The problem with this is that Spark opens two streams and reads the same events twice. Is it possible to read once, apply different transformations, and write to different collections?


There are 2 answers below.

Accepted answer (7 votes):

You should cache the DataFrame. From the Structured Streaming programming guide:

Write to multiple locations - If you want to write the output of a streaming query to multiple locations, then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it.

And the guide's example (in Scala):

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

You can put all of your code in one foreachBatch and write the dataframe to your two sinks: cache the batch dataframe, perform each selectExpr on the cached dataframe, and save the results.
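A minimal Java sketch of that combined approach, reusing the df and the Mongo options from the question (a Postgres write via the jdbc format would go into the same block):

df.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch((dataframe, batchId) -> {
            // Cache the micro-batch so it is not recomputed for each sink
            dataframe.persist();

            // Sink 1: personal details to the "PI" collection
            dataframe.selectExpr("name", "id", "age")
                    .write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "PI").save();

            // Sink 2: salary details to the "SAL" collection
            dataframe.selectExpr("basicSal", "bonus")
                    .write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "SAL").save();

            dataframe.unpersist();
    }).start();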

As a side note, if you want "all or nothing" semantics (i.e. you don't want a situation where you wrote to Mongo but not to Postgres), you must use a single foreachBatch. Otherwise (with two foreachBatch calls, as in your question) you have two independent batches: one might fail while the other succeeds for the same data.

Second answer:

Finally, I was able to solve this with Spark 3.1.1 and the Structured Streaming Table API.

Since Spark 3.1, you can also use DataStreamReader.table() to read tables as streaming DataFrames and use DataStreamWriter.toTable() to write streaming DataFrames as tables:

    // Stream to myTable
    df.writeStream()
      .option("checkpointLocation", "/tmp/test")
      .toTable("myTable");
    // Stream from myTable
    Dataset<Row> tableDf = spark.readStream()
                                .table("myTable");

Refer: Spark 3.1 Table API

With this I could solve the problem of multiple reads from the Kafka source: only one Kafka consumer is created, and it streams the data into a table. Multiple streams then read from that table and apply additional transformations on top of the table dataset. The complete example looks as follows:

Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "topic1")
        .load();

// Stream to myTable
df.writeStream().option("checkpointLocation", "c:/temp/test").toTable("myTable");

// Stream from myTable
Dataset<Row> tableDf = spark.readStream().table("myTable");

Dataset<Row> personalDetails = tableDf.selectExpr("name", "id", "age");


personalDetails.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch((dataframe, batchId) -> {
            dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "PI").save();
    }).start();

Dataset<Row> salDetails = tableDf.selectExpr("basicSal", "bonus");
salDetails.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch((dataframe, batchId) -> {
            dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "SAL").save();
    }).start();
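
One practical note: each streaming query tracks its own progress, so if you want the two foreachBatch queries above to survive a restart, give each writer its own checkpointLocation, just as the toTable() write has one. A sketch for the first query (the directory is a placeholder; the second query would need a different one, e.g. c:/temp/chk-sal):

personalDetails.writeStream()
    .outputMode(OutputMode.Update())
    .option("checkpointLocation", "c:/temp/chk-pi")   // placeholder path; must be unique per query
    .foreachBatch((dataframe, batchId) -> {
            dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "PI").save();
    }).start();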