I ran some experiments with asynchronous paging, following the documentation, and observed roughly a 20% performance degradation between driver versions 3.8.0 and 4.6.0: the 4.6.0 application takes longer to process the rows coming back from the database.
Is this a known issue? Is there any way to reduce the extra time taken by the 4.6.0 application?
Here are the links to the documentation I followed to write the applications:
3.8.0: https://docs.datastax.com/en/developer/java-driver/3.8/manual/async/#async-paging
4.6.0: https://docs.datastax.com/en/developer/java-driver/4.6/manual/core/paging/#asynchronous-paging
The table has over 100,000 rows.
4.6.0 driver app code:
    CompletionStage<AsyncResultSet> futureRs = session.executeAsync(<Select Query>);
    start = System.currentTimeMillis();
    futureRs.whenComplete(this::processRows);

    void processRows(AsyncResultSet rs, Throwable error) {
        if (error != null) {
            // The query failed, process the error
        } else {
            for (Row row : rs.currentPage()) {
                // Process the row...
            }
            if (rs.hasMorePages()) {
                rs.fetchNextPage().whenComplete(this::processRows);
            } else {
                long finish = System.currentTimeMillis();
                long timeElapsed = finish - start;
                System.out.println("time elapsed 1 " + timeElapsed);
                System.exit(0);
            }
        }
    }
3.8.0 driver app code:
    Statement statement = new SimpleStatement(<Select Query>).setFetchSize(5000);
    ListenableFuture<ResultSet> future = Futures.transform(
        session.executeAsync(statement),
        iterate(1));
    long start = System.currentTimeMillis();
    try {
        future.get();
    } catch (Exception e) {
        e.printStackTrace();
    }
    long finish = System.currentTimeMillis();
    long timeElapsed = finish - start;
    System.out.println("time elapsed 1 " + timeElapsed);
    System.exit(0);

    private static AsyncFunction<ResultSet, ResultSet> iterate(final int page) {
        return new AsyncFunction<ResultSet, ResultSet>() {
            public ListenableFuture<ResultSet> apply(ResultSet rs) throws Exception {
                // How far we can go without triggering the blocking fetch:
                int remainingInPage = rs.getAvailableWithoutFetching();
                for (Row row : rs) {
                    // Process the row...
                    if (--remainingInPage == 0)
                        break;
                }
                boolean wasLastPage = rs.getExecutionInfo().getPagingState() == null;
                if (wasLastPage) {
                    return Futures.immediateFuture(rs);
                } else {
                    ListenableFuture<ResultSet> future = rs.fetchMoreResults();
                    return Futures.transform(future, iterate(page + 1));
                }
            }
        };
    }
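
Both snippets start the clock after executeAsync has already been called, and the 4.6.0 one stops it inside a callback just before System.exit(0). One way to time the two versions on equal footing might be to start the clock before the query is issued and to block on a single future that completes only after the last page. What follows is a minimal sketch of that idea, not driver code: Page, fakePage and countRows are hypothetical names, and Page is only a stand-in for AsyncResultSet so that the example runs without a cluster.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class PagingTimer {

    // Hypothetical stand-in for the driver's AsyncResultSet (assumption, not the real API).
    public interface Page {
        List<Integer> currentPage();
        boolean hasMorePages();
        CompletionStage<Page> fetchNextPage();
    }

    // Builds a fake page starting at firstRow; later pages are produced asynchronously.
    public static Page fakePage(int firstRow, int totalRows, int pageSize) {
        int end = Math.min(firstRow + pageSize, totalRows);
        List<Integer> rows = new ArrayList<>();
        for (int i = firstRow; i < end; i++) {
            rows.add(i);
        }
        return new Page() {
            public List<Integer> currentPage() { return rows; }
            public boolean hasMorePages() { return end < totalRows; }
            public CompletionStage<Page> fetchNextPage() {
                return CompletableFuture.supplyAsync(() -> fakePage(end, totalRows, pageSize));
            }
        };
    }

    // Walks every page without blocking and completes with the total number of rows seen.
    public static CompletionStage<Long> countRows(Page page, long seenSoFar) {
        long seen = seenSoFar + page.currentPage().size();
        if (page.hasMorePages()) {
            return page.fetchNextPage().thenCompose(next -> countRows(next, seen));
        }
        return CompletableFuture.completedFuture(seen);
    }

    public static void main(String[] args) throws Exception {
        // Start the clock before the (simulated) query is issued.
        long start = System.nanoTime();
        CompletionStage<Page> first =
            CompletableFuture.supplyAsync(() -> fakePage(0, 100_000, 5_000));
        // Block once, on the future that completes after the last page.
        long rows = first.thenCompose(p -> countRows(p, 0L))
                         .toCompletableFuture()
                         .get(30, TimeUnit.SECONDS);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(rows + " rows in " + elapsedMs + " ms");
    }
}
```

With the real 4.x driver, the same shape would presumably chain rs.fetchNextPage() through thenCompose in place of the fake pages, so that the elapsed time is read from the waiting thread rather than from inside a callback.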