Hazelcast Jet: after rollingAggregate, how do I get all values for each grouping key in one go?

I am trying to aggregate my data with rollingAggregate, but because it is a streaming operation it processes the items asynchronously. What I need is the sum of amount, grouped by id and country.

Here is my code:

BatchStage<Object> list = // coming from JDBC

list.groupingKey(data -> {
    // grouping key logic for ID, Country
}).rollingAggregate(AggregateOperations.toList())
.map(entry -> {

    if (((Entry<String, List<Object>>) entry).getKey().equals("36465,Indonesia")) {
        // For this ID and Country the entry arrives twice with different records,
        // but I want each grouping key to arrive only once.
    }

    List<Object> resultRow = new ArrayList<>();
    ((Entry<String, List<Object>>) entry).getValue().stream().forEach(data -> {

    });
    return resultRow;
});

Data (read from a JDBC source):

[['id','country','id2','amount']
 [3638, Dominican Republic, 'Qee', 973029], 
 [3638, Dominican Republic, 'Hee', 95571], 
 [3668, USA, 'Fee', 986839], 
 [3668, USA, 'CEE', 201017]]

Result needed (I have millions of records, so I do not want all groups collected in one go):

[['id','country','amount']
 [3638, Dominican Republic, 1068600],  
 [3668, USA,1187856]]

I am grouping by the id and country columns.

The same grouping key arrives twice in map.

The first time I get:

{"3638,Dominican Republic" : [[3638, Dominican Republic, 'Hee', 95571]]}

The second time I get:

{"3638,Dominican Republic" : [[3638, Dominican Republic, 'Qee', 973029]]}

But I need it in one go (maybe this happens because the JDBC source delivers the data as a BatchStage?):

{"3638,Dominican Republic" : [[3638, Dominican Republic, 'Qee', 973029], 
 [3638, Dominican Republic, 'Hee', 95571]]}

Could anyone explain how to get all the records for each grouping key in one go, without collecting all grouping keys in one go?

There is one solution below:

rollingAggregate emits the current aggregation result after every input item; see the JavaDoc.

So what you should see is the following:

3638,Dominican Republic=[Item[id=3638, country=Dominican Republic, id2=Qee, amount=973029]]
3638,Dominican Republic=[Item[id=3638, country=Dominican Republic, id2=Qee, amount=973029], Item[id=3638, country=Dominican Republic, id2=Hee, amount=95571]]
3668,USA=[Item[id=3668, country=USA, id2=Fee, amount=986839]]
3668,USA=[Item[id=3668, country=USA, id2=Fee, amount=986839], Item[id=3668, country=USA, id2=Cee, amount=201017]]

First you receive the key with one item, then with two items, and so on. If you only ever see one item at a time, your grouping key function is probably wrong.
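To make the rolling behaviour concrete, here is a minimal plain-Java sketch that mimics it with ordinary collections. It does not use the Hazelcast API; the class RollingSketch, the helper methods, and the Item stand-in are hypothetical names chosen for illustration, and only id2 is printed to keep the output short.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RollingSketch {

    // Hypothetical stand-in for one row of the sample data.
    static final class Item {
        final int id;
        final String country;
        final String id2;
        final long amount;

        Item(int id, String country, String id2, long amount) {
            this.id = id;
            this.country = country;
            this.id2 = id2;
            this.amount = amount;
        }
    }

    // The four sample rows from the question.
    static List<Item> sample() {
        return List.of(
                new Item(3638, "Dominican Republic", "Qee", 973029L),
                new Item(3638, "Dominican Republic", "Hee", 95571L),
                new Item(3668, "USA", "Fee", 986839L),
                new Item(3668, "USA", "Cee", 201017L));
    }

    // Composite grouping key in the same "id,country" shape as the question.
    static String key(Item item) {
        return item.id + "," + item.country;
    }

    // Mimics a rolling aggregation: after every input item, emit the key
    // together with a snapshot of everything collected for that key so far.
    static List<String> rolling(List<Item> items) {
        Map<String, List<String>> state = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (Item item : items) {
            List<String> group = state.computeIfAbsent(key(item), k -> new ArrayList<>());
            group.add(item.id2);
            emitted.add(key(item) + "=" + group);
        }
        return emitted;
    }

    public static void main(String[] args) {
        rolling(sample()).forEach(System.out::println);
    }
}
```

Four input rows produce four emitted lines, and each key's list grows by one element per line. If the key function returned a different string for rows that should belong together, every emitted list would contain a single element, which matches the symptom described in the question.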

If you don't want the intermediate results, only the final aggregation with all items for each key, use aggregate instead. It gives:

3668,USA=[Item[id=3668, country=USA, id2=Fee, amount=986839], Item[id=3668, country=USA, id2=Cee, amount=201017]]
3638,Dominican Republic=[Item[id=3638, country=Dominican Republic, id2=Qee, amount=973029], Item[id=3638, country=Dominican Republic, id2=Hee, amount=95571]]

which is produced by the following code:

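import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;
import java.io.Serializable;

// The enclosing class name is arbitrary.
public class GroupingExample {

    public record Item(int id, String country, String id2, int amount) implements Serializable {
    }

    public static void main(String[] args) {

        Pipeline p = Pipeline.create();
        p.readFrom(TestSources.items(
                 new Item(3638, "Dominican Republic", "Qee", 973029),
                 new Item(3638, "Dominican Republic", "Hee", 95571),
                 new Item(3668, "USA", "Fee", 986839),
                 new Item(3668, "USA", "Cee", 201017)
         )).groupingKey(item -> item.id() + "," + item.country()) // composite key "id,country"
         .aggregate(AggregateOperations.toList())                 // one final list per key
         .writeTo(Sinks.logger());

        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
        hz.getJet().newJob(p).join();
    }
}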
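Since the question ultimately asks for the sum of amount per id and country rather than the full list, the following plain-Java sketch shows the arithmetic that a summing aggregation would perform for each key. It deliberately avoids the Hazelcast API so it can run on its own; SumPerKeySketch, sumByKey, and the Item stand-in are hypothetical names. In a Jet pipeline, replacing AggregateOperations.toList() with AggregateOperations.summingLong(...) would presumably be the analogous step, but that variant has not been run here.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SumPerKeySketch {

    // Hypothetical stand-in for one row of the sample data.
    static final class Item {
        final int id;
        final String country;
        final String id2;
        final long amount;

        Item(int id, String country, String id2, long amount) {
            this.id = id;
            this.country = country;
            this.id2 = id2;
            this.amount = amount;
        }
    }

    // The four sample rows from the question.
    static List<Item> sample() {
        return List.of(
                new Item(3638, "Dominican Republic", "Qee", 973029L),
                new Item(3638, "Dominican Republic", "Hee", 95571L),
                new Item(3668, "USA", "Fee", 986839L),
                new Item(3668, "USA", "Cee", 201017L));
    }

    // Keeps only a running total per "id,country" key instead of the full
    // list of rows, so memory grows with the number of keys, not of rows.
    static Map<String, Long> sumByKey(List<Item> items) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (Item item : items) {
            totals.merge(item.id + "," + item.country, item.amount, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        sumByKey(sample()).forEach((key, total) -> System.out.println(key + "=" + total));
    }
}
```

For the sample data this yields 1068600 for "3638,Dominican Republic" and 1187856 for "3668,USA", matching the result asked for in the question.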