Spring cloud Stream function not recognized

647 Views Asked by At

I have a Spring Boot Kafka streaming application. After upgrading to SB3, the spring.cloud.function.definition is not recognized anymore.

I enabled debug logs and was of help. I see these messages in logs:

Multiple functional beans were found [myEvents, sendToDlqAndContinue], thus can't determine default function definition. Please use 'spring.cloud.function.definition' property to explicitly define it. 

This is my application.yml:

spring:
  application:
    name: @project.artifactId@
  cloud:
    config:
      name: my-app
    stream:
      default:
        producer:
          useNativeEncoding: true
      function:
        definition:myEvents
        ineligible-definitions: sendToDlqAndContinue

These are the dependencies I have:


<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-dependencies</artifactId>
        <version>3.1.5</version>
    </parent>
<dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
</dependencies>

I have tried debugging the Cloud stream Kafka classe which is reporting the error, BeanFactoryAwareFuntionRegistry.java

and found that this.applicationContext.getEnvironment().getProperty(FunctionProperties.FUNCTION_DEFINITION, ""); is ""

It does contain ineligible-definitions I have in yml, but not the definition

Any clues are highly appreciated. Maybe there are some changes in SpringBoot 3 related to cloud streams? Any missing dependency?

Update

I tried to run the app after adding the dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-properties-migrator</artifactId>
    <scope>runtime</scope>
</dependency>

The application now consumes messages, but the dependency does not log the unsupported or migrated properties. How can I know which property is removed/migrated??

I find no official documentation about changes in new release.

1

There are 1 best solutions below

0
On BEST ANSWER

I was changed spring.cloud.stream.function to spring.cloud.function, it works for me.

TO-BE application.yml

spring:
  application:
    name: @project.artifactId@
  cloud:
    config:
      name: my-app
    function:
      definition:myEvents
      ineligible-definitions: sendToDlqAndContinue
    stream:
      default:
        producer:
          useNativeEncoding: true

But, Official Document says both of configuration works.

To specify which functional bean to bind to the external destination(s) exposed by the bindings, you must provide spring.cloud.stream.function.definition or native to spring-cloud-function spring.cloud.function.definition property.

And, you should be check this url, it works for me.

I changed class to function.

I hope this helps.