I work with Apache Spark 2.2.0.
I'd like to know how many events were late in a streaming batch in Structured Streaming, i.e. how many were dropped because they arrived after the watermark. Is there a way to find out the number of late events or, better yet, exactly which events were late?
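The closest I have found so far is the per-batch query progress, which reports the current watermark and the state store's row counts, but (as far as I can tell) no count of dropped rows. A minimal monitoring sketch with a StreamingQueryListener (the printed labels are just mine):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
    // eventTime holds "watermark" (plus "min"/"max"/"avg") once a watermark is defined
    println(s"batch=${p.batchId} eventTime=${p.eventTime}")
    // state metrics show rows kept and updated, but not rows dropped as late
    p.stateOperators.foreach(op =>
      println(s"  numRowsTotal=${op.numRowsTotal} numRowsUpdated=${op.numRowsUpdated}"))
  }
})

That tells me how far the watermark has advanced, but not which rows fell behind it.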
I use the following example to explore watermarks and late events.
import org.apache.spark.sql.functions._
import spark.implicits._ // <-- for the 'value and $"..." column syntax

val valuesPerDevice = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  withColumn("tokens", split('value cast "string", ",")). // <-- Kafka's value is binary
  withColumn("seconds", 'tokens(0) cast "long").
  withColumn("event_time", to_timestamp(from_unixtime('seconds))). // <-- Event time has to be a timestamp
  withColumn("device", 'tokens(1)).
  withColumn("level", 'tokens(2) cast "int").
  withWatermark(eventTime = "event_time", delayThreshold = "10 seconds"). // <-- define watermark (before groupBy!)
  groupBy($"event_time"). // <-- use event_time for grouping
  agg(collect_list("level") as "levels", collect_list("device") as "devices") // <-- event_time is already a timestamp here
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = valuesPerDevice.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(5.seconds)).
  outputMode(OutputMode.Append). // <-- Append output mode
  start
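After a few triggers I can also poll the query handle directly; again it only shows how far the watermark has moved, not what was dropped (a sketch; lastProgress is null before the first batch completes):

// Poll the running query between triggers
Option(sq.lastProgress).foreach { p =>
  println(p.eventTime) // contains "watermark" once withWatermark is in place
}

So: is the number (or the content) of the late events exposed anywhere, or do I have to compute it myself?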