MQTT listener inserts duplicate rows when multiple instances point to the same database


I've built a chat application using Java Spring Boot and MQTT. It has an MQTT listener that subscribes to every message under the abc topic (i.e. abc/#). For the MQTT broker I'm using Amazon MQ.

The application listens to all messages and inserts the data into the database, which works fine on a single instance. When I deploy it on multiple instances, however, I get duplicate entries, because every instance receives the same messages and they all point to the same database (AWS RDS). How can I overcome this?

public void subscribe(String topic) throws MqttException {
    logger.info("client connected");

    // Gson is thread-safe, so create it once instead of once per message.
    Gson gson = new Gson();

    mqttClient.subscribeWithResponse(topic, (currentTopic, msg) -> {

        // Decode the payload once with an explicit charset (needs java.nio.charset.StandardCharsets).
        // Assumes publishers send UTF-8 encoded JSON.
        String payload = new String(msg.getPayload(), StandardCharsets.UTF_8);
        logger.info("listened : " + payload);

        try {
            // MQTT topic names are case-sensitive, so this must match the published topic exactly.
            if (currentTopic.equals("Abc/onlineStatus")) {
                // Presence update: refresh the user's last-seen time and online status.
                UserOnlineStatusVO userOnlineStatusVO = gson.fromJson(payload, UserOnlineStatusVO.class);
                User user = userRepository.findById(userOnlineStatusVO.getUserId()).orElseThrow(() -> new Exception("user does not exist"));
                LocalDateTime lastSeen = LocalDateTime.parse(userOnlineStatusVO.getLastSeen(), format);
                user.setLastSeen(lastSeen);
                user.setOnlineStatus(userOnlineStatusVO.getOnlineStatus());
                userRepository.save(user);
            } else {
                // Any other topic carries a chat message; test messages are not persisted.
                GroupChatMessage groupChatMessage = gson.fromJson(payload, GroupChatMessage.class);
                if (Boolean.FALSE.equals(groupChatMessage.getIsTesting())) {
                    LocalDateTime sendingAt = LocalDateTime.parse(groupChatMessage.getPublishedAt(), sendingAtFormat);
                    groupChatMessage.setSendingAt(sendingAt);
                    // Every running instance executes this save, which is where the duplicates come from.
                    groupChatMessageRepository.save(groupChatMessage);
                    logger.info("data inserted : " + groupChatMessage);
                }
            }
        } catch (Exception e) {
            System.err.println(e.getMessage());
        }
    });
}

Answer by Justin Bertram:

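What you're seeing is the expected behavior. MQTT is fundamentally a pub/sub protocol where all the subscribers on a particular topic (or set of topics) get every message published to that topic (or set of topics). Each instance of your application is a separate subscriber, so each one receives every message and inserts it.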

I see two ways to tackle the problem of duplicate entries in your database.

  1. Make the insertions into the database idempotent.
  2. Coordinate each instance of this application so that only one instance is ever active at any point in time.
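
As a rough illustration of the first option, here is a minimal sketch of an idempotent insert. It assumes every message carries a unique identifier set by the publisher (called `messageId` here, a hypothetical field that is not in the original `GroupChatMessage`), and it uses an in-memory map as a stand-in for a table with a unique constraint on that column. With a real database, the unique constraint itself would reject the second insert and the application would simply ignore that violation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentInsertSketch {

    // Stand-in for a table with a UNIQUE constraint on the message id.
    // Hypothetical helper for illustration only, not part of the original application.
    static final class MessageStore {
        private final Map<String, String> rows = new ConcurrentHashMap<>();

        // Returns true if the row was inserted, false if the id already existed.
        boolean insertIfAbsent(String messageId, String payload) {
            // putIfAbsent is atomic, so concurrent callers cannot both insert the same id.
            return rows.putIfAbsent(messageId, payload) == null;
        }

        int size() {
            return rows.size();
        }
    }

    public static void main(String[] args) {
        MessageStore store = new MessageStore();

        // Two application instances receive the same MQTT message and both try to store it.
        boolean insertedByInstanceA = store.insertIfAbsent("msg-1", "hello");
        boolean insertedByInstanceB = store.insertIfAbsent("msg-1", "hello");

        System.out.println("instance A inserted: " + insertedByInstanceA);
        System.out.println("instance B inserted: " + insertedByInstanceB);
        System.out.println("rows stored: " + store.size());
    }
}
```

The point of the design is that processing the same message twice leaves the data in the same state as processing it once, so it does not matter how many instances receive it.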

If you used a broker that supported MQTT 5 (e.g. ActiveMQ Artemis) then you could use a "shared" subscription. In that case the broker delivers each message published to the topic (or set of topics) to only one of the application instances in the sharing group, so the instances split the messages between them and there would be no duplicate consumption or database inserts.
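
In MQTT 5 a shared subscription is requested through the topic filter itself, which takes the form `$share/<ShareName>/<filter>`. The sketch below only builds and validates such a filter; the group name `chat-writers` and the helper `sharedFilter` are made up for illustration. Every instance would pass the resulting string to the subscribe call of an MQTT 5 client in place of the plain `abc/#` filter.

```java
public class SharedSubscriptionSketch {

    // Builds an MQTT 5 shared-subscription topic filter.
    // Hypothetical helper for illustration; not part of any MQTT client library.
    static String sharedFilter(String shareName, String topicFilter) {
        // The MQTT 5 specification forbids "/", "+" and "#" in the share name,
        // and the share name must be at least one character long.
        if (shareName.isEmpty()
                || shareName.contains("/")
                || shareName.contains("+")
                || shareName.contains("#")) {
            throw new IllegalArgumentException("invalid share name: " + shareName);
        }
        if (topicFilter.isEmpty()) {
            throw new IllegalArgumentException("topic filter must not be empty");
        }
        return "$share/" + shareName + "/" + topicFilter;
    }

    public static void main(String[] args) {
        // All instances must use the same share name to form one sharing group.
        System.out.println(sharedFilter("chat-writers", "abc/#"));
    }
}
```

Instances that subscribe with different share names form separate groups, and each group would again receive its own copy of every message.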