Perform Left,Right and Inner Join in Hazelcast Jet in Stateless manner

41 Views Asked by At

I am trying to Join two datasoures using Hazelcast Jet but it seems to be done in Statefull form that it collect all data/grouping and then joins but can we join as it goes?

Here below is my code:

public BatchStage<Object> JoinData() {

        //Here is the logic for the inner joining

         BatchStageWithKey<Object, String> jdbcGroupByKey = batch1.filter(k -> ((Map<String, Object>)k).get(col1) != null).groupingKey(jdbcData ->  {
                // gorup by join key
        });
        BatchStageWithKey<Object, String> csvGroupByKey = batch2.filter(k -> ((Map<String, Object>)k).get(col1) != null).groupingKey(jdbcData ->  {
                // gorup by join key
        });
        
        //Aggregate here (So here it becoming stateful if I am not wrong can we make it stateless?).
        BatchStage<Entry<String, Tuple2<List<Object>, List<Object>>>> d = jdbcGroupByKey.aggregate2(AggregateOperations.toList(),csvGroupByKey,AggregateOperations.toList());
        
       //Here I am trying to do inner join
        BatchStage<List<Object>> jdbcBatchStageData = d.filter(h -> {
             return !h.getValue().f0().isEmpty() && !h.getValue().f1().isEmpty();
         }).map(e -> { 

             List<Object> list = new ArrayList<Object>();
             e.getValue().f0().forEach(z ->  {
                    
                     if (e.getValue().f1().size() > 0) {
                                 
                        
                             e.getValue().f1().forEach(z1 ->  {
                                 Map<String, Object> a = new HashMap<String, Object>();
                                 Map<String, Object> f1 = new HashMap<String, Object>((Map<String, Object>) z);
                                Map<String, Object> f0 = new HashMap<String, Object>((Map<String, Object>) z1);
                                a.putAll(f0);
                                a.putAll(f1);
                                list.add(a); 
                             });
                     }
                     
                 });

             return list;
             
         });
        
        return jdbcBatchStageData;
        
}

Please let me know if there is way to do it or not?

0

There are 0 best solutions below