Scio/Apache Beam: how to map grouped results


I have a simple pipeline that reads from Pub/Sub within a fixed window, parses the messages, and groups them by a specific property. However, if I map after the groupBy, my function doesn't seem to get executed.

Am I missing something?

sc.pubsubSubscription[String](s"projects/$project/subscriptions/$subscription")
  .withFixedWindows(Duration.standardSeconds(windowSeconds))
  .map(parseMessage)
  .groupBy(_.ip_address)
  .map(entry => log.info(s"${entry._1} was repeated ${entry._2.size} times"))

There is 1 best solution below

I was able to reproduce the issue with the DirectRunner and a simple pipeline that reads from Pub/Sub, uses the first word of each message as the key, applies a GroupByKey, and then logs the entries. It seems that the GroupByKey (GBK) step waits for all the data to arrive and, because the source is unbounded, never emits a result. What worked for me was to define a windowing strategy with an explicit trigger, such as:

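import com.spotify.scio.ContextAndArgs
import com.spotify.scio.values.WindowOptions
import org.apache.beam.sdk.transforms.windowing.{AfterProcessingTime, Repeatedly}
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode
import org.joda.time.Duration
import org.slf4j.LoggerFactory

object PubSubTest {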
  private lazy val log = LoggerFactory.getLogger(this.getClass)

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    val defaultInputSub = "test_sub"
    val subscription = args.getOrElse("input", defaultInputSub)
    val project = "PROJECT_ID"

    sc.pubsubSubscription[String](s"projects/$project/subscriptions/$subscription")
      // provide window options including triggering
      .withFixedWindows(duration = Duration.standardSeconds(10), options = WindowOptions(
        trigger = Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
          .plusDelayOf(Duration.standardSeconds(2))),
        accumulationMode = AccumulationMode.ACCUMULATING_FIRED_PANES,
        closingBehavior = ClosingBehavior.FIRE_IF_NON_EMPTY,
        allowedLateness = Duration.standardSeconds(0))
      )
      // use first word of the Pub/Sub message as the key
      .keyBy(a => a.split(" ")(0))
      .groupByKey
      .map(entry => log.info(s"${entry._1} was repeated ${entry._2.size} times"))

    val result = sc.close().waitUntilFinish()
  }
}
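
One thing to keep in mind with this configuration: because the trigger fires repeatedly and the accumulation mode is ACCUMULATING_FIRED_PANES, a key that keeps receiving messages within the same window can be logged more than once, each time with the running total so far. The plain-Scala sketch below does not use Beam at all; it is only a simplified model of that behavior, and `PaneModes`, `firePanes`, and the batch layout are hypothetical names made up for illustration. It assumes each inner `Seq` holds the keys that arrived before one trigger firing, and that a key only fires when it received at least one new element.

```scala
object PaneModes {
  // Simplified model, not Beam code: each inner Seq is the batch of keys
  // that arrived between two trigger firings within a single window.
  // Returns, for every firing, the count reported per key that fired.
  def firePanes(batches: Seq[Seq[String]], accumulating: Boolean): Seq[Map[String, Int]] = {
    val start = (Seq.empty[String], Vector.empty[Map[String, Int]])
    val (_, panes) = batches.foldLeft(start) { case ((seen, out), batch) =>
      val all = seen ++ batch
      // Accumulating panes report everything seen so far in the window;
      // discarding panes report only what arrived since the last firing.
      val source = if (accumulating) all else batch
      // Only keys with new elements in this batch fire.
      val counts = batch.distinct.map(k => k -> source.count(_ == k)).toMap
      (all, out :+ counts)
    }
    panes
  }
}
```

With batches `Seq(Seq("a", "b"), Seq("a"))`, the accumulating model reports `a` twice, first with count 1 and then with count 2, while the discarding model reports count 1 both times. If one log line per key per firing with only the new elements is preferred, AccumulationMode.DISCARDING_FIRED_PANES is the other available mode.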