I am trying to aggregate data with rollingAggregate, but because it is a streaming operation it processes items asynchronously. What I need is the sum of amount, grouped by id and country.
Here is my code:
BatchStage<Object> list = // coming from jdbc
list.groupingKey(data -> {
        // grouping key logic for ID, Country
    })
    .rollingAggregate(AggregateOperations.toList())
    .map(entry -> {
        if (((Entry<String, List<Object>>) entry).getKey().equals("36465,Indonesia")) {
            // Here this ID and Country arrives twice with different records,
            // but I want each grouping key only once.
        }
        List<Object> resultRow = new ArrayList<>();
        ((Entry<String, List<Object>>) entry).getValue().stream().forEach(data -> {
        });
        return resultRow;
    });
Data (read from a JDBC source):
[['id','country','id2','amount'],
[3638, Dominican Republic, 'Qee', 973029],
[3638, Dominican Republic, 'Hee', 95571],
[3668, USA, 'Fee', 986839],
[3668, USA, 'CEE', 201017]]
Result needed (but since I have millions of records, I do not want to group everything in one go):
[['id','country','amount'],
[3638, Dominican Republic, 1068600],
[3668, USA, 1187856]]
I am grouping by the id and country columns.
With this grouping, the same key arrives twice in map.
The first time I get:
{"3638,Dominican Republic" : [[3638, Dominican Republic, 'Hee', 95571]]}
The second time I get:
{"3638,Dominican Republic" : [[3638, Dominican Republic, 'Qee', 973029]]}
But I need it in one go (maybe this happens because I am using a JDBC source, which sends data in a BatchStage?):
{"3638,Dominican Republic" : [[3638, Dominican Republic, 'Qee', 973029],
[3638, Dominican Republic, 'Hee', 95571]]}
Could anyone help me get all the records for each grouping key in one go, without having to receive all the grouping keys in one go?
The rollingAggregate stage outputs the current result of the aggregation after each input item; see the JavaDoc. So what you should see is the following: first you receive the key with 1 item, then with 2 items, and so on. If you only see one item at a time, your grouping function is probably wrong.
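To make the rolling behaviour concrete, here is a minimal sketch in plain Java (no Jet dependency) that emulates a per-key rolling toList on the sample rows from the question. The class RollingSketch, the Row type and the helper names are hypothetical and only illustrate the semantics; this is not how Jet implements it internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RollingSketch {
    // Hypothetical stand-in for one JDBC row from the question.
    static final class Row {
        final int id;
        final String country;
        final String id2;
        final long amount;

        Row(int id, String country, String id2, long amount) {
            this.id = id;
            this.country = country;
            this.id2 = id2;
            this.amount = amount;
        }

        // Grouping key in the same "id,country" form the question uses.
        String key() {
            return id + "," + country;
        }
    }

    static List<Row> sampleRows() {
        return List.of(
                new Row(3638, "Dominican Republic", "Qee", 973029L),
                new Row(3638, "Dominican Republic", "Hee", 95571L),
                new Row(3668, "USA", "Fee", 986839L),
                new Row(3668, "USA", "CEE", 201017L));
    }

    // After every input row, emit the row's key together with a snapshot
    // of all rows seen so far for that key (one output per input).
    static List<Map.Entry<String, List<Row>>> rolling(List<Row> rows) {
        Map<String, List<Row>> state = new HashMap<>();
        List<Map.Entry<String, List<Row>>> out = new ArrayList<>();
        for (Row r : rows) {
            List<Row> acc = state.computeIfAbsent(r.key(), k -> new ArrayList<>());
            acc.add(r);
            out.add(Map.entry(r.key(), List.copyOf(acc)));
        }
        return out;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, List<Row>> e : rolling(sampleRows())) {
            System.out.println(e.getKey() + " -> " + e.getValue().size() + " item(s)");
        }
    }
}
```

With the four sample rows this produces four outputs, and the second output for each key already contains both of its rows. A key that always arrives with a single item suggests the rows are not landing in the same group.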
If you don't want the intermediate results, but only the final aggregation with all items, you should just use aggregate instead.
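As a rough illustration of what a final (non-rolling) aggregation gives you, here is a plain-Java sketch that sums amount per "id,country" key and emits each key exactly once. It uses java.util.stream rather than Jet, and FinalSumSketch, Row and sumByKey are hypothetical names. In a Jet pipeline the analogous step would presumably be groupingKey(...) followed by aggregate(...) with a summing operation such as AggregateOperations.summingLong; treat that mapping as an assumption and check the JavaDoc for your version.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class FinalSumSketch {
    // Hypothetical stand-in for one JDBC row from the question.
    static final class Row {
        final int id;
        final String country;
        final String id2;
        final long amount;

        Row(int id, String country, String id2, long amount) {
            this.id = id;
            this.country = country;
            this.id2 = id2;
            this.amount = amount;
        }

        // Grouping key in the same "id,country" form the question uses.
        String key() {
            return id + "," + country;
        }
    }

    static List<Row> sampleRows() {
        return List.of(
                new Row(3638, "Dominican Republic", "Qee", 973029L),
                new Row(3638, "Dominican Republic", "Hee", 95571L),
                new Row(3668, "USA", "Fee", 986839L),
                new Row(3668, "USA", "CEE", 201017L));
    }

    // One result per key, produced only after all rows have been seen.
    // LinkedHashMap keeps keys in first-seen order for readable output.
    static Map<String, Long> sumByKey(List<Row> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                Row::key,
                LinkedHashMap::new,
                Collectors.summingLong((Row r) -> r.amount)));
    }

    public static void main(String[] args) {
        sumByKey(sampleRows()).forEach((key, sum) -> System.out.println(key + " -> " + sum));
    }
}
```

For the sample data this yields 1068600 for "3638,Dominican Republic" and 1187856 for "3668,USA", which matches the result requested in the question. Summing instead of collecting to a list also keeps only one number per key in memory, which matters with millions of rows.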