I am using Spring Boot to connect to a Pulsar message broker, but I cannot access the message payload no matter what class I use as the parameter type in the call to:

//...
void listen(<T> message) 
//...

I'm using IntelliJ IDEA 2023.1 (Community Edition).

The full class code is:

package dan.teachingagency.covernotices;

import com.fasterxml.jackson.databind.ObjectMapper;
import dan.teachingagency.Exception.MapperException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.pulsar.annotation.EnablePulsar;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.listener.AckMode;
import org.springframework.stereotype.Service;


@Service
@SpringBootApplication
@EnablePulsar
public class CovernoticesApplication {
    public static void main(String[] args) {

        SpringApplication.run(CovernoticesApplication.class, args);
    }

    // This was for my debugging purposes.
    private int messageCount = 0;

    @Autowired
    public TeacherRepository teacherRepository;

    ObjectMapper mapper = new ObjectMapper();

    @PulsarListener(
            subscriptionName = "TeacherAdminSub",
            topics = "persistent://school1/admin_topics/userdetails",
            schemaType = SchemaType.JSON,
            ackMode = AckMode.RECORD)

    void listen(Message<String> message) {
        // Initially messages weren't arriving. Then they kept getting re-delivered every time
        // I started the application, until I changed the ackMode as shown.
        messageCount++;
        System.out.println("["+new java.util.Date(System.currentTimeMillis())+"] Message ["+messageCount+"]received.");

        // If I uncomment this code (which calls the Spring Boot message implementation),
        // I think the call deadlocks: the line does not appear to run and nothing is output.

        //System.out.println("["
        //        +new java.util.Date(System.currentTimeMillis())
        //        +"] Message received ["
        //        +this.messageCount+"]: ["
        //        +message.getPayload()+"].");

        // This is the equivalent in the Pulsar API implementation. It also does not run.
        //System.out.println("Message value: ["+message.getValue()+"]");

        /*Map<String,String> messageProperties = message.getProperties();
        String messageType = messageProperties.get("TYPE");
        System.out.println("Message type: ["+messageType+"]");
        if("TEACHER".equals(messageType)) {
            System.out.println("Saving message now. ["+message.getValue()+"]");
            StaffMember teacher = fromJson(message.getValue(), StaffMember.class);
            System.out.println("Teacher=["+teacher+"]");
            teacherRepository.save(teacher);
            System.out.println("Saved message:[" + message.getValue() + "]");
        } else {
            System.out.println("Received rogue message ["+message.getValue()+"]");
        }*/
    }

    /**
     * Convert json to Object
     * @param json String json value
     * @param clazz Instances of the class
     * @param <T> Object Class
     * @return Object class
     */
    private <T > T fromJson(String json, Class < T > clazz) {
        try {
            return mapper.readValue(json, clazz);
        } catch (Exception e) {
            throw new MapperException(e.getMessage());
        }
    }
}

I have tried: Googling for similar problems; inserting debugging statements; swapping the Message API implementation between the Spring Boot and Pulsar APIs; and turning on DEBUG mode to see if any 'hidden' exceptions are being thrown (none are).

There is 1 best solution below

I checked the quick-start code suggested by @onobc in the comments, but it turned out to be the code I had originally started learning with. It worked, but the parameter of its listener method (void listen...) was just a String rather than a Message, and that had always worked for me. The problem was the call to Message.getValue(), which I have now solved.
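
For anyone chasing a similar silent stall, one general Java effect is worth ruling out: when code runs on a pool thread, an exception can be captured rather than printed, which looks like a hang at the throwing line. The sketch below uses only the JDK and is not a description of Spring Pulsar's internals; `SilentFailureDemo`, `decode` and `runAndReport` are hypothetical names, with `decode` standing in for a call such as `message.getValue()`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SilentFailureDemo {

    /** Hypothetical stand-in for a decoding call that fails. */
    static String decode(String raw) {
        throw new IllegalStateException("cannot decode: " + raw);
    }

    /**
     * Runs decode() on a pool thread and returns the failure message,
     * or null if the task completed normally.
     */
    static String runAndReport(String raw) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> decode(raw);
            Future<String> result = pool.submit(task);
            // Without this get(), the exception stays inside the Future
            // and nothing is printed or logged.
            result.get();
            return null;
        } catch (ExecutionException e) {
            // The original exception is wrapped; unwrap it to see the cause.
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndReport("abc"));
    }
}
```

In a listener method, the equivalent check is to wrap the suspect line in a try/catch for Throwable and print the stack trace, so that a failure cannot pass unnoticed.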

In the end, I took the following steps and it started working. I'm not certain, but I suspect the problem was caused by some kind of corruption on the Pulsar broker, which makes step 1 (below) the most likely fix. I think this because I found errors in my broker start-up logs reporting some kind of BookKeeper corruption; the broker was set to ignore unrecoverable errors in the journals, but it noted them in the logs anyway. Message delivery had been haphazard: sometimes messages kept getting re-delivered, and at other times they behaved as expected at first and then were suddenly all re-delivered again. This no longer happens. Messages arrive, are ACK'd by the API 'under the hood', and are never delivered again, as expected. The steps were:

  1. Deleted the topic (using the pulsar-admin client in the Docker container that runs my Pulsar standalone broker) and allowed it to be auto-created on client (listener / consumer) startup.

  2. Opened the Docker terminal for my Pulsar standalone broker and used the pulsar-admin client to allow schema auto-update on the namespace:

    bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable school1/admin_topics

  3. Removed unused reactive Pulsar Spring Boot dependencies from my pom.xml, just in case of conflicting classes (though I was fairly certain there were none, because I checked all my imports), and reloaded the project via the Maven menu (right-click on the top-level module node -> Maven -> Reload project). Those dependencies were there because I had originally considered using reactive Pulsar but decided against it for now.

  4. Removed various Pulsar annotations from the class and changed the parameter of void listen(){...} from Message<String> to Message<StaffMember>. The amended class code is as follows:

    package dan.teachingagency.covernotices;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.common.schema.SchemaType;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.pulsar.annotation.PulsarListener;

    @SpringBootApplication
    public class CovernoticesApplication {
        public static void main(String[] args) {
            SpringApplication.run(CovernoticesApplication.class, args);
        }

        private int messageCount = 0;

        @Autowired
        public TeacherRepository teacherRepository;

        @PulsarListener(
                subscriptionName = "TeacherAdminSub",
                topics = "persistent://school1/admin_topics/userdetails",
                schemaType = SchemaType.JSON)
        void listen(Message<StaffMember> message) {
            messageCount++;
            StaffMember staffMember = message.getValue();
            teacherRepository.save(staffMember);
            System.out.println("["+new java.util.Date(System.currentTimeMillis())+"] Received message ["+messageCount+"] with data received: ["+staffMember+"]");
        }
    }
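
Steps 1 and 2 act on different levels: step 1 on the full topic name, step 2 on the namespace that contains it. A fully qualified Pulsar topic name has the form domain://tenant/namespace/topic, so the namespace argument school1/admin_topics is simply the middle of the topic string used in the listener. The sketch below shows that relationship with plain string handling; `TopicNames` and `namespaceOf` are hypothetical helpers rather than part of the Pulsar client, and only the three-part form is handled.

```java
public class TopicNames {

    /**
     * Returns "tenant/namespace" for a topic written as
     * domain://tenant/namespace/topic (hypothetical helper, not a Pulsar API).
     */
    static String namespaceOf(String topic) {
        int scheme = topic.indexOf("://");
        if (scheme < 0) {
            throw new IllegalArgumentException("missing domain prefix: " + topic);
        }
        // Everything after "persistent://" or "non-persistent://".
        String[] parts = topic.substring(scheme + 3).split("/");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected tenant/namespace/topic: " + topic);
        }
        return parts[0] + "/" + parts[1];
    }

    public static void main(String[] args) {
        String topic = "persistent://school1/admin_topics/userdetails";
        System.out.println(namespaceOf(topic)); // prints school1/admin_topics
    }
}
```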

Other than corruption on the broker, the only other possibility I can think of is the Pulsar annotations I removed, since they are the only other significant change I made to the code. I had previously tried void listen(Message){...} and it also locked up in Message.getValue(), so I don't think declaring the parameter as 'String message' or Message was the problem.
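
To make the re-delivery symptom described above concrete: on a durable subscription, the broker keeps every message that has not been acknowledged and hands it to the consumer again when it reconnects. The toy model below illustrates only that rule; `RedeliveryModel` and its methods are made up for this sketch and do not reflect how the broker is implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RedeliveryModel {

    // Unacknowledged messages, keyed by a simple sequence id (toy model only).
    private final Map<Integer, String> backlog = new LinkedHashMap<>();
    private int nextId = 0;

    void publish(String payload) {
        backlog.put(nextId++, payload);
    }

    /** Simulates a consumer (re)connecting: every unacknowledged id is delivered. */
    List<Integer> connect() {
        return new ArrayList<>(backlog.keySet());
    }

    /** An acknowledged message leaves the backlog and is never delivered again. */
    void acknowledge(int id) {
        backlog.remove(id);
    }

    public static void main(String[] args) {
        RedeliveryModel subscription = new RedeliveryModel();
        subscription.publish("teacher-1");
        subscription.publish("teacher-2");

        System.out.println(subscription.connect()); // prints [0, 1]
        subscription.acknowledge(0);                // only the first message is acked

        // "Restarting the application": the unacked message comes back.
        System.out.println(subscription.connect()); // prints [1]
    }
}
```

If acknowledgements never reach the broker, or the broker loses track of them, every restart looks like the first run in this model, which matches the behaviour I was seeing before the fix.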