I have a simple pipeline that reads from Pub/Sub within a fixed window, parses the messages, and groups them by a specific property. However, if I map after the groupBy, my function doesn't seem to get executed.
Am I missing something?
sc.pubsubSubscription[String](s"projects/$project/subscriptions/$subscription")
  .withFixedWindows(Duration.standardSeconds(windowSeconds))
  .map(parseMessage)
  .groupBy(_.ip_address)
  .map(entry => log.info(s"${entry._1} was repeated ${entry._2.size} times"))
I was able to reproduce the issue with the DirectRunner and a simple pipeline that reads from Pub/Sub, uses the first word of the message as the key, applies the GroupByKey, and then logs the entries.
It seems that the GBK step waits for all the data to arrive and, because the source is unbounded, never emits any result. What worked for me was to define a windowing strategy with triggering, such as: