I'm using Apache Pulsar for the first time but finding it hard to publish and listen to custom objects. I have defined schemas as directed by this reference pages but for some reason it’s not working.
@SpringBootApplication
public class QuickbooksApplication {
public static void main(String[] args) {
SpringApplication.run(QuickbooksApplication.class, args);
}
@Data
@Builder
@ToString
@NoArgsConstructor
@AllArgsConstructor
public static class SomeClass {
private String someVariable;
}
@Bean
ApplicationRunner runner(PulsarTemplate<SomeClass> pulsarTemplate) {
pulsarTemplate.setSchema(JSONSchema.of(SomeClass.class));
SomeClass someClass = SomeClass.builder().someVariable("Hello World!!!").build();
return (args) -> pulsarTemplate.send("hello-pulsar", someClass);
}
@PulsarListener(
subscriptionName = "hello-pulsar-subscription",
topics = "hello-pulsar",
schemaType = SchemaType.AUTO_CONSUME)
void listen(SomeClass message) {
System.out.println("Message Received: " + message);
}
}
When I run this I simply get two errors,
java.lang.IllegalAccessException: class org.apache.pulsar.common.util.netty.DnsResolverUtil cannot access class sun.net.InetAddressCachePolicy (in module java.base) because module java.base does not export sun.net to unnamed module @544ff9ef at java.base/jdk.internal.reflect.Reflection.newIllegalAccessException(Reflection.java:392) ~[na:an]
java.lang.NullPointerException: Cannot invoke "org.apache.pulsar.client.api.Schema.getSchemaInfo()" because the return value of "org.springframework.pulsar.listener.PulsarContainerProperties.getSchema()" is null
When I consume it as string though, everything works.
Any help will be appreciated. Thanks in advance.
The 1st error is due to Java17 strong encapsulation of internals by default. To get around this you end up having to
add-opens
in all the places you are launching. Here is an example that sets it for thebootRun
when using the Spring Boot Gradle Plugin to launch the app. This is an annoyance, but not the cause of the final NPE.The actual cause is that
SchemaType.AUTO_CONSUME
is not supported in@PulsarListener
. Set it toSchemaType.JSON
. See here for more details.