I am trying to join two data sources using Hazelcast Jet, but the join seems to be stateful: it collects and groups all the data from both sides first and only then joins. Can the join be done as the data flows through instead?
Here is my code:
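// Return type matches the stage that is actually returned at the end of the method
public BatchStage<List<Object>> JoinData() {
    // Inner-join logic: key both batches by the join column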
    BatchStageWithKey<Object, String> jdbcGroupByKey = batch1.filter(k -> ((Map<String, Object>) k).get(col1) != null).groupingKey(jdbcData -> {
        // group by join key
    });
    BatchStageWithKey<Object, String> csvGroupByKey = batch2.filter(k -> ((Map<String, Object>) k).get(col1) != null).groupingKey(csvData -> {
        // group by join key
    });
    // Aggregate both sides per key. This is where it becomes stateful, if I am not wrong: every row is
    // collected into a list before anything is emitted. Can this be made stateless?
    BatchStage<Entry<String, Tuple2<List<Object>, List<Object>>>> d = jdbcGroupByKey.aggregate2(AggregateOperations.toList(), csvGroupByKey, AggregateOperations.toList());
    // Inner join: keep only keys present on both sides, then merge every pair of rows
    BatchStage<List<Object>> jdbcBatchStageData = d
            .filter(h -> !h.getValue().f0().isEmpty() && !h.getValue().f1().isEmpty())
            .map(e -> {
                List<Object> list = new ArrayList<>();
                for (Object jdbcRow : e.getValue().f0()) {
                    for (Object csvRow : e.getValue().f1()) {
                        // CSV columns go in first, so JDBC values win when column names clash
                        Map<String, Object> merged = new HashMap<>((Map<String, Object>) csvRow);
                        merged.putAll((Map<String, Object>) jdbcRow);
                        list.add(merged);
                    }
                }
                return list;
            });
    return jdbcBatchStageData;
}
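To make clear what I mean by "join as it goes", here is a minimal plain-Java sketch of the behaviour I am after. It does not use any Jet API; the class `StreamingJoinSketch` and the methods `buildIndex` and `joinRow` are hypothetical names made up for illustration. One side is indexed by the join key up front, and each row of the other side is joined and emitted as soon as it arrives. As far as I understand, even this still keeps one side in memory, so it is only "stateless" with respect to the side being streamed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamingJoinSketch {

    // Index one side (here: the CSV rows) by join key. This side is still buffered in memory.
    static Map<String, List<Map<String, Object>>> buildIndex(
            List<Map<String, Object>> buildSide, String keyColumn) {
        Map<String, List<Map<String, Object>>> index = new HashMap<>();
        for (Map<String, Object> row : buildSide) {
            Object key = row.get(keyColumn);
            if (key != null) { // rows without a join key can never match
                index.computeIfAbsent(key.toString(), k -> new ArrayList<>()).add(row);
            }
        }
        return index;
    }

    // Join a single row of the other side (here: a JDBC row) against the index.
    // Nothing from this side is collected; the result for the row is available immediately.
    static List<Map<String, Object>> joinRow(
            Map<String, Object> probeRow, String keyColumn,
            Map<String, List<Map<String, Object>>> index) {
        List<Map<String, Object>> out = new ArrayList<>();
        Object key = probeRow.get(keyColumn);
        if (key == null) {
            return out;
        }
        List<Map<String, Object>> matches =
                index.getOrDefault(key.toString(), Collections.emptyList());
        for (Map<String, Object> match : matches) {
            // Indexed-side columns first, so the streamed row wins on clashing column names
            Map<String, Object> merged = new HashMap<>(match);
            merged.putAll(probeRow);
            out.add(merged);
        }
        return out; // empty list = no match, which gives inner-join semantics
    }
}
```

With this shape, only the indexed side has to be read completely before the first joined row can be produced, instead of both sides as in my `aggregate2` version.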
Please let me know whether there is a way to do this.