GraalVM compiled version of Spring Cloud Stream Kafka with Protobuf not able to serializer

98 Views Asked by At

I have a Spring Cloud Stream project with Kafka and Protobuf, the regular version works fine.

After compile with Graalvm it failed to producer Protobuf messages , the consumer works fine.

My project yaml is :

spring.cloud.stream:
    instanceCount: 1
    default-binder: kafka
    default:
        producer.useNativeEncoding: true
        consumer.useNativeEncoding: true
    kafka:
        binder:
            auto-create-topics: false
            brokers: ${spring.kafka.properties.bootstrap.servers}
            consumer-properties:
                schema.registry.url: ${spring.kafka.properties.schema.registry.url}
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                value.deserializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
                specific.protobuf.value.type: proto.dataFusion.VehicleInfo
                auto.offset.reset: latest
                auto.commit.enable: false
                ack.mode: manual_immediate
                isolation.level: read_committed
                max.poll.records: 150
                fetch.min.bytes: 1048576 # 1MB
                fetch.max.wait.ms: 1000
            producer-properties:
                schema.registry.url: ${spring.kafka.properties.schema.registry.url}
                value.serializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
                enable.idempotence: false

I had a simular issue for consumer that was fixed after I set specific.protobuf.value.type property.

If value.serializer of the producer is set to String it works fine in the compiled version, but fail with Protobuf Serializer

But for the producer was no able to fix , I having the error :

2024-01-25T08:50:05.062-03:00 ERROR 8025 --- [sp-sc-cfc] [           main] o.s.cloud.stream.binding.BindingService  : Failed to create producer binding; retrying in 30 seconds
org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder checking the topic (cfc_plates_to_check_muniz):
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:684) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionInfoForProducer(KafkaTopicProvisioner.java:600) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:422) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:168) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:310) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:102) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:153) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:353) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:294) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:311) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:315) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:115) ~[sp-sc-cfc:4.1.0]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) ~[sp-sc-cfc:4.1.0]
    at [email protected]/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:647) ~[na:na]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:59) ~[sp-sc-cfc:4.1.0]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:284) ~[sp-sc-cfc:6.1.1]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:467) ~[sp-sc-cfc:6.1.1]
    at [email protected]/java.lang.Iterable.forEach(Iterable.java:75) ~[sp-sc-cfc:na]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:256) ~[sp-sc-cfc:6.1.1]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:201) ~[sp-sc-cfc:6.1.1]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:965) ~[sp-sc-cfc:6.1.1]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:619) ~[sp-sc-cfc:6.1.1]
    at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:66) ~[sp-sc-cfc:3.2.0]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:753) ~[sp-sc-cfc:3.2.0]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:455) ~[sp-sc-cfc:3.2.0]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:323) ~[sp-sc-cfc:3.2.0]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1342) ~[sp-sc-cfc:3.2.0]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1331) ~[sp-sc-cfc:3.2.0]
    at io.mobi7.sc.cfc.CFCServiceApplication.main(CFCServiceApplication.java:23) ~[sp-sc-cfc:na]
Caused by: java.lang.RuntimeException: Failed to obtain partition information for the topic cfc_plates_to_check_muniz
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.lambda$getPartitionsForTopic$9(KafkaTopicProvisioner.java:657) ~[sp-sc-cfc:4.1.0]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[sp-sc-cfc:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:209) ~[sp-sc-cfc:na]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:622) ~[sp-sc-cfc:4.1.0]
    ... 28 common frames omitted


2024-01-25T08:50:05.064-03:00 ERROR 8025 --- [sp-sc-cfc] [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'sp-sc-cfc.platesToCheck-out-0'., failedMessage=GenericMessage [payload=plate: "SIXXXXX"
, headers={sequenceNumber=1, correlationId=63e8b192-568c-c9b1-0867-52deb3eefce4, id=672f42d3-a458-0c59-b2e4-5841806869f2, sequenceSize=0, timestamp=1706183405063}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:228)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:210)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:501)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:356)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:285)
    at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:317)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:249)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:151)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:206)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:481)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:467)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:419)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:355)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:348)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
    at [email protected]/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at [email protected]/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at [email protected]/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at [email protected]/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at [email protected]/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at [email protected]/java.lang.Thread.run(Thread.java:840)
    at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:775)
    at org.graalvm.nativeimage.builder/com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:203)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=plate: "SIE7D4K"
, headers={sequenceNumber=1, correlationId=63e8b192-568c-c9b1-0867-52deb3eefce4, id=672f42d3-a458-0c59-b2e4-5841806869f2, sequenceSize=0, timestamp=1706183405063}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 63 more

