I have a very simple application that uses the functional approach to consume messages from a Kafka topic, process their payloads and send them to a different topic.

I am using Spring Boot 2.7.5 and Spring Cloud Starter Stream Binder Kafka 3.2.6 (managed by Spring Cloud 2021.0.5).

The inbound message contains a header MY_PARTITION_ID, an integer that indicates the partition of the output topic to which the outbound message is supposed to be produced.

My application.properties is:

spring.application.name=some-custom-app

spring.cloud.stream.function.definition=someCustomProcessor

spring.cloud.stream.bindings.someCustomProcessor-in-0.destination=some-input-topic
spring.cloud.stream.bindings.someCustomProcessor-in-0.group=${spring.application.name}

spring.cloud.stream.bindings.someCustomProcessor-out-0.destination=some-output-topic
spring.cloud.stream.bindings.someCustomProcessor-out-0.producer.partitionKeyExpression=headers['MY_PARTITION_ID']

And I have a processor like this (the processing is simplified here to just add an extra header to the outbound message):

@Component
class CustomProcessors {

    @Bean
    fun someCustomProcessor(): (Message<String>) -> Message<String> =
        { message ->
            MessageBuilder
                .fromMessage(message)
                .setHeader("SOME_CUSTOM_HEADER", "some-custom-value")
                .build()
        }
}

Both input and output topics are already created and have 5 partitions.

If I comment out the partitionKeyExpression line in application.properties and send several messages with MY_PARTITION_ID=3 to the input topic then it is easy to see that the output messages are evenly distributed in the partitions of output topic, as expected.

However, when partitionKeyExpression is enabled I would expect to see all of the output messages go to partition 3 of the output topic. Nevertheless, they all went to partition 0.

I debugged the code and I found org.springframework.cloud.stream.binder.PartitionHandler#determinePartition(Message<?>). The last line reads:

return Math.abs(partition % this.partitionCount);

At runtime the field this.partitionCount had the value 1 (the default) instead of 5 (the actual number of partitions configured in the cluster).
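To make the effect of that last line concrete, here is a minimal standalone sketch of the arithmetic. It is not the binder's code: selectByHash is a hypothetical stand-in that assumes the default selector derives the partition from the key's hashCode, and only the final modulo step mirrors the line quoted above.

```kotlin
import kotlin.math.abs

// Hypothetical stand-in for the default partition selector.
// Assumption: the selector is based on the key's hashCode.
fun selectByHash(key: Any): Int {
    val hash = key.hashCode()
    return if (hash == Int.MIN_VALUE) 0 else abs(hash)
}

// Mirrors the quoted line: Math.abs(partition % this.partitionCount)
fun determinePartition(key: Any, partitionCount: Int): Int =
    abs(selectByHash(key) % partitionCount)

fun main() {
    // With the default partitionCount of 1, every key maps to partition 0.
    println(determinePartition(3, 1)) // 0
    // With partitionCount 5, the integer key 3 maps to partition 3.
    println(determinePartition(3, 5)) // 3
}
```

Under that assumption, any value modulo 1 is 0, which matches the observed behaviour of every message landing in partition 0.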

I dug further and found this snippet in org.springframework.cloud.stream.binding.MessageConverterConfigurer#configureMessageChannel(MessageChannel, String, boolean):

if (partitioned) {
   if (inbound || !functional) {
      messageChannel.addInterceptor(new PartitioningInterceptor(bindingProperties));
   }
}

Apparently, PartitioningInterceptor would have set the partitionCount property correctly, but it is never added to the list of interceptors because, although the outbound channel is partitioned, it is also functional.

After that rather long explanation (sorry for that but I felt that it was necessary) my questions are:

  1. Why is there such a restriction on adding a partitioning interceptor to an outbound functional channel? Is there an architectural reason? It got me really curious.

  2. Assuming that it is correct, is there any other property that must be used together with partitionKeyExpression to achieve my goal of determining the outbound partition through an inbound header? I know I can set the partitionCount myself in the properties file, but that is not desirable because in my project the topics already exist and are maintained by a different team.

For now I am programmatically setting header KafkaHeaders.PARTITION_ID in the outbound message in the processor code. It is working but it would be nice if I could have the same effect just by configuring some properties.
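A minimal sketch of that workaround is shown here for reference. It assumes the MY_PARTITION_ID header arrives as a numeric value and that the KafkaHeaders.PARTITION_ID constant is available (as it is in the spring-kafka version managed by Spring Boot 2.7.5). The class name PartitionRoutingProcessors is hypothetical, and the bean is a variant of the processor above rather than an addition to it.

```kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.Message
import org.springframework.messaging.support.MessageBuilder

@Configuration
class PartitionRoutingProcessors { // hypothetical class name

    @Bean
    fun someCustomProcessor(): (Message<String>) -> Message<String> =
        { message ->
            val builder = MessageBuilder
                .fromMessage(message)
                .setHeader("SOME_CUSTOM_HEADER", "some-custom-value")

            // Assumption: the inbound header holds a numeric value.
            // If it is missing or not numeric, no target partition is set.
            (message.headers["MY_PARTITION_ID"] as? Number)?.toInt()?.let { partition ->
                builder.setHeader(KafkaHeaders.PARTITION_ID, partition)
            }

            builder.build()
        }
}
```

With this in place the partitionKeyExpression property is not needed, since the target partition travels with each outbound message.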

Thanks in advance

Extra info

As pointed out by @sobychacko, we can see here that PartitionInfo is retrieved from the cluster. Debugging at this point, we see that there are indeed 5 partitions (preconfigured in my cluster).

However, when we get here, where the partition is actually calculated, we see that the partitionCount is set to 1, not 5.

In order for the partitionCount to be set to 5 in PartitionHandler it needs to be set by a PartitioningInterceptor, that must be in the list of interceptors as pointed out by @sobychacko here.

However, it seems that the only place where a PartitioningInterceptor is ever instantiated is here, which is the if that combines the concepts of inbound and functional.

Answer

When relying on the binder-provided partitioning capabilities to select a partition, you must also provide the partitionCount property. In your case, this becomes spring.cloud.stream.bindings.someCustomProcessor-out-0.producer.partitionCount.
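For the setup in the question, the addition might look like the fragment below. The value 5 is taken from the partition count stated in the question, so it is an assumption about the actual cluster and has to be kept in sync with the topic by hand.

```properties
spring.cloud.stream.bindings.someCustomProcessor-out-0.producer.partitionKeyExpression=headers['MY_PARTITION_ID']
# Assumed value: the question states that the output topic has 5 partitions.
spring.cloud.stream.bindings.someCustomProcessor-out-0.producer.partitionCount=5
```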

Here is a configuration from a sample application. Also, check out this blog for more details on the binder-provided partition.

I believe the output bindings use the partitioning interceptors in the Kafka binder and extract the partition count from the above property. See this and this.

If things still don't work after providing the property, please create a small application and we can triage the issue further.