Using DynamoDbEnhancedAsyncClient to scan and fetch a future object


I am trying to use the AWS SDK v2 library to persist and retrieve data in a non-blocking manner. The put method of DynamoDbEnhancedAsyncClient returns a CompletableFuture, but the scan and query methods return a PagePublisher, which suggests to me that these are blocking calls. Can someone please help me understand or fix this? I want to implement end-to-end non-blocking calls. I tried DynamoDbAsyncClient and it works perfectly, but I want to get rid of manually mapping objects by using DynamoDbEnhancedAsyncClient, and I see no scan or query method there that returns a CompletableFuture.

Here is my code block

DynamoDbAsyncTable<User> asyncTable = dynamoDBEnhancedAsyncClient.table("userTable", TableSchema.fromBean(User.class));
// Bind the :val placeholder used in the filter expression to the numeric userId.
Map<String, AttributeValue> expVal = new HashMap<>();
expVal.put(":val", AttributeValue.builder().n(String.valueOf(userId)).build());
Expression exp = Expression.builder().expression("userId = :val").expressionValues(expVal).build();
ScanEnhancedRequest req = ScanEnhancedRequest.builder().filterExpression(exp).build();
PagePublisher<User> pagePublisher = asyncTable.scan(req);

Dependencies I used

<dependency>
  <groupId>software.amazon.awssdk</groupId>
  <artifactId>dynamodb</artifactId>
  <version>2.10.76</version>
</dependency>
<dependency>
  <groupId>software.amazon.awssdk</groupId>
  <artifactId>dynamodb-enhanced</artifactId>
  <version>2.12.0</version>
</dependency>

There is 1 best solution below


AWS SDK v2 leverages reactive streams to build its asynchronous functions.

PagePublisher<T> won't make your call blocking. It implements the Reactive Streams Publisher interface, so you subscribe to it and receive each page of results through callbacks as it arrives.

Option 1

Since your question asks how to turn a Publisher into a CompletableFuture, here is a rough example of how to do it:

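import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.enhanced.dynamodb.model.Page;

var publisher = asyncTable.scan(req);
var future = new CompletableFuture<Page<User>>();
publisher.subscribe(
    new Subscriber<Page<User>>() {
      @Override
      public void onSubscribe(Subscription s) {
        // Request only the first page of results.
        s.request(1);
      }

      @Override
      public void onNext(Page<User> userPage) {
        future.complete(userPage);
      }

      @Override
      public void onError(Throwable t) {
        future.completeExceptionally(t);
      }

      @Override
      public void onComplete() {
        // No-op if onNext already completed the future; yields null for an empty stream.
        future.complete(null);
      }
    });
// join() blocks the calling thread; in non-blocking code, chain thenApply/thenAccept on the future instead.
var result = future.join();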
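
The snippet above completes the future with the first page only. If every page is needed, one possible approach is to keep requesting elements and complete the future in onComplete. The following is a minimal, self-contained sketch of that idea, not the SDK's own API: it uses the JDK's java.util.concurrent.Flow interfaces as a stand-in for org.reactivestreams (which the AWS SDK actually uses), and the class name PublisherFutures and method collect are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class PublisherFutures {

    private PublisherFutures() {}

    /**
     * Collects every element emitted by the publisher into a list.
     * The returned future completes when the stream ends, or fails if the stream fails.
     * Hypothetical helper for illustration; not part of the AWS SDK.
     */
    static <T> CompletableFuture<List<T>> collect(Flow.Publisher<T> publisher) {
        CompletableFuture<List<T>> future = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            private final List<T> items = new ArrayList<>();
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // ask for the first element
            }

            @Override
            public void onNext(T item) {
                items.add(item);
                subscription.request(1); // ask for the next element
            }

            @Override
            public void onError(Throwable t) {
                future.completeExceptionally(t);
            }

            @Override
            public void onComplete() {
                future.complete(items);
            }
        });
        return future; // returned immediately; nothing here blocks
    }

    public static void main(String[] args) {
        // SubmissionPublisher stands in for the PagePublisher returned by scan().
        SubmissionPublisher<String> pages = new SubmissionPublisher<>();
        CompletableFuture<List<String>> all = collect(pages);
        pages.submit("page-1");
        pages.submit("page-2");
        pages.close();
        // join() is used only so this demo waits before exiting.
        System.out.println(all.join());
    }
}
```

To apply the same pattern to asyncTable.scan(req), the subscriber would implement org.reactivestreams.Subscriber<Page<User>> instead of Flow.Subscriber. It may also be worth checking whether your SDK version offers the SdkPublisher.subscribe(Consumer) overload on PagePublisher, which returns a CompletableFuture<Void> that completes when the stream ends and avoids writing a Subscriber by hand.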

Option 2 (Recommended)

However, I saw that you tagged this question with spring-boot and mentioned that you want to implement non-blocking end-to-end calls.

I highly recommend integrating Spring WebFlux with AWS SDK v2, which makes it easier to build a non-blocking, reactive web service.

With Spring WebFlux (Project Reactor), you can wrap the publisher directly:

Mono.from(asyncTable.scan(req))

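which makes the code cleaner and simpler. Note that Mono.from emits only the first page of results; use Flux.from(asyncTable.scan(req)) if you need every page.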