Pull last batch of messages from a nats jetstream

694 Views Asked by At

I want to write a java client consumer, such that it need to pull a set of messages from a jetStream. Here I want to either pull all of the messages or pull the last messages(batch size can be specified).

Is there anyway to implement without using PullSubscribeOptions.durable() ?

Because I found out that if I don't use the durable, everytime the server sends the same batch.

public static void main(String[] args)
  throws IOException, InterruptedException, JetStreamApiException, IllegalStateException, TimeoutException {

String userName = "local";
String password = "password";
String host = "0.0.0.0";
String port = "37733";
String streamName = "test_stream";
String subjectName = "test_subject_1";

int BATCH_SIZE = 100;

Connection nc = Nats.connect("nats://" + userName + ":" + password + "@" + host + ":" + port);
JetStream js = nc.jetStream();

PullSubscribeOptions pullOptions = PullSubscribeOptions.builder()
      .durable("configurator_service_9")
    .stream(streamName)
    .build();

outerloop:
while(true){
  JetStreamSubscription sub = js.subscribe(subjectName, pullOptions);
  List<Message> messages = sub.fetch(1000, Duration.ofMillis(1000));
  for (Message m : messages) {
    System.out.println(m);
    m.ack();
    if(m==null) {
      System.out.println(m);
      nc.flush(Duration.ofSeconds(1));
      nc.close();
      break;
    }
  }
  System.out.println("Message count is " + messages.size());
}

By the way as I see we can pull the entire steam on nats-cli. I need to do this again and again.

0

There are 0 best solutions below