I am doing a benchmark comparison between stream processing frameworks.
I picked WordCount, the "Hello world" task of this area (with some twists), and have tested Flink and Hazelcast Jet so far. Flink takes 80+ s to complete, while Jet takes only 30+ s.
I know Flink is very popular, so what did I do wrong here? I'm really curious about this.
My sample code is here:
https://github.com/ChinW/stream-processing-compare
Below are the details (spec, pipeline, logs).
The Tested WordCount Pipeline
Source (read from a file, 5 MB)
-> Process: split each line into words (here is the bomb: every word is emitted 1000 times)
-> Group/Count
-> Sink (do nothing)
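To make the amount of work concrete, here is a plain-Java model of what the pipeline computes, with no Flink or Jet involved. The class and method names and the tokenization (lowercase, split on non-word characters) are assumptions for illustration, not taken from the linked repository. What it shows is the fan-out: every word becomes 1000 records before the group/count step, so the aggregation sees about 1000 times as many records as the 5 MB file has words.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the benchmark's WordCount; it does not use Flink or Jet.
// ASSUMPTION: the tokenization (lowercase, split on non-word characters) may
// differ from the linked repository.
class WordCountBomb {
    // "Every word is emitted 1000 times", as described in the question.
    static final int BOMB_FACTOR = 1000;

    // Source (the lines) -> flat-map (split + bomb) -> filter -> group/count.
    static Map<String, Long> count(List<String> lines, int bombFactor) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (word.isEmpty()) {
                    continue; // filter: drop empty tokens produced by split
                }
                for (int i = 0; i < bombFactor; i++) {
                    counts.merge(word, 1L, Long::sum); // one record per emission
                }
            }
        }
        return counts;
    }

    // Number of records the flat-map step hands to the aggregation.
    static long emittedRecords(Map<String, Long> counts) {
        long total = 0;
        for (long c : counts.values()) {
            total += c;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = count(List.of("Hello world", "hello again"), BOMB_FACTOR);
        System.out.println(counts);                 // per-word counts (map order may vary)
        System.out.println(emittedRecords(counts)); // 4000
    }
}
```

With just two short lines the flat-map already produces 4000 records; the same factor applies to every word of the real input.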
My Local Setup and Results
- MacBook Pro (13-inch, 2020, Four Thunderbolt 3 ports)
- 2 GHz Quad-Core Intel Core i5 (8 logical processors)
- 16 GB 3733 MHz LPDDR4X
- JDK 11
Hazelcast Jet
Pipeline:
digraph DAG {
"items" [localParallelism=1];
"fused(flat-map, filter)" [localParallelism=8];
"group-and-aggregate-prepare" [localParallelism=8];
"group-and-aggregate" [localParallelism=8];
"do-nothing-sink" [localParallelism=1];
"items" -> "fused(flat-map, filter)" [queueSize=1024];
"fused(flat-map, filter)" -> "group-and-aggregate-prepare" [label="partitioned", queueSize=1024];
subgraph cluster_0 {
"group-and-aggregate-prepare" -> "group-and-aggregate" [label="distributed-partitioned", queueSize=1024];
}
"group-and-aggregate" -> "do-nothing-sink" [queueSize=1024];
}
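In the Jet DAG, `group-and-aggregate-prepare` and `group-and-aggregate` form a two-stage aggregation: the first vertex accumulates partial results per key, and only those accumulated results travel over the `distributed-partitioned` edge to the second vertex, which combines them. The sketch below models that idea in plain Java; it is a conceptual illustration with hypothetical names, not Jet's implementation, and it assumes a batch job where each first-stage instance emits its partials once its input is exhausted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual model of two-stage aggregation; NOT Jet's actual code.
class TwoStageCount {
    // Stage 1 ("group-and-aggregate-prepare"): each parallel instance folds the
    // records it receives into local partial counts and emits them only once,
    // when its input is exhausted.
    static Map<String, Long> accumulate(List<String> words) {
        Map<String, Long> partial = new HashMap<>();
        for (String w : words) {
            partial.merge(w, 1L, Long::sum);
        }
        return partial;
    }

    // Stage 2 ("group-and-aggregate"): merges the partial results per key.
    static Map<String, Long> combine(List<Map<String, Long>> partials) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> p : partials) {
            p.forEach((word, n) -> total.merge(word, n, Long::sum));
        }
        return total;
    }

    // Items crossing the edge between the stages: one per key per instance,
    // regardless of how many times each word was emitted upstream.
    static int itemsBetweenStages(List<Map<String, Long>> partials) {
        int n = 0;
        for (Map<String, Long> p : partials) {
            n += p.size();
        }
        return n;
    }

    public static void main(String[] args) {
        List<Map<String, Long>> partials = new ArrayList<>();
        partials.add(accumulate(List.of("a", "b", "a")));
        partials.add(accumulate(List.of("a", "c")));
        System.out.println(combine(partials));            // a=3, b=1, c=1 (map order may vary)
        System.out.println(itemsBetweenStages(partials)); // 4 (versus 5 input words)
    }
}
```

Here five input records become four items between the stages; with the 1000x bomb the reduction is far larger, since all repeats of a word collapse into one partial count per instance.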
Log:
Start time: 2021-04-18T13:52:52.106
Duration: 00:00:36.459
Jet: finish in 36.45935081 seconds.
Start time: 2021-04-19T16:51:53.806
Duration: 00:00:30.143
Jet: finish in 30.625740453 seconds.
Start time: 2021-04-19T16:52:48.906
Duration: 00:00:37.207
Jet: finish in 37.862554137 seconds.
Flink 1.12.2 for Scala 2.11
Configuration (flink-conf.yaml):
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 2096m
taskmanager.memory.process.size: 12288m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 8
Pipeline:
{
"nodes" : [ {
"id" : 1,
"type" : "Source: Custom Source",
"pact" : "Data Source",
"contents" : "Source: Custom Source",
"parallelism" : 1
}, {
"id" : 2,
"type" : "Flat Map",
"pact" : "Operator",
"contents" : "Flat Map",
"parallelism" : 8,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "REBALANCE",
"side" : "second"
} ]
}, {
"id" : 4,
"type" : "Keyed Aggregation",
"pact" : "Operator",
"contents" : "Keyed Aggregation",
"parallelism" : 8,
"predecessors" : [ {
"id" : 2,
"ship_strategy" : "HASH",
"side" : "second"
} ]
}, {
"id" : 5,
"type" : "Sink: Unnamed",
"pact" : "Data Sink",
"contents" : "Sink: Unnamed",
"parallelism" : 8,
"predecessors" : [ {
"id" : 4,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
}
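The Flink plan has a single `Keyed Aggregation` operator behind a `HASH` exchange, forwarding to the sink, with no pre-aggregation step before the exchange. Assuming this operator comes from a DataStream rolling aggregation such as `keyBy(...).sum(...)` and the job runs in the default streaming execution mode (the job's source is not shown here), it keeps a running value per key and emits the updated value for every incoming record. The plain-Java sketch below models that behaviour with hypothetical names; it is not Flink's code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Conceptual model of a rolling keyed sum in streaming mode; NOT Flink's code.
class RollingKeyedSum {
    // Updates per-key state and forwards the new running total for EVERY input
    // record, so the sink receives as many records as the aggregation consumes.
    static long run(List<String> words, BiConsumer<String, Long> sink) {
        Map<String, Long> state = new HashMap<>(); // per-key running counts
        long forwarded = 0;
        for (String word : words) {
            long updated = state.merge(word, 1L, Long::sum);
            sink.accept(word, updated); // one downstream record per input record
            forwarded++;
        }
        return forwarded;
    }

    public static void main(String[] args) {
        long forwarded = run(List.of("a", "b", "a", "a", "c"),
                (word, count) -> System.out.println(word + "=" + count));
        System.out.println(forwarded); // 5
    }
}
```

Under this model the do-nothing sink still receives one record per word emission, instead of one final count per distinct word.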
Log:
❯ flink run -c chiw.spc.flink.FlinkWordCountKt stream-processing-compare-1.0-SNAPSHOT.jar
Job has been submitted with JobID 163ce849a663e45f3c3028a98f260e7c
Program execution finished
Job with JobID 163ce849a663e45f3c3028a98f260e7c has finished.
Job Runtime: 88614 ms
❯ flink run -c chiw.spc.flink.FlinkWordCountKt stream-processing-compare-1.0-SNAPSHOT.jar
Job has been submitted with JobID fcf12488204969299e4e5d7f23f4ea6e
Program execution finished
Job with JobID fcf12488204969299e4e5d7f23f4ea6e has finished.
Job Runtime: 90165 ms
❯ flink run -c chiw.spc.flink.FlinkWordCountKt stream-processing-compare-1.0-SNAPSHOT.jar
Job has been submitted with JobID 37e349e4fad90cd7405546d30239afa4
Program execution finished
Job with JobID 37e349e4fad90cd7405546d30239afa4 has finished.
Job Runtime: 78908 ms
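Averaging the three runs on each side gives a single number to compare. This sketch uses the `Duration` lines from the Jet log (the `Jet: finish in` values are slightly higher) and the `Job Runtime` lines from the Flink log; the class name is hypothetical.

```java
import java.util.Arrays;
import java.util.Locale;

// Averages the run times reported in the logs above, in milliseconds.
class RuntimeSummary {
    static final long[] JET_MS = {36_459, 30_143, 37_207};   // Jet "Duration" lines
    static final long[] FLINK_MS = {88_614, 90_165, 78_908}; // Flink "Job Runtime" lines

    static double mean(long[] xs) {
        return Arrays.stream(xs).average().getAsDouble();
    }

    public static void main(String[] args) {
        double jet = mean(JET_MS);
        double flink = mean(FLINK_MS);
        // Locale.ROOT keeps '.' as the decimal separator on every system.
        System.out.printf(Locale.ROOT, "Jet %.1f s, Flink %.1f s, ratio %.2f%n",
                jet / 1000, flink / 1000, flink / jet);
    }
}
```

It prints `Jet 34.6 s, Flink 85.9 s, ratio 2.48`, so on this machine Flink takes roughly 2.5 times as long as Jet.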
Many thanks for any help!
I don't think you've done anything wrong. Our testing has shown Jet to be significantly faster than Spark and Flink, and word count is one of the examples we used to measure that.