Pagination in CosmosDB Java SDK with continuation token


I'm trying to write a method that retrieves items from Cosmos DB using the async client, but I have a lot of questions and have found little to no documentation from Microsoft on this.

I've created a function that reads a list of items from Cosmos DB page by page, where each subsequent page is requested with a continuation token. The method looks like this (please be aware there could be some minor mistakes unrelated to the core functionality, which is reading page by page):

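  import com.azure.cosmos.models.CosmosQueryRequestOptions;
  import com.azure.cosmos.models.FeedResponse;
  import com.microsoft.azure.functions.ExecutionContext;
  import com.microsoft.azure.functions.HttpMethod;
  import com.microsoft.azure.functions.HttpRequestMessage;
  import com.microsoft.azure.functions.HttpResponseMessage;
  import com.microsoft.azure.functions.HttpStatus;
  import com.microsoft.azure.functions.annotation.AuthorizationLevel;
  import com.microsoft.azure.functions.annotation.CosmosDBInput;
  import com.microsoft.azure.functions.annotation.FunctionName;
  import com.microsoft.azure.functions.annotation.HttpTrigger;
  import java.util.Optional;
  import java.util.concurrent.atomic.AtomicReference;
  import java.util.concurrent.atomic.DoubleAdder;
  import java.util.stream.Collectors;
  import reactor.core.publisher.Flux;

  @FunctionName("Feed")
  public HttpResponseMessage getFeed(
    @HttpTrigger(
      name = "get",
      methods = { HttpMethod.GET },
      authLevel = AuthorizationLevel.ANONYMOUS,
      route = "Feed"
    ) final HttpRequestMessage<Optional<String>> request,
    @CosmosDBInput(
      name = "Feed",
      databaseName = Constants.DATABASE_NAME,
      collectionName = Constants.LOG_COLLECTION_NAME,
      sqlQuery = "SELECT * FROM c",  // This won't be used actually as we use our own query
      connectionStringSetting = Constants.CONNECTION_STRING_KEY
    ) final LogEntry[] logEntryArray,
    final ExecutionContext context
  ) {
      context
      .getLogger()
      .info("Query with paging and continuation token");
      String query = "SELECT * FROM c";

      int pageSize = 10; // No. of docs per page
      int currentPageNumber = 1;
      int documentNumber = 0;
      String continuationToken = null;

      double requestCharge = 0.0;

      // First iteration (continuationToken = null): Receive a batch of query response pages
      // Subsequent iterations (continuationToken != null): Receive subsequent batch of query response pages, with continuationToken indicating where the previous iteration left off
      do {
          context
          .getLogger()
          .info("Receiving a set of query response pages.");
          context
          .getLogger()
          .info("Continuation Token: " + continuationToken + "\n");

          CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();

          // `container` is a CosmosAsyncContainer field of the enclosing class (not shown)
          Flux<FeedResponse<LogEntry>> feedResponseIterator =
                  container.queryItems(query, queryOptions, LogEntry.class).byPage(continuationToken, pageSize);

          // Locals can't be reassigned inside the lambda, so the latest token and
          // the request charges are collected in holders and copied out afterwards
          AtomicReference<String> lastToken = new AtomicReference<>();
          DoubleAdder pageCharges = new DoubleAdder();

          try {

              feedResponseIterator.flatMap(fluxResponse -> {
                  lastToken.set(fluxResponse.getContinuationToken());
                  pageCharges.add(fluxResponse.getRequestCharge());

                  context
                  .getLogger()
                  .info("Got a page of query result with " +
                          fluxResponse.getResults().size() + " item(s)"
                          + " and request charge of " + fluxResponse.getRequestCharge());

                  context
                  .getLogger()
                  .info("Item dates " + fluxResponse
                          .getResults()
                          .stream()
                          .map(LogEntry::getDate)
                          .collect(Collectors.toList()));

                  return Flux.empty();
              }).blockLast();

              continuationToken = lastToken.get();
              requestCharge += pageCharges.sum();

          } catch (Exception e) {
              context.getLogger().severe("Query failed: " + e.getMessage());
              continuationToken = null; // stop instead of retrying the same token forever
          }

      } while (continuationToken != null);

      context
      .getLogger()
      .info(String.format("Total request charge: %f\n", requestCharge));
      return request
                .createResponseBuilder(HttpStatus.OK)
                .header("Content-Type", "application/json")
                .body("ALL READ")
                .build();
  }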

For simplicity the read items are merely logged.

First question: we are using an async client that returns a Flux. Will the client keep track of the token? In principle it is a stateless client. I understand that the sync client could easily handle this case, but wouldn't the async client lose track of the token once the first page and its token have been generated?
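A minimal, stdlib-only sketch of the continuation-token pattern behind the first question. It does not use the Cosmos SDK: `Page`, `InMemoryPager` and the offset-as-text token format are hypothetical stand-ins. What it shows is that the pager keeps no cursor between calls, so the caller must hand back whatever token the previous page returned. In the Java SDK that token is exposed as `FeedResponse.getContinuationToken()` and is passed back through `byPage(continuationToken, pageSize)`; as far as I can tell, neither the sync nor the async client remembers it for you across separate queries.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical page type: the items plus an opaque token for the next page
// (null when there are no more pages).
final class Page {
    final List<String> items;
    final String continuationToken;

    Page(List<String> items, String continuationToken) {
        this.items = items;
        this.continuationToken = continuationToken;
    }
}

// Hypothetical stand-in for a query source. It holds the data but no
// per-caller cursor: every call is answered from the token alone.
final class InMemoryPager {
    private final List<String> data;

    InMemoryPager(List<String> data) {
        this.data = Collections.unmodifiableList(new ArrayList<>(data));
    }

    Page readPage(String continuationToken, int pageSize) {
        // In this sketch the token is just the start offset as text;
        // a real Cosmos DB token is opaque and must not be parsed by callers.
        int start = continuationToken == null ? 0 : Integer.parseInt(continuationToken);
        int end = Math.min(start + pageSize, data.size());
        List<String> items = new ArrayList<>(data.subList(start, end));
        String next = end < data.size() ? Integer.toString(end) : null;
        return new Page(items, next);
    }
}
```

A caller that wants every page loops until the token comes back null; an HTTP endpoint serving one page per request makes a single `readPage` call and returns the token to its client instead.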

Second: is the while loop even appropriate? My assumption is a clear no, since we need to send the token back in a response header, and the frontend UI will then need to send it to the Azure Function in a request header (or in some similar way). The token should then be extracted from the incoming request.
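The round trip the second question describes (token out in a response header, back in on the next request, no server-side loop) can be sketched in plain Java. `FeedHandler`, the header name `x-continuation-token`, the `Map`-based headers and the offset token are all hypothetical; no Azure Functions types are used. In the actual function the incoming value would presumably come from `request.getHeaders()` and the outgoing one would be set with `createResponseBuilder(...).header(...)`, as the original code already does for `Content-Type`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical handler: serves exactly one page per call and moves the
// continuation token in and out through headers.
final class FeedHandler {
    static final String TOKEN_HEADER = "x-continuation-token"; // assumed header name

    private final List<String> data;
    private final int pageSize;

    FeedHandler(List<String> data, int pageSize) {
        this.data = new ArrayList<>(data);
        this.pageSize = pageSize;
    }

    // Returns one page of items and, if more remain, writes the next token
    // into responseHeaders. No loop: the client drives the paging.
    List<String> handle(Map<String, String> requestHeaders, Map<String, String> responseHeaders) {
        String token = requestHeaders.get(TOKEN_HEADER);
        // Stub token format (start offset) used only in this sketch.
        int start = (token == null || token.isEmpty()) ? 0 : Integer.parseInt(token);
        int end = Math.min(start + pageSize, data.size());
        if (end < data.size()) {
            responseHeaders.put(TOKEN_HEADER, Integer.toString(end));
        }
        return new ArrayList<>(data.subList(start, end));
    }
}
```

The frontend stores the header value and sends it back when the user clicks "next page"; a response without the header signals the last page.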

Third: is using flatMap and blockLast to read the Flux appropriate? I tried playing with the subscribe method, but again I don't see how it could work with an async client.
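On the third question: an HTTP-triggered function that returns `HttpResponseMessage` synchronously has to wait for the data somewhere, so blocking once at the boundary (as `blockLast()` does) is a common compromise, whereas a bare `subscribe(...)` would return before the pages arrive. The stdlib-only sketch below shows the same idea with `CompletableFuture` standing in for Reactor: the page is fetched asynchronously, transformed with a pure function (instead of side effects inside `flatMap`), and `join()`ed exactly once. `BlockingEdge` and `fetchPageAsync` are hypothetical. In Reactor terms, something like `flux.flatMapIterable(FeedResponse::getResults).collectList().block()` would collect the items rather than only logging them; treat that as an untested suggestion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BlockingEdge {
    // Hypothetical async source: completes on a pool thread with one page of ints.
    static CompletableFuture<List<Integer>> fetchPageAsync(ExecutorService pool, int start, int size) {
        return CompletableFuture.supplyAsync(() -> {
            List<Integer> page = new ArrayList<>();
            for (int i = start; i < start + size; i++) {
                page.add(i);
            }
            return page;
        }, pool);
    }

    // Synchronous "handler": composes asynchronously, then blocks once at the edge.
    static String handle(int start, int size) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return fetchPageAsync(pool, start, size)
                    .thenApply(page -> "page " + page) // pure transformation, no side effects
                    .join();                           // the single blocking point
        } finally {
            pool.shutdown();
        }
    }
}
```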

Thanks a lot, Alex.

UPDATE:

I have observed that the Flux only uses the page size to set the number of items retrieved per batch; after retrieving one page it doesn't stop and keeps retrieving pages! I don't know how to stop it. I have tried replacing Flux.empty() with Mono.empty(), and adding a LIMIT clause to the SQL query. The first option behaves the same, and the second apparently freezes the query, which never returns. How can I return one page, and only one page, along with the continuation token needed to run the next query once the user clicks the next-page button?
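The behaviour in the update is consistent with `byPage(...)` returning a Flux that emits every remaining page, so draining it with `blockLast()` pulls them all. One approach worth trying (not verified here against the SDK) is to take only the first emission, e.g. `byPage(continuationToken, pageSize).next().block()`, since Reactor's `Flux.next()` returns a `Mono` of the first element and cancels the rest, and then return that page's `getResults()` and `getContinuationToken()`. The stdlib sketch below models the lazy side of this with a hypothetical `LazyPages` iterator that fetches a page only when asked and counts the fetches, so taking one element costs one round trip.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical lazy page source: a page is "fetched" only when next() is
// called, loosely mimicking a Flux that stops requesting once cancelled.
final class LazyPages implements Iterator<List<Integer>> {
    private final int total;
    private final int pageSize;
    private int offset;
    int fetchCount = 0; // number of simulated round trips

    LazyPages(int total, int pageSize, int startOffset) {
        this.total = total;
        this.pageSize = pageSize;
        this.offset = startOffset;
    }

    @Override
    public boolean hasNext() {
        return offset < total;
    }

    @Override
    public List<Integer> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        fetchCount++;
        int end = Math.min(offset + pageSize, total);
        List<Integer> page = new ArrayList<>();
        for (int i = offset; i < end; i++) {
            page.add(i);
        }
        offset = end;
        return page;
    }

    // Token for the following request, or null after the last page (sketch format only).
    String continuationToken() {
        return offset < total ? Integer.toString(offset) : null;
    }
}
```

Each button click would then start a fresh query from the previous response's token and take a single page, so no loop is needed on the server.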
