USECASE
Hazelcast Jet version 0.6.1, Hazelcast version 3.10.2
Given this (simplified version of a) DAG:
VERTICES
S1 Source that emits 5 items of type A (read from DB with partitioning). Local parallelism = 1
S2 Source that emits 150K items of type B (Iterator that reads from DB in batches of 100, with partitioning). Local parallelism = 1
AD Processor that adapts types A->A1 and B->B1 and emits them one by one
FA Processors.filterP that accepts only items of type A1 and emits them one by one
FB Processors.filterP that accepts only items of type B1 and emits them one by one
CL Processor that first accumulates all items of type A1; then, when it receives an item of type B1, it enriches it with some data taken from the matching A1 and emits it, one by one.
WR Sink that writes B1. Local parallelism = 1
NOTE: Just to give meaning to the filter processors: in the DAG there are other sources that flow into the same adapter AD and then go down other paths using filter processors.
EDGES
S1 --> AD
S2 --> AD
AD --> FA (from ordinal 0)
AD --> FB (from ordinal 1)
FA --> CL (to ordinal 0 with priority 0 distributed and broadcast)
FB --> CL (to ordinal 1 with priority 1)
CL --> WR
PROBLEM
If source S2 has "few" items to load (e.g. 15K), emitFromTraverser never returns false.
If source S2 has "many" items to load (e.g. 150K), emitFromTraverser starts returning false after:
- All A1 items have been processed by CL
- About 30% of the B1 items have already been transmitted to CL, but none of them has been processed by CL (the DiagnosticProcessor logs show that the elements are sent to CL but not processed)
S2 code for reference:
    protected void init(Context context) throws Exception {
        super.init(context);
        this.iterator = new BQueryIterator(querySupplier, batchSize);
        this.traverser = Traversers.traverseIterator(this.iterator);
    }

    public boolean complete() {
        boolean result = emitFromTraverser(this.traverser);
        return result;
    }
QUESTION
- Is it correct that CL doesn't process items until source ends?
- Is the usage of priority + distributed + broadcast correct on CL Vertex?
UPDATE
It seems that completeEdge is never called for edge 1 of CL. Can someone tell me why?
Thanks!
You are hitting a deadlock caused by the edge priority. Your DAG branches at AD and then rejoins at CL, but with a priority on the rejoining edges.
Setting a priority means that no item from the lower-priority edge is processed until all items from the higher-priority edge have been processed.
AD will eventually get blocked by backpressure from the lower-priority path, whose items are not being consumed by CL. So AD is blocked because it can't emit to the lower-priority edge, and CL is blocked because it is still waiting for items from the higher-priority edge, which cannot complete while AD is stuck. The result is a deadlock.
In your case, you can resolve it by making two AD vertices, each processing items from one of the sources:
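To make the mechanism concrete, here is a minimal sketch in plain Java that does not use the Jet API at all. It is a toy, single-threaded model under stated assumptions: the class name PriorityDeadlockSketch, the method run, and the queue capacity of 4 are all hypothetical, and real Jet buffers are much larger. It only models bounded queues in front of CL and the rule that the lower-priority edge is drained after the higher-priority edge has completed. With a shared adapter the run stalls as soon as the B1 items no longer fit in the buffer, which would be consistent with a small S2 working and a large one hanging; with one adapter per source it finishes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class PriorityDeadlockSketch {

    // Hypothetical buffer size of each edge into CL (an assumption for this toy model).
    static final int CAPACITY = 4;

    /**
     * Runs the toy model until no stage can make progress.
     * Returns how many B1 items CL managed to process.
     */
    static int run(int aCount, int bCount, boolean sharedAdapter) {
        Queue<String> highEdge = new ArrayDeque<>(); // FA -> CL, higher priority
        Queue<String> lowEdge = new ArrayDeque<>();  // FB -> CL, lower priority
        int aLeft = aCount;
        int bLeft = bCount;
        int processedB = 0;
        boolean progress = true;

        while (progress) {
            progress = false;

            // Adapter side: emit one item per edge if that edge's buffer has room.
            if (aLeft > 0 && highEdge.size() < CAPACITY) {
                highEdge.add("A1");
                aLeft--;
                progress = true;
            }
            if (bLeft > 0 && lowEdge.size() < CAPACITY) {
                lowEdge.add("B1");
                bLeft--;
                progress = true;
            }

            // CL always drains the higher-priority edge first.
            if (!highEdge.isEmpty()) {
                highEdge.remove();
                progress = true;
            }

            // The higher-priority edge completes only when its upstream adapter is done.
            // A shared adapter is done only after it has emitted every A1 AND every B1.
            boolean upstreamDone = sharedAdapter ? (aLeft == 0 && bLeft == 0) : (aLeft == 0);
            boolean highEdgeComplete = upstreamDone && highEdge.isEmpty();

            // CL touches the lower-priority edge only after the higher one has completed.
            if (highEdgeComplete && !lowEdge.isEmpty()) {
                lowEdge.remove();
                processedB++;
                progress = true;
            }
        }
        return processedB;
    }

    public static void main(String[] args) {
        System.out.println("shared AD, 3 B items:    " + run(5, 3, true));    // fits in the buffer
        System.out.println("shared AD, 100 B items:  " + run(5, 100, true));  // stalls
        System.out.println("two ADs, 100 B items:    " + run(5, 100, false)); // completes
    }
}
```

In this model the shared-adapter run with 100 items of type B processes none of them, because the lower-priority buffer fills up before the adapter can finish, while the same input with separate adapters processes all 100.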