SSE Spring Cloud Stream supplier

64 Views Asked by At

I'm trying to create an SSE (Server-Sent Events) Spring Cloud Stream supplier using the following code:

    /**
     * Supplier bean that hands out the flow's reactive publisher as a [Flux].
     *
     * Every invocation of the returned [Supplier] wraps [sseSupplierPub] with `Flux.from`;
     * no other transformation is applied here.
     */
    @Bean
    fun sseSupplier(sseSupplierPub: Publisher<Message<Any>>): Supplier<Flux<Message<Any>>> =
        Supplier { Flux.from(sseSupplierPub) }

    /**
     * Builds the integration flow that calls the SSE endpoint and exposes the received
     * events as a reactive [Publisher] of messages.
     *
     * The flow starts at `channelsConfiguration.trigger()`, adds the `CONTENT_TYPE` and
     * `ACCEPT` headers, and passes the message to the gateway from [sseMessageHandlerSpec].
     *
     * That gateway is configured with `replyPayloadToFlux(true)`, so its reply is a single
     * message whose payload is a `Flux` of events. Publishing that message unchanged makes
     * downstream serialization render the `Flux` object itself (the
     * `{"scanAvailable": true, "prefetch": -1}` output) instead of the events. The splitter
     * below flattens the `Flux` so each event is emitted as its own message.
     *
     * @param sseSupplierProperties not read by this method's body.
     * @return publisher of one message per received server-sent event.
     */
    @Bean
    fun sseSupplierPub(
        sseSupplierProperties: sseSupplierProperties
    ): Publisher<Message<Any>> {
        return IntegrationFlow.from(channelsConfiguration.trigger())
            .enrichHeaders {
                it.header(CONTENT_TYPE, "application/json")
                it.header(ACCEPT, "text/event-stream")
            }
            .handle(sseMessageHandlerSpec())
            // Flatten the Flux payload: one outgoing message per SSE element.
            .split()
            // A reactive output channel lets the splitter emit the Flux elements with
            // back-pressure instead of iterating the (potentially endless) stream on the
            // sending thread. Fully qualified to avoid depending on an import.
            .channel(org.springframework.integration.dsl.MessageChannels.flux())
            .log()
            .toReactivePublisher(true)
    }

    /**
     * Creates a [WebClient] whose requests pass through an OAuth2 exchange filter.
     *
     * A single in-memory client registration ("clientId", client-credentials grant) backs
     * the filter, and that registration is set as the filter's default. The underlying
     * Reactor Netty client is created from `provider` with keep-alive enabled; `provider`
     * and `strategies` are defined outside this function.
     */
    fun webClient(): WebClient {
        val httpClient = HttpClient.create(provider).keepAlive(true)

        val registration = ClientRegistration
            .withRegistrationId("clientId")
            .tokenUri("https://openid-connect/token")
            .clientId("clientId")
            .clientSecret("clientSecret")
            .authorizationGrantType(AuthorizationGrantType.CLIENT_CREDENTIALS)
            .build()
        val registrations = InMemoryReactiveClientRegistrationRepository(registration)
        val authorizedClients = InMemoryReactiveOAuth2AuthorizedClientService(registrations)
        val clientManager =
            AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager(registrations, authorizedClients)

        // Use the only registration for every request that does not name one explicitly.
        val oauthFilter = ServerOAuth2AuthorizedClientExchangeFilterFunction(clientManager).apply {
            setDefaultClientRegistrationId("clientId")
        }

        return WebClient.builder()
            .filter(oauthFilter)
            .clientConnector(ReactorClientHttpConnector(httpClient))
            .exchangeStrategies(strategies)
            .build()
    }

    /**
     * Describes the outbound WebFlux gateway used to call the SSE endpoint.
     *
     * The gateway POSTs to "https://api/src" through the client from [webClient], asks for
     * the reply payload as a `Flux` (`replyPayloadToFlux(true)`), and declares
     * `ServerSentEvent<String>` as the expected response element type.
     */
    fun sseMessageHandlerSpec(): WebFluxMessageHandlerSpec =
        WebFlux
            .outboundGateway("https://api/src", webClient())
            .httpMethod(HttpMethod.POST)
            .replyPayloadToFlux(true)
            .expectedResponseType(typeReference<ServerSentEvent<String>>())

    /**
     * Runner bean that kicks off the flow by sending a message with the payload "{}" to
     * `channelsConfiguration.trigger()`.
     *
     * @param ctx not used by the runner body.
     */
    @Bean
    fun commandLineRunner(ctx: ApplicationContext?): CommandLineRunner =
        CommandLineRunner {
            val triggerMessage = MessageBuilder.withPayload("{}").build()
            logger.info { "Trigger SSE supplier" }
            channelsConfiguration.trigger().send(triggerMessage)
        }

I'm able to get the authorisation token and connect to https://api/src successfully, but I'm not able to get any response except for this message

{
    "scanAvailable": true,
    "prefetch": -1
}

