I want to write a Java consumer client that pulls a set of messages from a JetStream stream. I want to either pull all of the messages or pull only the most recent ones (with a configurable batch size).
Is there any way to implement this without using PullSubscribeOptions.durable()?
I ask because I found that if I don't use a durable name, the server sends the same batch every time.
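
    import io.nats.client.Connection;
    import io.nats.client.JetStream;
    import io.nats.client.JetStreamApiException;
    import io.nats.client.JetStreamSubscription;
    import io.nats.client.Message;
    import io.nats.client.Nats;
    import io.nats.client.PullSubscribeOptions;
    import java.io.IOException;
    import java.time.Duration;
    import java.util.List;
    import java.util.concurrent.TimeoutException;

    public class PullConsumerExample {
        public static void main(String[] args)
                throws IOException, InterruptedException, JetStreamApiException, TimeoutException {
            String userName = "local";
            String password = "password";
            String host = "0.0.0.0";
            String port = "37733";
            String streamName = "test_stream";
            String subjectName = "test_subject_1";
            int BATCH_SIZE = 100;

            Connection nc = Nats.connect("nats://" + userName + ":" + password + "@" + host + ":" + port);
            JetStream js = nc.jetStream();

            PullSubscribeOptions pullOptions = PullSubscribeOptions.builder()
                    .durable("configurator_service_9")
                    .stream(streamName)
                    .build();

            // Subscribe once, outside the loop, so every fetch reuses the same consumer.
            JetStreamSubscription sub = js.subscribe(subjectName, pullOptions);

            while (true) {
                // fetch() returns at most BATCH_SIZE messages, or an empty list on timeout.
                List<Message> messages = sub.fetch(BATCH_SIZE, Duration.ofMillis(1000));
                System.out.println("Message count is " + messages.size());
                if (messages.isEmpty()) {
                    break; // nothing left to pull
                }
                for (Message m : messages) {
                    System.out.println(m);
                    m.ack();
                }
            }

            nc.flush(Duration.ofSeconds(1));
            nc.close();
        }
    }
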
By the way, I can see that the entire stream can be pulled with the nats CLI. I need to be able to do this again and again.
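
For the "last N messages" case, one possible approach (a sketch only, not something confirmed anywhere in this post) is to work out a start sequence from the stream's first and last sequence numbers and start the consumer from there. The helper below is plain Java and only does the arithmetic. The class name StartSequence and the method forLastN are made up for illustration, and it assumes the first and last sequence numbers have already been read from the stream state. In jnats, the result could presumably be passed to a consumer configuration that uses DeliverPolicy.ByStartSequence together with startSequence(...), but that wiring is an assumption and is not shown here.

```java
// Hypothetical helper: computes where a consumer would have to start
// in order to receive (at most) the last n messages of a stream.
final class StartSequence {

    private StartSequence() {
    }

    /**
     * @param firstSeq first sequence number currently held by the stream
     * @param lastSeq  last sequence number currently held by the stream
     * @param n        how many of the most recent messages are wanted
     * @return the sequence number to start from, never below 1 or firstSeq
     */
    static long forLastN(long firstSeq, long lastSeq, long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        // Stream sequence numbers start at 1, so never go below that.
        long lowest = Math.max(firstSeq, 1);
        // Starting at lastSeq - n + 1 covers exactly n sequence numbers.
        return Math.max(lowest, lastSeq - n + 1);
    }

    public static void main(String[] args) {
        // Stream holds sequences 1..500, and the last 100 are wanted.
        System.out.println(forLastN(1, 500, 100)); // prints 401
    }
}
```

Note that this counts sequence numbers, not messages: if messages inside that range were deleted, fewer than n messages would come back. If n is larger than the stream, the result is simply the first available sequence, which amounts to pulling everything.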