How to limit PCollection in Apache Beam as soon as possible?


I'm using Apache Beam 2.28.0 on Google Cloud Dataflow (with the Scio SDK). I have a large, bounded input PCollection and I want to limit / sample it to a fixed number of elements, but I want to start the downstream processing as soon as possible.

Currently, when my input PCollection has e.g. 20M elements and I want to limit it to 1M using Sample.any (https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/transforms/Sample.html#any-long-):

input.apply(Sample.<String>any(1000000))

it waits until all of the 20M elements are read, which takes a long time.

How can I efficiently limit the number of elements to a fixed size and start downstream processing as soon as the limit is reached, discarding the rest of the input?

1 Answer (accepted)

OK, so my initial solution is to use a stateful DoFn like this (I'm using Scio's Scala SDK, as mentioned in the question):

import java.lang.{Long => JLong}

import org.apache.beam.sdk.state.{StateSpec, StateSpecs, ValueState}
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, StateId}
import org.apache.beam.sdk.values.KV

class MyLimitFn[T](limit: Long) extends DoFn[KV[String, T], KV[String, T]] {
  // Per-key counter of how many elements have already been emitted.
  @StateId("count") private val count: StateSpec[ValueState[JLong]] = StateSpecs.value[JLong]()

  @ProcessElement
  def processElement(context: DoFn[KV[String, T], KV[String, T]]#ProcessContext, @StateId("count") count: ValueState[JLong]): Unit = {
    // The state is empty (reads as null) until the first element for this key arrives.
    val current = Option(count.read()).map(_.longValue).getOrElse(0L)
    if (current < limit) {
      count.write(current + 1L)
      context.output(context.element())
    }
    // Elements beyond the limit are simply dropped.
  }
}

The downside of this solution is that I need to synthetically assign the same key (e.g. an empty string) to every element before using it, since per-key state requires keyed input. Even so, it's much faster than Sample.any().
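For reference, a minimal usage sketch (the SCollection name `input`, the step names, and the empty-string key are illustrative; `applyTransform` is Scio's bridge to a raw Beam ParDo, and you may need to set the KV coder explicitly if inference fails):

```scala
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.values.KV

// input: SCollection[String] with ~20M elements (name is illustrative)
val limited: SCollection[String] = input
  .map(v => KV.of("", v))                                     // synthetic constant key
  .applyTransform(ParDo.of(new MyLimitFn[String](1000000L)))  // stateful per-key limit
  .map(_.getValue)                                            // drop the synthetic key
```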

I'm still looking forward to seeing better / more efficient solutions.