I have to upgrade Cassandra to 4.x. This is the code as it was previously written for Cassandra 3.5:
protected <T> Stream<T> getAll(Stream<Statement> statements, Mapper<T> mapper) {
    List<ResultSetFuture> futures = statements
        .peek(p -> cassandraReads.inc())
        .map(s -> session.session().executeAsync(s))
        .collect(Collectors.toList());
    return futures.stream()
        .map(ResultSetFuture::getUninterruptibly)
        .map(mapper::map)
        .flatMap(r -> StreamSupport.stream(r.spliterator(), false));
}
I have made some changes:
    List<CompletionStage<AsyncResultSet>> futures = statements
        .peek(p -> cassandraReads.inc())
        .map(s -> session.getSession().executeAsync(s))
        .collect(Collectors.toList());
But what should I use in place of .map(ResultSetFuture::getUninterruptibly), now that it has been removed?
Since I am new to Cassandra and asynchronous programming, any help would be appreciated.
Since you are already mixing async and blocking calls in your code, I'd suggest that you go fully blocking and use Session.execute instead of Session.executeAsync. It's going to make things much easier: each statement is executed synchronously, and you can then stream the rows of each result set through your mapper.
Note: I'm slightly changing your Mapper interface, which I assume maps a single Row to a T. (You could actually replace this Mapper interface with just Function<Row, T>, btw.)
Other than that, as Erick Ramirez said, please have a look at this page to understand how to write proper async code with driver 4.x.
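If you do keep executeAsync for now, one JDK-level way to block on a CompletionStage, much as getUninterruptibly did on a ResultSetFuture, is toCompletableFuture().join(). The sketch below uses only the JDK so it can run on its own: List<T> is a stand-in for a page of rows, and the class and method names are hypothetical. With the real driver, my understanding is that an AsyncResultSet only holds the current page (currentPage()), and further pages have to be requested with fetchNextPage(), so this is not a drop-in replacement for multi-page results.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, JDK only: List<T> stands in for one page of an async result.
final class BlockingJoinSketch {

    // Blocks on each stage in list order and flattens the per-stage results.
    // join() does not throw InterruptedException; a failed stage surfaces as
    // an unchecked CompletionException wrapping the original cause.
    static <T> Stream<T> joinAll(List<CompletionStage<List<T>>> stages) {
        return stages.stream()
                .map(stage -> stage.toCompletableFuture().join())
                .flatMap(List::stream);
    }

    public static void main(String[] args) {
        // The stages are created (and thus started) before any join() call,
        // which mirrors collecting the futures into a list first.
        List<CompletionStage<List<String>>> stages = List.of(
                CompletableFuture.completedFuture(List.of("a", "b")),
                CompletableFuture.supplyAsync(() -> List.of("c")));
        System.out.println(joinAll(stages).collect(Collectors.toList()));
    }
}
```

Collecting the stages into a list before joining matters: streams are lazy, so joining inside the same pipeline that calls executeAsync would run the queries one at a time rather than concurrently.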