java-nats-streaming: Publishing Messages after Server Reconnects

I have a NATS streaming cluster with 3 nodes set up. It seems that NATS messages published by my Java application during server downtime are lost (i.e. they are not republished when my servers are back up and running).

A more detailed description:

  1. NATS cluster online. Publisher and Subscriber applications come online. Publisher begins to publish a message every second. Subscriber receives messages.
  2. NATS servers are shut off. Publisher continues to publish messages (let's call these messages 'offline messages'). Subscriber stops receiving anything.
  3. NATS servers come back online. Subscriber begins to receive messages again, but 'offline messages' are never received.

Both my publisher and subscriber applications are configured to attempt reconnection to the NATS server indefinitely and never time out. I do not get any exceptions throughout.

NATS connection:

Options options = new Options.Builder().servers(serverList).maxReconnects(-1).build();

Connection nc = Nats.connect(options);

StreamingConnectionFactory cf = new StreamingConnectionFactory(natsProperties.getClusterId(), natsProperties.getClientId());
cf.setNatsConnection(nc);
streamingConnection = cf.createConnection();

Publisher:

// subject and message String variables are passed in
streamingConnection.publish(subject, message.getBytes());

Subscriber:

streamingConnection.subscribe(subject, new MessageHandler() {
    public void onMessage(Message m) {
        // getData() returns a byte[], so decode it before printing
        System.out.printf("Received msg: %s\n", new String(m.getData()));
    }
}, new SubscriptionOptions.Builder().durableName(durableName).build());

From the docs, the Java NATS client seems to have a reconnect buffer built in. I tried increasing the buffer by a factor of 10 but to no avail (also, my messages consist only of 2-digit numbers). How do I get it to resend these 'offline messages'?
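
One possible workaround that does not depend on the client's reconnect buffer is an application-level outbox: queue each message, try to deliver it, and remove it from the queue only after the send call returns without throwing. The sketch below is self-contained and is not part of java-nats-streaming; `Sender`, `Outbox`, `flush` and `pending` are hypothetical names, and `Sender.send` stands in for a call such as `streamingConnection.publish(subject, data)`. It assumes the send call signals failure by throwing, which may not match the behaviour described above, where no exceptions are seen.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical stand-in for a call such as streamingConnection.publish(subject, data). */
interface Sender {
    void send(String subject, byte[] data) throws Exception;
}

/** Keeps messages in FIFO order until the sender accepts them. */
class Outbox {
    private static final class Pending {
        final String subject;
        final byte[] data;

        Pending(String subject, byte[] data) {
            this.subject = subject;
            this.data = data;
        }
    }

    private final Deque<Pending> queue = new ArrayDeque<>();
    private final Sender sender;

    Outbox(Sender sender) {
        this.sender = sender;
    }

    /** Queues the message, then tries to deliver everything queued so far. */
    synchronized int publish(String subject, byte[] data) {
        queue.addLast(new Pending(subject, data));
        return flush();
    }

    /** Sends queued messages in order and stops at the first failure. Returns the number delivered. */
    synchronized int flush() {
        int delivered = 0;
        while (!queue.isEmpty()) {
            Pending p = queue.peekFirst();
            try {
                sender.send(p.subject, p.data);
            } catch (Exception e) {
                break; // assumed to mean "server unreachable"; keep the message for the next attempt
            }
            queue.removeFirst();
            delivered++;
        }
        return delivered;
    }

    /** Number of messages still waiting to be delivered. */
    synchronized int pending() {
        return queue.size();
    }
}
```

Calling `flush()` again once the servers are back (for example from a timer or a reconnect callback) would resend the queued 'offline messages' in their original order. A message whose send failed after the server had already stored it could be delivered twice, so the subscriber should tolerate duplicates.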

There is 1 best solution below

I have the same problem. The only solution I see is to use a different subscription method: save the sequence number of the last message received and resubscribe starting from that sequence. I do not think this is the best approach, though.

// Receive messages starting at a specific sequence number
sc.subscribe("foo", new MessageHandler() {
    public void onMessage(Message m) {
        logger.info("Sequence message " + m.getSequence());
        // getData() returns a byte[], so decode it before printing
        System.out.printf("Received a message: %s\n", new String(m.getData()));
    }
}, new SubscriptionOptions.Builder().startAtSequence(22).build());
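
To avoid hard-coding the sequence number, the last processed sequence could be saved somewhere durable and read back before resubscribing. This is a minimal sketch of that idea using a plain file; `SequenceStore`, `save` and `nextStart` are hypothetical names, not part of the NATS client, and it assumes sequence numbers start at 1.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper that remembers the last processed sequence in a file. */
class SequenceStore {
    private final Path file;

    SequenceStore(Path file) {
        this.file = file;
    }

    /** Records the sequence of the last message that was fully processed. */
    void save(long sequence) throws IOException {
        Files.write(file, Long.toString(sequence).getBytes(StandardCharsets.UTF_8));
    }

    /** Returns the sequence to resume from: last saved + 1, or 1 if nothing was saved yet. */
    long nextStart() throws IOException {
        if (!Files.exists(file)) {
            return 1L;
        }
        String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        return text.isEmpty() ? 1L : Long.parseLong(text) + 1;
    }
}
```

In the handler above this would mean calling `store.save(m.getSequence())` after processing each message and passing `store.nextStart()` to `startAtSequence(...)` in place of the literal 22. Note that this only controls where the subscriber resumes; it does not recover messages that never reached the server.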