Are there examples of using the Histogram accumulator in Flink?


I stumbled on the Histogram class in the Flink hierarchy, but there's no "here's how you can use this" kind of documentation around it. I wanted to do something like:

```java
dataStream
    .countWindowAll(100)
    .fold(new Histogram(), (histogram, data) -> {
        histogram.add(data.getValue());
        return histogram;
    })
    .flatMap((Histogram h, Collector<String> out) ->
        h.getLocalValue().forEach((key, count) -> out.collect(key + "," + count)))
    .print();
```

Sadly, Flink can't serialize the Histogram when it is used this way. Is there a "here's how you can use this" somewhere, or another way to build a histogram with Flink?

I'm clearly doing something wrong.


Best answer:

Flink's accumulators are not meant to be used as data types for DataStream or DataSet.

Instead, they are registered with the `RuntimeContext`, which is available from `RichFunction.getRuntimeContext()`. This is usually done in the `open()` method of a `RichFunction`:

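```scala
import org.apache.flink.api.common.accumulators.Histogram
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class MyFunc extends RichFlatMapFunction[Int, Int] {

  // Counts how often each integer value occurs in this parallel instance
  val hist: Histogram = new Histogram()

  override def open(conf: Configuration): Unit = {
    // Register the accumulator under the name used to look up its result later
    getRuntimeContext.addAccumulator("myHist", hist)
  }

  override def flatMap(value: Int, out: Collector[Int]): Unit = {
    // Only updates the accumulator; nothing is emitted downstream
    hist.add(value)
  }
}
```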

All parallel instances of an accumulator are periodically shipped to the JobManager (the master process) and merged. The final value can be read from the `JobExecutionResult` returned by `StreamExecutionEnvironment.execute()`, for example with `getAccumulatorResult("myHist")`.
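Conceptually, merging the parallel `Histogram` instances means summing the count of each bin. The plain-Java sketch below illustrates that merge on ordinary `TreeMap<Integer, Integer>` values (the type `Histogram.getLocalValue()` returns). It only illustrates the semantics and is not Flink's own code; `HistogramMerge` and `mergeAll` are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: mimics how per-task histograms combine into one result.
final class HistogramMerge {
    private HistogramMerge() {}

    /** Sums the counts of each bin across all partial histograms. */
    static TreeMap<Integer, Integer> mergeAll(List<TreeMap<Integer, Integer>> partials) {
        TreeMap<Integer, Integer> merged = new TreeMap<>();
        for (TreeMap<Integer, Integer> partial : partials) {
            for (Map.Entry<Integer, Integer> bin : partial.entrySet()) {
                // Add this task's count to whatever is already recorded for the bin.
                merged.merge(bin.getKey(), bin.getValue(), Integer::sum);
            }
        }
        return merged;
    }
}
```

For example, partial histograms `{1=2, 3=1}` and `{1=1, 5=4}` merge into `{1=3, 3=1, 5=4}`.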

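I don't think your use case can be addressed with Flink's accumulators, because their merged value ends up at the JobManager rather than flowing through the stream. You should create a custom histogram data type instead and use it as a regular record in the DataStream.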
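A minimal sketch of such a type, assuming integer values as in the question. `CountHistogram` and its methods are hypothetical names; the class holds only a sorted `TreeMap` of counts and none of the accumulator machinery. How Flink serializes it (POJO serializer or its generic fallback) depends on how the class is declared, so it is worth checking the type information Flink derives for it.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical histogram type meant to be passed along a DataStream as ordinary data.
class CountHistogram implements Serializable {
    private static final long serialVersionUID = 1L;

    // bin value -> number of occurrences, kept sorted by bin
    private final TreeMap<Integer, Integer> counts = new TreeMap<>();

    /** Counts one occurrence of the given value and returns this histogram. */
    CountHistogram add(int value) {
        counts.merge(value, 1, Integer::sum);
        return this; // returning this keeps a fold lambda to a single expression
    }

    /** Returns how often the value was added (0 if never). */
    int count(int value) {
        return counts.getOrDefault(value, 0);
    }

    /** Renders each bin as "value,count" in ascending order of value. */
    List<String> toCsvLines() {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<Integer, Integer> bin : counts.entrySet()) {
            lines.add(bin.getKey() + "," + bin.getValue());
        }
        return lines;
    }
}
```

In the question's pipeline, the fold could then become `.fold(new CountHistogram(), (h, data) -> h.add(data.getValue()))`, followed by a `flatMap` that emits each element of `h.toCsvLines()`, assuming `getValue()` returns an `int`. Newer Flink releases deprecate `fold` in favor of `aggregate`, so an `AggregateFunction` would be the longer-term home for this logic.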