I recently implemented a CSV parser in which I used switchOnFirst to extract the header, which is then passed along in an AtomicReference.
Here is a simplified version:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Flux<String> lines = Flux.just("HEADER", "line1", "line2", "line3", "line4");
AtomicReference<String> header = new AtomicReference<>();
AtomicInteger count = new AtomicInteger(0);

lines.switchOnFirst((sig, flux) -> {
        // First signal: capture the header and drop it from the stream.
        String item = sig.get();
        if (sig.isOnNext() && item != null) {
            header.set(item);
            return flux.skip(1);
        }
        return Flux.error(() -> new RuntimeException("No header was found"));
    })
    .flatMap(line -> Mono.just(header.get() + ":" + line))
    .doOnNext(x -> count.incrementAndGet())
    .subscribe(
        System.out::println,
        System.err::println,
        () -> System.out.println(count.get() + " rows processed")
    );
I am aware that there is Flux.deferContextual(..), but I couldn't figure out how to read context data at subscription time (to print out "X rows processed").
Is there a best practice for this?
Or is the approach above, capturing atomic references, still a better fit?
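For illustration, the kind of thing I had in mind is sketched below (the "header" key and the hard-coded value are just placeholders). As far as I understand, the Context is populated from the subscriber side via contextWrite and read upstream via deferContextual, so the header would have to be known before subscribing, which is exactly what I couldn't reconcile with reading it out of the first emission:

import reactor.core.publisher.Flux;
import reactor.util.context.Context;

// Sketch only: contextWrite appends to the subscription context from
// downstream, and deferContextual reads it when the subscription happens.
// That means the "header" value must already be known here, rather than
// being discovered as the first element of the stream.
Flux<String> joined = Flux.deferContextual(ctx ->
            Flux.just("line1", "line2", "line3", "line4")
                .map(line -> ctx.<String>get("header") + ":" + line))
        .contextWrite(Context.of("header", "HEADER"));

joined.subscribe(System.out::println);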
I tried to look for information on switchOnFirst and Flux.deferContextual(..), but I haven't found anything that suggests what the best practice is for this particular scenario, in which I am trying to:
- first, parse a header that has to be passed downstream or otherwise be made available to downstream emissions
- second, include a count of emissions
If you just want a count of the number of items processed, it looks like the count() operator would work.
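A minimal sketch, reusing the header handling from your question (map replaces the flatMap just for brevity): count() collapses the Flux into a Mono<Long> that emits the number of items on completion, so the per-row printing moves into doOnNext:

import reactor.core.publisher.Flux;

import java.util.concurrent.atomic.AtomicReference;

AtomicReference<String> header = new AtomicReference<>();
Flux<String> lines = Flux.just("HEADER", "line1", "line2", "line3", "line4");

lines.switchOnFirst((sig, flux) -> {
        String item = sig.get();
        if (sig.isOnNext() && item != null) {
            header.set(item);
            return flux.skip(1);
        }
        return Flux.error(() -> new RuntimeException("No header was found"));
    })
    .map(line -> header.get() + ":" + line)
    .doOnNext(System.out::println)   // print each processed row here
    .count()                         // Mono<Long>: number of rows emitted
    .subscribe(
        n -> System.out.println(n + " rows processed"),
        System.err::println
    );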