I'm trying to implement a toy stream-stream join with Spark 2.3.0.
The join works fine when the condition matches, but the rows from the left stream are lost when the condition does not match, even though I use a left outer join.
Thanks in advance
Here are my source code and data. Basically, I'm creating two sockets: port 9999 is the right stream source and port 9998 is the left stream source.
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.expr

val spark = SparkSession
  .builder
  .appName("StreamStream")
  .master("local")
  .getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("ERROR")

// Right stream: socket on port 9999, with a watermark on its event time.
val s9999: DataFrame = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
val s9999Dataset: Dataset[S9999] = s9999
  .map(line => {
    // Each line looks like "id,yyyy-MM-dd HH:mm:ss.SSS".
    val strings = line.get(0).toString.split(",")
    val id = strings(0).toInt
    val time = Timestamp.valueOf(strings(1))
    S9999(id, time)
  })
  .withWatermark("timestamp99", "30 seconds")

// Left stream: socket on port 9998. Note that no watermark is defined here.
val s9998Dataset: Dataset[S9998] = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9998)
  .load()
  .map(line => {
    val strings = line.get(0).toString.split(",")
    val id = strings(0).toInt
    val time = Timestamp.valueOf(strings(1))
    S9998(id, time)
  })

// Left outer join on equal ids, with the left event at most 6 seconds after the right one.
val resultDataset = s9998Dataset
  .join(s9999Dataset,
    joinExprs = expr(
      """
      id99 = id98 AND
      timestamp98 >= timestamp99 AND
      timestamp98 <= timestamp99 + interval 6 seconds
      """),
    joinType = "leftOuter")

val streamingQuery = resultDataset
  .writeStream
  .outputMode("append")
  .format("console")
  .start()
streamingQuery.awaitTermination()
}
case class S9999(id99: Int, timestamp99: Timestamp)
case class S9998(id98: Int, timestamp98: Timestamp)
Sample Data:
Left socket (port 9998):
1,2011-10-02 18:50:20.123
2,2011-10-02 18:50:25.123
3,2011-10-02 18:50:30.123
4,2011-10-02 18:50:35.123
5,2011-10-02 18:50:40.123
6,2011-10-02 18:50:45.123
7,2011-10-02 18:50:50.123
8,2011-10-02 18:50:55.123
9,2011-10-02 18:51:00.123
10,2011-10-02 18:51:05.123
11,2011-10-02 18:51:10.123
12,2011-10-02 18:51:15.123
13,2011-10-02 18:51:20.123
14,2011-10-02 18:51:25.123
15,2011-10-02 18:51:30.123
Right socket (port 9999):
1,2011-10-02 18:50:20.123
3,2011-10-02 18:50:30.123
7,2011-10-02 18:50:50.123
8,2011-10-02 18:50:55.123
9,2011-10-02 18:51:00.123
13,2011-10-02 18:51:20.123
14,2011-10-02 18:51:25.123
15,2011-10-02 18:51:30.123
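For reference, here is a minimal plain-Scala sketch (no Spark involved) of what a left outer join with the condition above would be expected to yield on this sample data. It assumes batch semantics, so watermarks and arrival order are ignored, and `Event`, `matches`, and `leftOuter` are hypothetical helper names introduced only for this illustration.

```scala
import java.sql.Timestamp

object ExpectedLeftOuter {
  final case class Event(id: Int, ts: Timestamp)

  // Same predicate as the streaming join condition:
  // id99 = id98 AND timestamp98 >= timestamp99 AND timestamp98 <= timestamp99 + interval 6 seconds
  def matches(left: Event, right: Event): Boolean = {
    val deltaMs = left.ts.getTime - right.ts.getTime
    left.id == right.id && deltaMs >= 0L && deltaMs <= 6000L
  }

  // Left outer join: every left event is kept; None stands in for the NULL right side.
  def leftOuter(left: Seq[Event], right: Seq[Event]): Seq[(Event, Option[Event])] =
    left.flatMap { l =>
      val hits = right.filter(r => matches(l, r))
      if (hits.isEmpty) Seq((l, Option.empty[Event])) else hits.map(r => (l, Option(r)))
    }

  // Rebuild the sample data: left ids 1..15, five seconds apart, starting at 18:50:20.123.
  private val start = Timestamp.valueOf("2011-10-02 18:50:20.123").getTime
  val leftEvents: Seq[Event] =
    (1 to 15).map(i => Event(i, new Timestamp(start + (i - 1) * 5000L)))

  // The right stream carries a subset of those ids with identical timestamps.
  val rightIds: Seq[Int] = Seq(1, 3, 7, 8, 9, 13, 14, 15)
  val rightEvents: Seq[Event] = leftEvents.filter(e => rightIds.contains(e.id))

  val joined: Seq[(Event, Option[Event])] = leftOuter(leftEvents, rightEvents)

  // Left ids that should come out with a NULL right side.
  val unmatchedIds: Seq[Int] = joined.collect { case (l, None) => l.id }

  def main(args: Array[String]): Unit =
    println(unmatchedIds.mkString(", ")) // prints "2, 4, 5, 6, 10, 11, 12"
}
```

So under these assumptions the join should produce 15 rows, with ids 2, 4, 5, 6, 10, 11 and 12 carrying nulls on the right side. Those are the rows that go missing in the streaming output.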
After spending 6 hours on this question, I found that the supposedly optional watermark on the left side is actually mandatory.
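A minimal sketch of that fix, assuming the same sockets, case classes, and join condition as in the question. The only substantive change is the `withWatermark("timestamp98", ...)` call on the left stream. The 30-second delay simply mirrors the right side and is an assumption, and `lines` and `StreamStreamLeftOuter` are hypothetical names used to keep the example short.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.expr

case class S9999(id99: Int, timestamp99: Timestamp)
case class S9998(id98: Int, timestamp98: Timestamp)

object StreamStreamLeftOuter {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("StreamStream")
      .master("local")
      .getOrCreate()
    import spark.implicits._
    spark.sparkContext.setLogLevel("ERROR")

    // Hypothetical helper: read raw "id,timestamp" lines from a local socket.
    def lines(port: Long): Dataset[String] =
      spark.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", port)
        .load()
        .as[String]

    // Right stream, watermarked as in the question.
    val right: Dataset[S9999] = lines(9999L)
      .map { line =>
        val Array(id, ts) = line.split(",", 2)
        S9999(id.toInt, Timestamp.valueOf(ts))
      }
      .withWatermark("timestamp99", "30 seconds")

    // Left stream, now watermarked too (delay chosen to mirror the right side).
    val left: Dataset[S9998] = lines(9998L)
      .map { line =>
        val Array(id, ts) = line.split(",", 2)
        S9998(id.toInt, Timestamp.valueOf(ts))
      }
      .withWatermark("timestamp98", "30 seconds")

    val joined = left.join(
      right,
      expr(
        """
        id99 = id98 AND
        timestamp98 >= timestamp99 AND
        timestamp98 <= timestamp99 + interval 6 seconds
        """),
      "leftOuter")

    joined.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

Even with both watermarks in place, the rows with nulls are not expected to show up immediately. As far as I understand the Structured Streaming guide, outer results are emitted only after the watermark has moved past the point where a match could still arrive, so later data has to keep flowing on the sockets before the unmatched ids are printed.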