Beam/Dataflow stateful processing, ParDo never runs


I'm trying to use Beam's stateful processing on Dataflow, but every time I try to output data I get these messages in the log, and nothing is output from the stateful ParDo+DoFn:

    16:45:56.948 CEST Proposing dynamic split of work unit myproject;2020-03-31_07_34_07-7523868393961495218;8536385410242733529 at {"fractionConsumed":0.5}
    16:45:56.948 CEST Rejecting split request because custom reader returned null residual source.

Edit: This seems to be coincidental. It seems like the stateful ParDo doesn't output any elements until the window fires. Is this correct?

This example replicates the error with Scio's .batchByKey (it's using stateful processing under the hood):

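    import com.spotify.scio._
    import com.spotify.scio.coders.{Coder, CoderMaterializer}
    import org.apache.beam.sdk.transforms.{Create, ParDo}

    // Single-element bounded input: the Unit value ()
    val create = Create.of(()).withCoder(CoderMaterializer.beam(sc, Coder[Unit]))
    sc.customInput("Unit input", create)
      .map(_ => println("STARTING"))
      .applyTransform(ParDo.of(new Increasing)) // Outputs infinite stream of increasing numbers, one per second, prints each number to stdout
      .keyBy(_ => 1) // Same key for every element; keyBy(1 -> _) would expand to x => (1, x) and give each element its own key
      .batchByKey(5) // Stateful batching: emits once 5 values for a key are buffered (or when the window ends)
      .map {
        case (key, vs) => vs.foreach(v => println(s"GOT batch with $v"))
      }
    sc.run()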

The final .map, which is just a ParDo+DoFn with a single output, never runs.

In the output I see five lines of increasing numbers (from new Increasing) followed by the two messages above. This repeats.
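To illustrate the timing question from the edit, here is a minimal in-memory sketch of per-key batching. It assumes `batchByKey` behaves like Beam's `GroupIntoBatches` (which, as far as I know, is what Scio wraps): values are buffered per key, a batch is emitted as soon as it reaches the batch size, and whatever is still buffered is flushed only when the window ends. `ToyBatcher`, `process` and `onWindowEnd` are hypothetical names; this is plain Scala, not Beam code, and it ignores timers, state backends and triggers.

```scala
import scala.collection.mutable

// Hypothetical toy model (not Beam code) of GroupIntoBatches-style batching:
// values are buffered per key; a batch is emitted when it reaches batchSize,
// and partially filled batches are flushed only at the end of the window.
final class ToyBatcher[K, V](batchSize: Int) {
  private val buffers = mutable.LinkedHashMap.empty[K, mutable.ArrayBuffer[V]]

  // Handle one element; returns a full batch if this element completed one.
  def process(key: K, value: V): Option[(K, List[V])] = {
    val buf = buffers.getOrElseUpdate(key, mutable.ArrayBuffer.empty[V])
    buf += value
    if (buf.size >= batchSize) {
      buffers.remove(key)
      Some(key -> buf.toList)
    } else None
  }

  // End-of-window flush: emit every partially filled batch and reset state.
  def onWindowEnd(): List[(K, List[V])] = {
    val out = buffers.toList.map { case (k, buf) => k -> buf.toList }
    buffers.clear()
    out
  }
}

object ToyBatcherDemo {
  def main(args: Array[String]): Unit = {
    val numbers = (1L to 12L).toList

    // Constant key: every fifth element completes a batch.
    val constant = new ToyBatcher[Int, Long](5)
    println(numbers.flatMap(n => constant.process(1, n)))
    // List((1,List(1, 2, 3, 4, 5)), (1,List(6, 7, 8, 9, 10)))

    // A distinct key per element: no buffer ever reaches 5 values.
    val distinct = new ToyBatcher[(Int, Long), Long](5)
    println(numbers.flatMap(n => distinct.process((1, n), n)))
    // List()
  }
}
```

With a constant key, a batch appears after every fifth element and the leftovers (11 and 12) wait for the window to end. When every element carries its own key (in Scala, `keyBy(1 -> _)` expands to `x => (1, x)`, so the key is the whole pair), no buffer ever fills and nothing is emitted until the window ends.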

Does anyone know what the error may be? This seems to be the source: apache/beam/../WorkerCustomSources.java#L698

Answer:

That message simply means that a source you are using in your pipeline cannot be dynamically split. So it's not directly related to the progress of your pipeline, though it might be related in this case. The only source I see you using is Create, and your issue could potentially be due to your Create being initialized with an empty tuple (`()`). You can try seeding your pipeline with a Create that has one or more elements.
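A toy model may make "cannot be split" more concrete. When Dataflow proposes a dynamic split at some fraction (here `fractionConsumed: 0.5`), the reader either hands off the unread tail of its range as a residual source or refuses; the null residual in the log is the refusal. The sketch below is plain Scala, not Beam's API: `ToyRangeReader`, `advance` and `splitAtFraction` are hypothetical names loosely mirroring Beam's `BoundedSource.BoundedReader#splitAtFraction`, which returns `null` when a split is unsuccessful. A one-element source has nothing left to hand off, so its split is always rejected.

```scala
// Hypothetical toy model (not Beam's API) of dynamic work rebalancing:
// a reader over the index range [start, end) that has consumed
// every index before `position`.
final class ToyRangeReader(val start: Int, private var end: Int) {
  private var position: Int = start

  def currentEnd: Int = end

  // Read the next index, if any remain.
  def advance(): Option[Int] =
    if (position < end) { position += 1; Some(position - 1) } else None

  // Try to split at `fraction` of the range. On success the reader keeps
  // [start, splitPos) and returns the residual [splitPos, end); otherwise it
  // returns None, the analogue of a "null residual source".
  def splitAtFraction(fraction: Double): Option[(Int, Int)] = {
    val splitPos = start + math.ceil((end - start) * fraction).toInt
    if (splitPos <= position || splitPos >= end) None
    else {
      val residual = (splitPos, end)
      end = splitPos
      Some(residual)
    }
  }
}

object ToyRangeReaderDemo {
  def main(args: Array[String]): Unit = {
    // A one-element source, like a Create with a single element.
    val single = new ToyRangeReader(0, 1)
    single.advance()
    println(single.splitAtFraction(0.5)) // None

    // A ten-element source that has read two elements.
    val ten = new ToyRangeReader(0, 10)
    ten.advance(); ten.advance()
    println(ten.splitAtFraction(0.5)) // Some((5,10)); reader keeps [0, 5)
  }
}
```

The rejection is harmless in itself: the worker simply keeps processing the whole (tiny) range on its own.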