There are 5,000,000 entities in my database, and I connect to it through a reactive driver (R2DBC). I want to read them 100,000 at a time, split each portion of 100,000 into batches of 1,000 entities, and send each batch further. When a portion of 100,000 has been processed, the next 100,000 should be fetched. If more than 100,000 entities are loaded at once, I get an OOM.
BIG_DATA -(get 100,000)-> make batches -(1,000 each, in parallel)-> process. On success, get the next 100,000.
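The flow above (fetch a chunk, split it into batches, process the batches in parallel, fetch the next chunk only on success) can be sketched with `concatMap` over chunk indices: `concatMap` subscribes to the next chunk only after the previous chunk's pipeline completes, and an error terminates the whole chain. `ChunkedMigration`, `fetchChunk` and `process` are hypothetical names. In a real application `fetchChunk` would be one R2DBC query per chunk (for example LIMIT/OFFSET or keyset pagination over a stable ordering key, which is an assumption about the schema) and `process` would send the batch further. The sketch assumes reactor-core on the classpath.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ChunkedMigration {

    // Records when each chunk's fetch starts and when its processing ends.
    static final List<String> events = new CopyOnWriteArrayList<>();
    static final AtomicLong processed = new AtomicLong();

    // Hypothetical stand-in for one R2DBC query returning rows [offset, offset + limit).
    // Deferred, so the "query" only runs when concatMap subscribes to it.
    static Flux<Integer> fetchChunk(int chunkIndex, int offset, int limit) {
        return Flux.defer(() -> {
            events.add("start " + chunkIndex);
            return Flux.range(offset, limit);
        });
    }

    // Hypothetical stand-in for sending one batch further.
    static Mono<Void> process(List<Integer> batch) {
        return Mono.fromRunnable(() -> processed.addAndGet(batch.size()))
                .subscribeOn(Schedulers.boundedElastic())
                .then();
    }

    static void run(int total, int chunkSize, int batchSize, int parallelism) {
        int chunks = (total + chunkSize - 1) / chunkSize;
        Flux.range(0, chunks)
                // The next chunk is fetched only after the previous chunk completes.
                .concatMap(i -> {
                    int offset = i * chunkSize;
                    int limit = Math.min(chunkSize, total - offset);
                    return fetchChunk(i, offset, limit)
                            .buffer(batchSize)                               // batches of batchSize rows
                            .flatMap(ChunkedMigration::process, parallelism) // at most `parallelism` batches at once
                            .doOnComplete(() -> events.add("end " + i));
                })
                .blockLast();
    }

    public static void main(String[] args) {
        run(5_000_000, 100_000, 1_000, 4);
        System.out.println("processed=" + processed.get());
    }
}
```

With this structure only one chunk's query is active at a time. How many rows of that chunk sit in memory at once also depends on how the R2DBC driver honours backpressure.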
But when I try to simulate it, all of the source data is fetched first, and only then is it divided into batches:
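import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class DataMigrationServiceApplication {

    private static final Logger log = LoggerFactory.getLogger(DataMigrationServiceApplication.class);

    public static void main(String[] args) throws InterruptedException {
        Flux.range(0, 12)
                .limitRate(6)
                .doOnNext(in -> log.info("NEW"))
                .parallel(2)
                .runOn(Schedulers.fromExecutor(Executors.newFixedThreadPool(2)))
                .collect(() -> new ArrayList<Integer>(), List::add)
                .flatMap(DataMigrationServiceApplication::check)
                .subscribe();
        Thread.sleep(10000); // keep main alive while the async pipeline runs
    }

    private static Mono<List<Integer>> check(List<Integer> input) {
        return Mono.defer(() -> Mono.just(input))
                .doOnNext(in -> log.info("IN {}", in))
                .subscribeOn(Schedulers.parallel());
    }
}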
Log output:
16:53:57.871 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.881 [pool-1-thread-2] INFO DataMigrationServiceApplication -- IN [1, 3, 5, 7, 9, 11]
16:53:57.881 [pool-1-thread-1] INFO DataMigrationServiceApplication -- IN [0, 2, 4, 6, 8, 10]
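The log is consistent with how `ParallelFlux.collect` works: it emits each rail's collection only when that rail completes, so the source has to be drained completely before the first `IN`. A minimal sketch, assuming reactor-core on the classpath and keeping the 12 elements and 2 threads from the question, builds the batches with `buffer` before `parallel` and limits prefetch with `parallel(2, 1)` and `runOn(scheduler, 1)`. The class name `BufferThenParallel` and the `System.out` logging are illustrative substitutes for the question's class and logger. Because demand is now bounded by the prefetch values, the source can only run a few batches ahead of the rails; the exact interleaving of `NEW` and `IN` lines depends on thread timing.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class BufferThenParallel {

    // Same role as check(...) in the question, but runs on the rail's own thread.
    static Mono<List<Integer>> check(List<Integer> batch) {
        return Mono.fromSupplier(() -> {
            System.out.println(Thread.currentThread().getName() + " IN " + batch);
            return batch;
        });
    }

    static List<List<Integer>> run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Scheduler scheduler = Schedulers.fromExecutorService(pool);
        try {
            return Flux.range(0, 12)
                    .doOnNext(i -> System.out.println(Thread.currentThread().getName() + " NEW " + i))
                    .buffer(3)                            // build batches before splitting into rails
                    .parallel(2, 1)                       // 2 rails, prefetch 1 batch from the source
                    .runOn(scheduler, 1)                  // each rail queues at most 1 batch
                    .concatMap(BufferThenParallel::check) // one batch at a time per rail
                    .sequential()
                    .collectList()
                    .block();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```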
I have tried limitRate, buffer, and window.
Please help.
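Of the operators mentioned, `buffer` may be enough without explicit 100,000-entity chunks, as long as it is followed by a concurrency-limited `flatMap`: `flatMap(f, concurrency)` keeps at most `concurrency` batches outstanding and only requests more as batches complete, and `buffer(batchSize)` turns each request for a batch into a request for `batchSize` elements. The minimal sketch below uses `Flux.range` as a stand-in for the R2DBC query and a hypothetical `process` method; it counts elements that have been emitted but not yet processed, which should never exceed concurrency * batchSize. Whether a real R2DBC driver fetches rows lazily in line with that demand is driver-specific and an assumption here. Requires reactor-core on the classpath.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BoundedBatching {

    // Elements pulled from the source but not yet processed.
    static final AtomicInteger inFlight = new AtomicInteger();
    // Highest value inFlight ever reached.
    static final AtomicInteger maxInFlight = new AtomicInteger();

    // Hypothetical stand-in for the real per-batch work (e.g. sending the batch further).
    static Mono<Integer> process(List<Integer> batch) {
        return Mono.fromCallable(() -> {
                    inFlight.addAndGet(-batch.size()); // this batch is finished
                    return batch.size();
                })
                .subscribeOn(Schedulers.boundedElastic()); // batches run concurrently
    }

    /** Returns the number of processed elements. */
    static int run(int total, int batchSize, int concurrency) {
        return Flux.range(0, total) // stands in for the R2DBC query result
                .doOnNext(i -> maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max))
                .buffer(batchSize)                              // request(n) becomes request(n * batchSize)
                .flatMap(BoundedBatching::process, concurrency) // at most `concurrency` batches in flight
                .reduce(0, Integer::sum)
                .block();
    }

    public static void main(String[] args) {
        int processed = run(5_000_000, 1_000, 4);
        System.out.println("processed=" + processed + ", maxInFlight=" + maxInFlight.get());
    }
}
```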
Well, I ended up with this:
It creates a Flux of 20 elements; then 4 rails (running in parallel) take elements round-robin. Each rail takes 5 elements and processes them.