Introduction
I'm currently using the @EnableBinding and @StreamListener annotations from Spring Cloud Stream v2: https://www.javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/2.0.0.RELEASE/org/springframework/cloud/stream/annotation/package-summary.html
The code of my project that uses these annotations:
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import com.enterprise.production.model.exceptions.ProductionStoreException;
import com.enterprise.production.model.message.EnergyProductionMessage;
import com.enterprise.production.stream.service.ProductionStoreService;
import lombok.extern.slf4j.Slf4j;
@EnableBinding(Sink.class)
@Slf4j
@Service
public class ProductionMessageConsumer {

    @Autowired
    private ProductionStoreService productionService;

    @Autowired
    private Clock clock;

    @StreamListener(target = Sink.INPUT)
    public void handleEnergyProductionMessage(@Payload EnergyProductionMessage energyProductionMessage) throws ProductionStoreException {
        Instant start = clock.instant();
        log.debug("Processing energy productions message with original interval: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                energyProductionMessage.getDeviceId());
        log.info("Processing {} energy productions ", energyProductionMessage.getSolarEnergies().size());
        productionService.saveProductions(energyProductionMessage);
        log.debug("Ending energy productions message with original interval: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
                energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
                        .between(start, clock.instant()).toMillis());

        Instant startNormalization = clock.instant();
        log.debug("Processing energy productions message with normalization 30m: {}|{}|{}", energyProductionMessage.getTenantId(), energyProductionMessage.getUsername(),
                energyProductionMessage.getDeviceId());
        productionService.saveProductions30m(energyProductionMessage);
        log.debug("Ending energy productions message with normalization 30m: {}|{}|{}: ended in {}ms", energyProductionMessage.getTenantId(),
                energyProductionMessage.getUsername(), energyProductionMessage.getDeviceId(), Duration
                        .between(startNormalization, clock.instant()).toMillis());
    }

    @StreamListener("errorChannel")
    public void error(Message<?> message) {
        log.error("Fail to read message with error '{}'", message.getPayload());
    }
}
Problem
I need to migrate to Spring Cloud Stream v4, but these annotations are no longer available there: https://www.javadoc.io/doc/org.springframework.cloud/spring-cloud-stream/4.0.0/org/springframework/cloud/stream/annotation/package-summary.html
Question
Does anyone know how to migrate these annotations from Spring Cloud Stream v2 to v4?
The annotations you are using are deprecated, as mentioned in the relevant section of the Spring Cloud Stream 3.2.x documentation here.
When following the link above, you'll see that, in order to move to v4, you need to convert your current @StreamListener configuration into Spring beans defined in a class annotated with @org.springframework.context.annotation.Configuration.
For the error handling part, by default it logs the message payload, but you can follow this section on how to handle it manually.
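As a rough illustration of that conversion, here is a minimal sketch in which the listener becomes a java.util.function.Consumer. It uses only the JDK so it runs without Spring on the classpath: EnergyProductionMessage and ProductionStoreService are simplified, hypothetical stand-ins for the classes in the question, and the Spring annotations and binding properties are given in comments. Treat it as a starting point under those assumptions, not as the exact migration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// In the real project this class would be annotated with
// @org.springframework.context.annotation.Configuration.
class ProductionMessageConsumerConfig {

    // In the real project this would be a non-static @Bean method, with the
    // service injected by Spring. The bean name becomes the function name, so
    // the input binding is "handleEnergyProductionMessage-in-0", for example
    // in application.properties:
    //   spring.cloud.function.definition=handleEnergyProductionMessage
    //   spring.cloud.stream.bindings.handleEnergyProductionMessage-in-0.destination=<your destination>
    static Consumer<EnergyProductionMessage> handleEnergyProductionMessage(ProductionStoreService productionService) {
        return message -> {
            productionService.saveProductions(message);
            productionService.saveProductions30m(message);
        };
    }

    // Calls the handler directly, the way a unit test could, with no broker involved.
    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        ProductionStoreService recording = new ProductionStoreService() {
            @Override
            public void saveProductions(EnergyProductionMessage m) {
                calls.add("saveProductions:" + m.deviceId);
            }

            @Override
            public void saveProductions30m(EnergyProductionMessage m) {
                calls.add("saveProductions30m:" + m.deviceId);
            }
        };
        handleEnergyProductionMessage(recording)
                .accept(new EnergyProductionMessage("tenant-1", "alice", "device-1"));
        System.out.println(calls);
    }
}

// Hypothetical, simplified stand-in for the EnergyProductionMessage in the question.
final class EnergyProductionMessage {
    final String tenantId;
    final String username;
    final String deviceId;

    EnergyProductionMessage(String tenantId, String username, String deviceId) {
        this.tenantId = tenantId;
        this.username = username;
        this.deviceId = deviceId;
    }
}

// Hypothetical stand-in for ProductionStoreService, reduced to the two calls used in the question.
interface ProductionStoreService {
    void saveProductions(EnergyProductionMessage message);

    void saveProductions30m(EnergyProductionMessage message);
}
```

Note that Consumer.accept does not declare checked exceptions, so if ProductionStoreException is a checked exception, it would have to be caught and rethrown as an unchecked one inside the lambda. @EnableBinding(Sink.class) has no counterpart in this style: the binding comes from the bean name and the properties shown in the comments.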