Aggregation and grouping using Hazelcast Jet

I am trying to do grouping and aggregation using Hazelcast Jet, but it is a bit slow because I have to loop through the data twice: once to create the grouping key, and again to aggregate the data. Is there a better, more feasible way to do it? Please help.

Here is my code. First I create a grouping key from my data, since the grouping is done on multiple fields:

// Fields to group by.
List<String> fields1 = Arrays.asList("Field1", "Field4");
List<String> cAggCount = Arrays.asList("CountFiled");
List<String> sumField = Arrays.asList("SumFiled");
BatchStage<Map<Object, List<Object>>> aggBatchStageDataGroupBy = batchStage
            .aggregate(AggregateOperations.groupingBy(jdbcData -> {
                Map<String, Object> m = (Map<String, Object>) jdbcData;

                // Build a comma-separated key from the grouping fields.
                StringBuilder stringBuilder = new StringBuilder();
                fields1.forEach(dataValue -> {
                    if (!m.containsKey(dataValue)) {
                        // Field missing from the row.
                        stringBuilder.append("null").append(",");
                    } else {
                        Object k = m.get(dataValue);
                        // A field that is present but null becomes an empty string.
                        stringBuilder.append(k == null ? "" : k).append(",");
                    }
                });
                // Drop the trailing comma.
                return stringBuilder.substring(0, stringBuilder.length() - 1);
            }));

And after that I am doing aggregation on it as below:

BatchStage<List<Map<String, Object>>> aggBatchStageData = aggBatchStageDataGroupBy
                    .map(data -> {
                        List<Map<String, Object>> mapList = new ArrayList<>();
                        data.entrySet().forEach(v -> {
                            Map<String, Object> objectMap = new HashMap<>();
                            // countAlias (the output names for the counts) is defined elsewhere.
                            IntStream.range(0, cAggCount.size()).forEach(k ->
                                    // Count the rows in this group.
                                    objectMap.put(countAlias.get(k), (long) v.getValue().size()));
                            mapList.add(objectMap);
                        });
                        return mapList;
                    });

So can we do this whole process in one go, instead of looping twice (grouping by key first and then aggregating)?
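
To make the "one go" idea concrete, here is a minimal sketch in plain Java collections (not Jet itself) in which every row updates a running count and sum for its key, so the per-group lists are never built. The class name OnePassGrouping, the helpers groupKey and countAndSum, and the use of a long[] pair as accumulator are all hypothetical names chosen for illustration. In Jet, the comparable route is probably batchStage.groupingKey(keyFn).aggregate(AggregateOperations.allOf(AggregateOperations.counting(), AggregateOperations.summingLong(...))), but treat that wiring as an assumption to verify against the Javadoc of your Jet version.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper class, plain Java only (no Hazelcast dependency).
final class OnePassGrouping {

    // Builds the same comma-separated key as in the question:
    // "null" for a missing field, "" for a field whose value is null.
    static String groupKey(Map<String, Object> row, List<String> keyFields) {
        return keyFields.stream()
                .map(f -> !row.containsKey(f) ? "null"
                        : row.get(f) == null ? "" : String.valueOf(row.get(f)))
                .collect(Collectors.joining(","));
    }

    // Groups and aggregates in a single pass over the rows.
    // Result value: index 0 holds the row count, index 1 the sum of sumField.
    static Map<String, long[]> countAndSum(List<Map<String, Object>> rows,
                                           List<String> keyFields,
                                           String sumField) {
        Map<String, long[]> result = new HashMap<>();
        for (Map<String, Object> row : rows) {
            long[] acc = result.computeIfAbsent(
                    groupKey(row, keyFields), k -> new long[2]);
            acc[0]++; // count every row in the group
            Object v = row.get(sumField);
            if (v instanceof Number) {
                // Non-numeric or missing values are skipped (an assumption).
                acc[1] += ((Number) v).longValue();
            }
        }
        return result;
    }
}
```

Calling OnePassGrouping.countAndSum(rows, fields1, "SumFiled") on a list of row maps returns one entry per composite key. The point of the design is that the accumulator is updated as rows arrive, which is the same shape an aggregate operation applied per grouping key would have.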
