Spring Integration flow loses response - with added information


This is a follow-up to "spring integration flow loses response". It hopefully provides the additional information that Artem Bilan asked for.

This is the first flow

@Configuration
@ConditionalOnExpression(" condition evaluates to TRUE ")
public class BeanDefA {

  @Bean
  IntegrationFlow rest() {
    return IntegrationFlow.from((Http.inboundGateway("/myEndpoint")
        .requestMapping(r -> r.methods(HttpMethod.valueOf("POST")))
        .headerMapper(restConfig.httpHeaderMapper())
        .requestPayloadType(String.class)
        .replyTimeout(5000)
        .errorChannel(exceptionConfig.adapterErrorChannel())))

        .handle(hmacValidationService, "validateHMACAuthHeaders")

        .enrichHeaders(h -> h.headerFunction("message-correlation-id", m -> UUID.randomUUID().toString()))

        .publishSubscribeChannel(c -> c

            .subscribe(s -> s

                .routeToRecipients(r -> r

                    .recipient("backupChannelA", m -> true)

                    .recipient("backupChannelB", m -> false)

                    .defaultOutputToParentFlow())

                .channel("nullChannel"))

            .subscribe(s -> s

                .handle(messagePayloadService, "mapMessagePayload")

                .enrich(e -> e.requestChannel("claimCheck.input").headerExpression("claimCheck", "payload"))

                .handle("integrationGateway", "process")))
        .get();
  }

}

This line

.handle(messagePayloadService, "mapMessagePayload")

attaches a bean reference that defines the below method

  public boolean sendReply(Map<String, Object> headers) {
    // The inbound gateway puts its reply channel into the "replyChannel" header
    MessageChannel replyChannel = (MessageChannel) headers.get("replyChannel");

    // "response" and HEADER_FMT are defined elsewhere in the class (not shown)
    Message<String> m = MessageBuilder.withPayload("\"Status\":\"Success\",\"ErrorMessage\":\"\"")
        .setHeader(HttpHeaders.STATUS_CODE, response.getStatusCode())
        .setHeader("Content-Type", HEADER_FMT).build();

    return replyChannel.send(m);
  }

This is the output I wish to return to whoever sent the POST to /myEndpoint

Here's the 2nd flow

@Configuration
@ConditionalOnExpression(" condition evaluates to TRUE ")
public class BeanDefB {

  @Bean
  public IntegrationFlow dbWrite() {
    return f -> f

        .<MessageItemPayloadInterface>handle((p, h) -> {

            Object o = writer.filterAndEnrich(p, h);

            if (o == null && h.get(KafkaHeaders.ACKNOWLEDGMENT) != null) {
                return null;
            }

            return o;

        }, e -> e.id("dbWriterFilterAndEnrichEndpoint"))

        .<MessageItemPayloadInterface>handle((p, h) -> {

            writer.save(p, h);
            return p;

        }, e -> e.id("dbWriterSaveEndpoint").transactional(false))

        .enrichHeaders(h -> h.headerFunction(IntegrationHeaders.PRODUCER_EVENT_KEY,
            m -> {
                String header = writer.generateEventHeader(m.getPayload(), m.getHeaders());
                return header == null ? "" : header;
            }))

        .<MessageItemPayloadInterface>handle((p, h) -> {

            Object o = writer.notify(p, h);

            if (o == null && h.get(KafkaHeaders.ACKNOWLEDGMENT) != null) {
                LOG.error("this is an error");
                h.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge();
                return null;
            }

            return o;

        }, e -> e.id("dbWriterNotifyEndpoint"))

        .publishSubscribeChannel(a -> a

            .subscribe(b -> b

                .routeToRecipients(r -> r

                    .recipient("eventProducerChannel", m -> true)

                    .defaultOutputToParentFlow())

                .channel("nullChannel"))

            .subscribe(d -> d

                .handle((p, h) -> {

                    if (h.get(KafkaHeaders.ACKNOWLEDGMENT) != null) {

                        LOG.debug("Message processed! Headers= " + h + ". Payload= " + p);
                        h.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class).acknowledge();

                    }
                    return p;
                })

                .channel("nullChannel")))

        .channel("nullChannel");
  }

}

In the above, "writer" is a bean that contains business logic.

Its methods are invoked to perform actions based on the Message contents, and they return null if there was some kind of error.

The output of writer.notify(p, h) is a message I wish to send via Kafka. For some reason, it is now being returned to the poor soul who POSTed to /myEndpoint.

I have turned on debugging as we discussed previously, and the only time I see '"Status":"Success","ErrorMessage":""' is when I log it just before returning it.

I do not see it mentioned in any of the messages produced by the processing done by either flow.

This is, I think, an "orchestrator" class that puts it all together.

Its run time actions are controlled by config variables accessible at run time.

@Configuration
@IntegrationComponentScan
public class IntegrationProvider {

  public final Config config;

  public IntegrationProvider(Config config) {
    this.config = config;
  }

  @Bean
  public IntegrationFlow main() {
    return f -> f.routeToRecipients(r -> r.recipient(config.getEndpointType() + ".input"));
  }

  @MessagingGateway(name = "integrationGateway", errorChannel = "errorChannel")
  public interface IntegrationGateway {

    @Gateway(requestChannel = "main.input")
    void start(Object msg);

    @Gateway(requestChannel = "process.input")
    void process(Message<MessagePayloadInterface> msg);

    @Gateway(requestChannel = "write.input")
    void write(Message<MessageItemPayloadInterface> msg);

    @Gateway(requestChannel = "dbWrite.input")
    void dbWrite(Message<MessageItemPayloadInterface> msg);

    @Gateway(requestChannel = "send.input")
    void send(Message<MessageItemPayloadInterface> msg);
  }

}

Is it possible that something changed between versions 5 and 6, so that flow 2 is now somehow sharing the replyChannel with flow 1?

Or is relying on replyChannel to deliver '"Status":"Success","ErrorMessage":""' as the POST response the wrong approach?

Answer by Artem Bilan

It turns out that the problem in the target application is that a log() operator is used at the end of an IntegrationFlow, which essentially turns that flow into a request-reply one: the logged message is sent on to the replyChannel from the request headers. According to the Migration Guide from 5.x to 6.0, nullChannel() is the recommended way to end a flow when no reply should be returned upstream: https://github.com/spring-projects/spring-integration/wiki/Spring-Integration-5.x-to-6.0-Migration-Guide#log-at-the-end-of-a-flow
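
As a rough illustration of that recommendation, here is a minimal sketch assuming Spring Integration 6.0 and its Java DSL. The class name, the bean names and the upper-casing transformer are hypothetical and are not taken from the application in the question; only log() and nullChannel() are the operators under discussion.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;

// Hypothetical configuration, for illustration only.
@Configuration
public class TerminalOperatorSketch {

  // Ends with log(): under 6.0 the flow is treated as request-reply, so the
  // logged message is passed on to the replyChannel header if one is present
  // (for example, the one set by an upstream HTTP inbound gateway).
  @Bean
  public IntegrationFlow endsWithLog() {
    return f -> f
        .<String, String>transform(String::toUpperCase)
        .log();
  }

  // Ends with nullChannel(): the message is still logged, but it is then
  // discarded, so nothing is returned upstream.
  @Bean
  public IntegrationFlow endsWithNullChannel() {
    return f -> f
        .<String, String>transform(String::toUpperCase)
        .log()
        .nullChannel();
  }
}
```

If a sub-flow that is meant to be one-way ends with log(), its output can end up as the HTTP response in place of the reply sent explicitly through replyChannel, which matches the symptom described in the question.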