wordcount test shows slowness in Flink


I am doing some benchmark comparisons between stream processing frameworks.

I picked WordCount, the "Hello world" task of this area (with some twists), and have tested Flink and Hazelcast Jet so far. The result: Flink takes 80+ seconds to complete, while Jet only takes 30+ seconds.

I know Flink is very popular, so what did I do wrong here? Really curious about this.

My sample code is here

https://github.com/ChinW/stream-processing-compare


Below are the details (spec, pipeline, logs).

The Tested WordCount Pipeline

Source (read from a file, 5 MB)
 -> Process: split each line into words (here is the bomb: every word is emitted 1000 times)
 -> Group/Count
 -> Sink (do nothing)
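The core logic of the pipeline above can be sketched in plain Java (a hypothetical standalone version of the flat-map and count steps; the names `explode` and `BOMB_FACTOR` are mine, not from the linked repo):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordBomb {
    static final int BOMB_FACTOR = 1000; // every word is emitted 1000 times

    // Flat-map step: split a line into words and amplify each one.
    static List<String> explode(String line) {
        List<String> out = new ArrayList<>();
        for (String word : line.toLowerCase().split("\\W+")) {
            if (word.isEmpty()) continue; // the filter step drops empty tokens
            for (int i = 0; i < BOMB_FACTOR; i++) {
                out.add(word);
            }
        }
        return out;
    }

    // Group/count step: aggregate word frequencies.
    static Map<String, Long> count(List<String> words) {
        Map<String, Long> counts = new HashMap<>();
        for (String w : words) counts.merge(w, 1L, Long::sum);
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = count(explode("hello world hello"));
        System.out.println(counts.get("hello")); // 2000 (2 occurrences x 1000)
        System.out.println(counts.get("world")); // 1000
    }
}
```

The bomb turns a 5 MB input into roughly a thousand times as many items downstream, so almost all of the work lands on the flat-map-to-aggregation hand-off rather than on I/O.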
My Local Result
  • MacBook Pro (13-inch, 2020, Four Thunderbolt 3 ports)
  • 2 GHz Quad-Core Intel Core i5 (8 logical processors)
  • 16 GB 3733 MHz LPDDR4X
  • JDK 11
Jet 4.4

Pipeline:

digraph DAG {
    "items" [localParallelism=1];
    "fused(flat-map, filter)" [localParallelism=8];
    "group-and-aggregate-prepare" [localParallelism=8];
    "group-and-aggregate" [localParallelism=8];
    "do-nothing-sink" [localParallelism=1];
    "items" -> "fused(flat-map, filter)" [queueSize=1024];
    "fused(flat-map, filter)" -> "group-and-aggregate-prepare" [label="partitioned", queueSize=1024];
    subgraph cluster_0 {
        "group-and-aggregate-prepare" -> "group-and-aggregate" [label="distributed-partitioned", queueSize=1024];
    }
    "group-and-aggregate" -> "do-nothing-sink" [queueSize=1024];
}

Log:

Start time: 2021-04-18T13:52:52.106
Duration: 00:00:36.459
Jet: finish in 36.45935081 seconds.

Start time: 2021-04-19T16:51:53.806
Duration: 00:00:30.143
Jet: finish in 30.625740453 seconds.

Start time: 2021-04-19T16:52:48.906
Duration: 00:00:37.207
Jet: finish in 37.862554137 seconds.
Flink 1.12.2 for Scala 2.11

flink-config.yaml Configuration:

jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 2096m
taskmanager.memory.process.size: 12288m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 8

Pipeline:

{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Flat Map",
    "pact" : "Operator",
    "contents" : "Flat Map",
    "parallelism" : 8,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "REBALANCE",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "Keyed Aggregation",
    "pact" : "Operator",
    "contents" : "Keyed Aggregation",
    "parallelism" : 8,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "Sink: Unnamed",
    "pact" : "Data Sink",
    "contents" : "Sink: Unnamed",
    "parallelism" : 8,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}

Log:

❯ flink run -c chiw.spc.flink.FlinkWordCountKt stream-processing-compare-1.0-SNAPSHOT.jar
Job has been submitted with JobID 163ce849a663e45f3c3028a98f260e7c
Program execution finished
Job with JobID 163ce849a663e45f3c3028a98f260e7c has finished.
Job Runtime: 88614 ms

❯ flink run -c chiw.spc.flink.FlinkWordCountKt stream-processing-compare-1.0-SNAPSHOT.jar
Job has been submitted with JobID fcf12488204969299e4e5d7f23f4ea6e
Program execution finished
Job with JobID fcf12488204969299e4e5d7f23f4ea6e has finished.
Job Runtime: 90165 ms

❯ flink run -c chiw.spc.flink.FlinkWordCountKt stream-processing-compare-1.0-SNAPSHOT.jar
Job has been submitted with JobID 37e349e4fad90cd7405546d30239afa4
Program execution finished
Job with JobID 37e349e4fad90cd7405546d30239afa4 has finished.
Job Runtime: 78908 ms

Many thanks for your help!

There are 2 answers below.

Answer 1:

I don't think you've done anything wrong. Our testing has shown Jet to be significantly faster than both Spark and Flink, and word count is one of the examples we used to measure that.

Answer 2:

Given that your bomb creates a large number of small items (as opposed to a smaller number of large items), my best guess as to why Jet might have an advantage here is its single-producer-single-consumer (SPSC) queues coupled with coroutine-like concurrency.

You have 8 flat-mapping stages talking to 8 aggregating stages. Jet will execute this on a total of 8 threads (assuming you have 8 availableProcessors), so there will be almost no thread scheduling done on the OS level. Data will move between threads in big chunks: flatMap will enqueue 1024 at a time and then each aggregator will pull all the items destined for it. Communication over SPSC queues happens without any interference from other threads: each aggregating processor has 8 input queues, one dedicated to each flat-mapper.
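The SPSC idea can be sketched with a minimal bounded queue (an illustration of the concept only, not Jet's actual implementation):

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal bounded single-producer-single-consumer queue. Because exactly one
// thread writes and exactly one thread reads, the head and tail indices each
// have a single owner, so no compare-and-set retry loop is ever needed.
public class SpscQueue<T> {
    private final Object[] buffer;
    private final int mask;
    private final AtomicLong head = new AtomicLong(); // advanced only by the consumer
    private final AtomicLong tail = new AtomicLong(); // advanced only by the producer

    public SpscQueue(int capacityPowerOfTwo) {
        buffer = new Object[capacityPowerOfTwo];
        mask = capacityPowerOfTwo - 1;
    }

    // Producer side: a plain read-check-write, no CAS, no retries.
    public boolean offer(T item) {
        long t = tail.get();
        if (t - head.get() == buffer.length) return false; // full
        buffer[(int) (t & mask)] = item;
        tail.set(t + 1); // single writer, so an ordered store is enough
        return true;
    }

    // Consumer side: symmetric, also contention-free.
    @SuppressWarnings("unchecked")
    public T poll() {
        long h = head.get();
        if (h == tail.get()) return null; // empty
        T item = (T) buffer[(int) (h & mask)];
        head.set(h + 1);
        return item;
    }
}
```

In the word-count job this pattern applies because each aggregator owns 8 dedicated input queues, one per flat-mapper, so every individual queue sees exactly one producer and one consumer.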

In Flink, every stage will start another 8 threads, and I also notice the sink has a parallelism of 8, so that's 24 threads, plus one more for the source. The OS will have to schedule them on the 8 logical cores (only 4 of them physical). Communication will occur over multiple-producer-single-consumer (MPSC) queues, which means all the flat-mapper threads must coordinate so that only a single thread at a time enqueues an item to any given aggregator, and the contention results in hot CAS loops in all the threads.
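The "hot CAS loop" can be illustrated with a toy multi-producer structure (a Treiber-stack push, not Flink's actual channel code): every producer must win a compareAndSet race to publish an item, and every lost race is another spin of the loop.

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of why multi-producer hand-off is costlier than SPSC: publishing
// requires winning a compareAndSet race against all other producers.
public class MpscContention {
    static final class Node {
        final int value;
        final Node next;
        Node(int value, Node next) { this.value = value; this.next = next; }
    }

    static final AtomicReference<Node> top = new AtomicReference<>();

    static void push(int value) {
        Node current;
        do {
            current = top.get();
            // If another producer published between get() and the CAS below,
            // this attempt fails and the loop spins again: the "hot CAS loop".
        } while (!top.compareAndSet(current, new Node(value, current)));
    }

    static int drainCount() {
        int n = 0;
        for (Node node = top.getAndSet(null); node != null; node = node.next) n++;
        return n;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] producers = new Thread[8];
        for (int i = 0; i < producers.length; i++) {
            producers[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) push(j);
            });
            producers[i].start();
        }
        for (Thread t : producers) t.join();
        System.out.println(drainCount()); // prints 80000: correct, but paid for in retries
    }
}
```

The CAS loop keeps the structure correct under contention, but with 8 producers each failed attempt burns CPU and bounces the cache line holding `top` between cores, which is exactly the overhead the SPSC design avoids.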

To confirm this suspicion, try gathering some profiling data. If the story above is correct, you should see Flink spending a lot of CPU time enqueuing the data.