Here is the kafka and protobuf hints using :

 hints.reflection().registerType(NullContextNameStrategy.class, MemberCategory.INVOKE_DECLARED_METHODS);
        hints.reflection().registerType(NullContextNameStrategy.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);

        hints.reflection().registerType(TopicNameStrategy.class, MemberCategory.INVOKE_DECLARED_METHODS);
        hints.reflection().registerType(TopicNameStrategy.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);

        hints.reflection().registerType(Schema.class, MemberCategory.INVOKE_DECLARED_METHODS);
        hints.reflection().registerType(Schema.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);

        hints.reflection().registerType(SchemaString.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(SchemaString.class, MemberCategory.INVOKE_DECLARED_METHODS);

        hints.reflection().registerType(SchemaReference.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(SchemaReference.class, MemberCategory.INVOKE_DECLARED_METHODS);

 //Serializers
        hints.reflection().registerType(KafkaProtobufDeserializer.class, MemberCategory.INVOKE_DECLARED_METHODS);
        hints.reflection().registerType(KafkaProtobufDeserializer.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);

        hints.reflection().registerType(KafkaProtobufSerializer.class, MemberCategory.INVOKE_DECLARED_METHODS);
        hints.reflection().registerType(KafkaProtobufSerializer.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);

        //To create Protobuf
        hints.reflection().registerType(DescriptorProtos.FieldOptions.Builder.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(DescriptorProtos.FieldOptions.Builder.class, MemberCategory.INVOKE_DECLARED_METHODS);

        hints.reflection().registerType(DescriptorProtos.FieldOptions.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(DescriptorProtos.FieldOptions.class, MemberCategory.INVOKE_DECLARED_METHODS);

        hints.reflection().registerType(DescriptorProtos.FieldOptions.newBuilder().getCtype().getClass(), MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(DescriptorProtos.FieldOptions.newBuilder().getCtype().getClass(), MemberCategory.INVOKE_DECLARED_METHODS);

        hints.reflection().registerType(DescriptorProtos.FieldOptions.newBuilder().getJstype().getClass(), MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(DescriptorProtos.FieldOptions.newBuilder().getJstype().getClass(), MemberCategory.INVOKE_DECLARED_METHODS);

        hints.reflection().registerType(DescriptorProtos.FieldOptions.OptionRetention.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(DescriptorProtos.FieldOptions.OptionRetention.class, MemberCategory.INVOKE_DECLARED_METHODS);

        hints.reflection().registerType(DescriptorProtos.FieldOptions.OptionTargetType.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(DescriptorProtos.FieldOptions.OptionTargetType.class, MemberCategory.INVOKE_DECLARED_METHODS);

        hints.reflection().registerType(DescriptorProtos.UninterpretedOption.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(DescriptorProtos.UninterpretedOption.class, MemberCategory.INVOKE_DECLARED_METHODS);

        hints.reflection().registerType(DescriptorProtos.MessageOptions.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(DescriptorProtos.MessageOptions.class, MemberCategory.INVOKE_DECLARED_METHODS);

        hints.reflection().registerType(DescriptorProtos.MessageOptions.Builder.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(DescriptorProtos.MessageOptions.Builder.class, MemberCategory.INVOKE_DECLARED_METHODS);

        hints.reflection().registerType(DescriptorProtos.FileOptions.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(DescriptorProtos.FileOptions.class, MemberCategory.INVOKE_DECLARED_METHODS);

        hints.reflection().registerType(DescriptorProtos.FileOptions.Builder.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(DescriptorProtos.FileOptions.Builder.class, MemberCategory.INVOKE_DECLARED_METHODS);

        hints.reflection().registerType(DescriptorProtos.FileOptions.OptimizeMode.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(DescriptorProtos.FileOptions.OptimizeMode.class, MemberCategory.INVOKE_DECLARED_METHODS);

        hints.reflection().registerType(DescriptorProtos.EnumValueOptions.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(DescriptorProtos.EnumValueOptions.class, MemberCategory.INVOKE_DECLARED_METHODS);

        hints.reflection().registerType(DescriptorProtos.EnumValueOptions.Builder.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(DescriptorProtos.EnumValueOptions.Builder.class, MemberCategory.INVOKE_DECLARED_METHODS);

        hints.reflection().registerType(DescriptorProtos.EnumOptions.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(DescriptorProtos.EnumOptions.class, MemberCategory.INVOKE_DECLARED_METHODS);

        hints.reflection().registerType(DescriptorProtos.EnumOptions.Builder.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
        hints.reflection().registerType(DescriptorProtos.EnumOptions.Builder.class, MemberCategory.INVOKE_DECLARED_METHODS);

There is some others hits for the business ProtoBuf as well

I tried set a specific.protobuf.value.type to the produce.

I expect producer works in a compiled version like the interpreter one's

1

There are 1 best solutions below

8
sobychacko On

This is likely because we are missing a native hint for io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer. See the hint for JsonSerializer in Spring for Apache Kafka, for example. You can try adding a hint for KafkaProtobufSerializer in your application. If it works, add the hint to Oracle's graalvm-reachability-metadata repository by sending a PR there. Specifically, this is where the hint needs to get added. This way, not all applications don't have to add this custom hint in the applications.

Example code in : https://github.com/mmuniz75/scs-producer-graalvm

Update based on debugging the sample app: You are missing a native hint for io.confluent.kafka.serializers.subject.DefaultReferenceSubjectNameStrategy. You need the following hints:

hints.reflection().registerType(DefaultReferenceSubjectNameStrategy.class, MemberCategory.INVOKE_DECLARED_CONSTRUCTORS);
hints.reflection().registerType(DefaultReferenceSubjectNameStrategy.class, MemberCategory.INVOKE_DECLARED_METHODS);

See this for more details:https://github.com/spring-cloud/spring-cloud-stream/issues/2896#issuecomment-1928598548