I'm trying to write data from a single source to multiple data sinks (Mongo and Postgres DBs). Incoming data:
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();
Dataset<Row> personalDetails = df.selectExpr("name", "id", "age");
personalDetails.writeStream()
.outputMode(OutputMode.Update())
.foreachBatch((dataframe, batchId) -> {
dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
.option("uri", "mongodb://localhost/employee")
.option("database", "employee")
.option("collection", "PI").save();
}).start();
Dataset<Row> salDetails = df.selectExpr("basicSal", "bonus");
salDetails.writeStream()
.outputMode(OutputMode.Update())
.foreachBatch((dataframe, batchId) -> {
dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
.option("uri", "mongodb://localhost/employee")
.option("database", "employee")
.option("collection", "SAL").save();
}).start();
The problem with this is that Spark opens two streams and reads the same events twice. Is it possible to read once, apply different transformations, and write to different collections?
You should cache the DataFrame. The foreachBatch section of the Structured Streaming programming guide covers exactly this: writing a single micro-batch to multiple locations by persisting it first.
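As a minimal Java sketch of that pattern (the stream variable and the parquet output paths below are made-up placeholders, not from the guide or your code), the micro-batch is persisted once and then written to each location:

stream.writeStream()
    .foreachBatch((batchDf, batchId) -> {
        batchDf.persist();                                                // compute the micro-batch once
        batchDf.write().mode(SaveMode.Append).parquet("/tmp/location1");  // sink 1
        batchDf.write().mode(SaveMode.Append).parquet("/tmp/location2");  // sink 2
        batchDf.unpersist();
    })
    .start();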
You can put all of your code in one foreachBatch and write the dataframe to your two sinks. You can do that by caching the dataframe, performing each selectExpr on the cached dataframe, and saving the result; see the sketch at the end of this answer.

As a side note, if you want "all or nothing" semantics (i.e. you don't want a situation where you wrote to Mongo but not to Postgres), you must use a single foreachBatch anyway, because with two foreachBatch calls (as in your question) you have two independent batches: one might fail while the other succeeds for the same data.
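Applied to your code, a sketch could look like this (it reuses the spark session, Kafka source, and Mongo options from your snippet; the Postgres write is only indicated as a comment because its connection details aren't in the question):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.streaming.OutputMode;

Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "topic1")
        .load();

df.writeStream()
        .outputMode(OutputMode.Update())
        .foreachBatch((batchDf, batchId) -> {
            // Materialize the micro-batch once so the two writes below don't re-read Kafka.
            batchDf.persist();

            // Personal details -> Mongo collection "PI"
            batchDf.selectExpr("name", "id", "age")
                    .write()
                    .format("com.mongodb.spark.sql.DefaultSource")
                    .mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "PI")
                    .save();

            // Salary details -> Mongo collection "SAL"
            batchDf.selectExpr("basicSal", "bonus")
                    .write()
                    .format("com.mongodb.spark.sql.DefaultSource")
                    .mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "SAL")
                    .save();

            // A Postgres write (format("jdbc") with url/dbtable/user/password options)
            // would go here, inside the same batch.

            batchDf.unpersist();
        })
        .start();

This reads the Kafka topic once per micro-batch and fans the cached batch out to both collections.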