when I'm connected. After receiving the above message, I do not receive any more messages. What could be the problem?

1

There is 1 best solution below

1
On

I managed to receive data, but only by subscribing to the response body myself (see the updated code below). The problem I'm facing now is how to map each DataBuffer to a ServerSentEvent.

    /**
     * Supplier bean that returns the flow's reactive publisher as a [Flux].
     *
     * The supplier only adapts [sseSupplierPub] with `Flux.from`; it does not transform
     * the messages.
     */
    @Bean
    fun sseSupplier(sseSupplierPub: Publisher<Message<Any>>): Supplier<Flux<Message<Any>>> =
        Supplier { Flux.from(sseSupplierPub) }

    /**
     * Builds the integration flow and exposes it as a reactive [Publisher] of messages.
     *
     * Messages arriving on `channelsConfiguration.trigger()` get the `CONTENT_TYPE` and
     * `ACCEPT` headers, go through the gateway from [sseMessageHandlerSpec], and continue
     * on `channelsConfiguration.sseMessage()`. That gateway's body extractor also sends to
     * the same `sseMessage()` channel, which feeds the logged, published part of the flow.
     *
     * NOTE(review): the gateway's own reply appears to travel on `sseMessage()` as well;
     * confirm whether that extra message is wanted.
     *
     * @param sseSupplierProperties not read by this method's body.
     */
    @Bean
    fun sseSupplierPub(
        sseSupplierProperties: sseSupplierProperties
    ): Publisher<Message<Any>> {
        val triggerChannel = channelsConfiguration.trigger()

        return IntegrationFlow.from(triggerChannel)
            .enrichHeaders { headers ->
                headers.header(CONTENT_TYPE, "application/json")
                headers.header(ACCEPT, "text/event-stream")
            }
            .handle(sseMessageHandlerSpec())
            .channel(channelsConfiguration.sseMessage())
            .log()
            .toReactivePublisher(true)
    }

    /**
     * Creates a [WebClient] whose requests pass through an OAuth2 exchange filter.
     *
     * The filter is backed by one in-memory client registration ("clientId",
     * client-credentials grant), which is also set as its default registration. The
     * connector wraps a Reactor Netty client created from `provider` with keep-alive on;
     * `provider` and `strategies` come from outside this function.
     */
    fun webClient(): WebClient {
        val httpClient = HttpClient.create(provider).keepAlive(true)

        val registration = ClientRegistration
            .withRegistrationId("clientId")
            .tokenUri("https://openid-connect/token")
            .clientId("clientId")
            .clientSecret("clientSecret")
            .authorizationGrantType(AuthorizationGrantType.CLIENT_CREDENTIALS)
            .build()
        val registrations = InMemoryReactiveClientRegistrationRepository(registration)
        val authorizedClients = InMemoryReactiveOAuth2AuthorizedClientService(registrations)
        val clientManager =
            AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager(registrations, authorizedClients)

        // Requests that do not name a registration fall back to this one.
        val oauthFilter = ServerOAuth2AuthorizedClientExchangeFilterFunction(clientManager).apply {
            setDefaultClientRegistrationId("clientId")
        }

        return WebClient.builder()
            .filter(oauthFilter)
            .clientConnector(ReactorClientHttpConnector(httpClient))
            .exchangeStrategies(strategies)
            .build()
    }

    /**
     * Describes the outbound WebFlux gateway that POSTs to "https://api/src" and forwards
     * the raw response body to `channelsConfiguration.sseMessage()`.
     *
     * The custom body extractor subscribes to the response body. For every received data
     * buffer it copies the readable bytes, releases the buffer, turns the bytes into a
     * string, and sends that string as a message payload.
     *
     * NOTE(review): each buffer is decoded on its own, so this presumably assumes an event
     * never spans two buffers — confirm against the server's framing.
     */
    fun sseMessageHandlerSpec(): WebFluxMessageHandlerSpec {
        val gateway = WebFlux.outboundGateway("https://api/src", webClient())

        return gateway
            .httpMethod(HttpMethod.POST)
            .bodyExtractor { response, _ ->
                response.body.subscribe { buffer ->
                    // Copy the content out before handing the buffer back.
                    val raw = ByteArray(buffer.readableByteCount())
                    buffer.read(raw)
                    DataBufferUtils.release(buffer)

                    val sseMessage = MessageBuilder.withPayload(String(raw)).build()
                    logger.debug { "SSE message payload => ${sseMessage.payload}" }
                    channelsConfiguration.sseMessage().send(sseMessage)
                }
            }
    }

    /**
     * Runner bean that starts the flow by sending a message with the payload "{}" to
     * `channelsConfiguration.trigger()`.
     *
     * @param ctx not used by the runner body.
     */
    @Bean
    fun commandLineRunner(ctx: ApplicationContext?): CommandLineRunner =
        CommandLineRunner {
            val triggerMessage = MessageBuilder.withPayload("{}").build()
            logger.info { "Trigger SSE supplier" }
            channelsConfiguration.trigger().send(triggerMessage)
        }