StreamEx grouping into lists returns an incorrect number of records


The following code splits a stream of objects into chunks of 1000, processes them on materialisation and returns the total number of objects at the end.

The number returned is correct in every case except when the stream contains exactly one element; in that case the number returned is 0.

Any help would be greatly appreciated. I have also had to hack the return statement so that it returns 0 when there are no records in the stream. I'd like to fix this too.

AtomicInteger recordCounter = new AtomicInteger(0);
try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) {
    stream.groupRuns((prev, next) -> recordCounter.incrementAndGet() % 1000 != 0)
          .forEach((chunk) -> {
              //... process each chunk
          });
} catch (Exception e) {
    throw new MyRuntimeException("Failure streaming...", e);
} finally {
    myObjects.close();
}

return recordCounter.get() == 0 ? 0 : recordCounter.incrementAndGet();

There are 4 best solutions below

0
On BEST ANSWER

In the end I went with Guava's Iterators.partition() to split my stream of objects into chunks:

MutableInt recordCounter = new MutableInt();
try {
    Iterators.partition(myObjects.iterator(), 1000)
             .forEachRemaining((chunk) -> {
                 // process each chunk
                 ...
                 recordCounter.add(chunk.size());
             });
} catch (Exception e) {
    throw new MyRuntimeException("Failure streaming...", e);
} finally {
    myObjects.close();
}

return recordCounter.getValue();
1
On

As JavaDoc says:

sameGroup - a non-interfering, stateless predicate to apply to the pair of adjacent elements which returns true for elements which belong to the same group.

The predicate must be stateless, and yours is not. You are misusing the method, which is why you do not get the expected result. It comes close to what you want purely by chance; you cannot rely on this behavior, as it may change in future StreamEx versions.
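
For illustration, here is a minimal sketch of chunking that does not depend on a stateful predicate at all. It uses only the JDK; the class `ChunkingSketch` and the method `forEachChunk` are hypothetical names made up for this example, not part of StreamEx or Guava. The count is taken from the chunk sizes, so 0 and 1 elements need no special handling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

final class ChunkingSketch {

    // Hypothetical helper: passes the source to the handler in lists of at most
    // chunkSize elements and returns the total number of elements seen.
    static <T> long forEachChunk(Iterator<T> source, int chunkSize, Consumer<List<T>> handler) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        long total = 0;
        List<T> chunk = new ArrayList<>(chunkSize);
        while (source.hasNext()) {
            chunk.add(source.next());
            if (chunk.size() == chunkSize) {
                // Full chunk: hand it over and start a fresh list.
                handler.accept(chunk);
                total += chunk.size();
                chunk = new ArrayList<>(chunkSize);
            }
        }
        if (!chunk.isEmpty()) {
            // Trailing partial chunk.
            handler.accept(chunk);
            total += chunk.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        long total = forEachChunk(data.iterator(), 3, chunk -> System.out.println(chunk));
        System.out.println("total=" + total);
    }
}
```

The chunk boundary is decided by the size of the list being filled rather than by a counter captured in a lambda, so nothing here relies on how often or in what order a predicate is called.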

1
On

The counter was originally there to decide where to split chunks, and it is not a reliable way to count the total number of objects. The groupRuns predicate is only called for pairs of adjacent elements, so when the stream has 0 or 1 elements it is never executed.
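
To make the counting problem concrete, here is a small JDK-only sketch. `groupAdjacent` is a hypothetical, sequential stand-in for groupRuns written for this example; it is not the StreamEx implementation, and it only assumes that the predicate is asked about each adjacent pair once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;

final class PairCountDemo {

    // Hypothetical stand-in for groupRuns: walks the list in order and asks
    // sameGroup about each adjacent pair. Not the StreamEx implementation.
    static <T> List<List<T>> groupAdjacent(List<T> items, BiPredicate<T, T> sameGroup) {
        List<List<T>> groups = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            // The predicate is skipped for the very first element: it has no predecessor.
            if (!current.isEmpty() && !sameGroup.test(current.get(current.size() - 1), item)) {
                groups.add(current);
                current = new ArrayList<>();
            }
            current.add(item);
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }

    // Returns how often the counting predicate from the question ran for n elements.
    static int predicateCalls(int n) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            items.add(i);
        }
        AtomicInteger counter = new AtomicInteger();
        groupAdjacent(items, (prev, next) -> counter.incrementAndGet() % 1000 != 0);
        return counter.get();
    }

    public static void main(String[] args) {
        for (int n : new int[] {0, 1, 2, 2500}) {
            System.out.println("elements=" + n + " predicateCalls=" + predicateCalls(n));
        }
    }
}
```

Under that assumption the counter ends at one less than the number of elements for a non-empty input, which is why the question's code adds 1 at the end, and why inputs of size 0 and 1 both leave the counter at 0 and cannot be told apart.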

So you need another way to count the objects. Instead of just consuming the items in forEach, you could return the number of objects processed in each chunk, chunk.size(), and sum those values at the end:

    AtomicInteger counter = new AtomicInteger(0);
    try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) {
        return stream
                .groupRuns((prev, next) -> counter.incrementAndGet() % 1000 != 0)
                .mapToLong((chunk) -> {
                     //... process each chunk
                     return chunk.size();
                 })
                .sum();
    } catch(Exception e) {
        throw new MyRuntimeException("Failure streaming...", e);
    } finally {
        myObjects.close();
    }
1
On

@Nazarii Bardiuk explained why it doesn't work. I have run into a similar requirement to split a stream before, so I forked StreamEx and made a few changes: StreamEx-0.8.7. Here is a simple example:

int count = IntStreamEx.range(0, 10).boxed().splitToList(3).mapToInt(chunk -> {
    System.out.println(chunk);
    return chunk.size();
}).sum();

System.out.println(count);

If you're at the beginning of your project, you could give it a try. The code would be:

try (StreamEx<MyObject> stream = StreamEx.of(myObjects).onClose(() -> myObjects.close())) {
    return stream.splitToList(1000)
                 .mapToInt((chunk) -> {
                     //... process each chunk
                     return chunk.size();
                 }).sum();
}