I'm trying to create a SSE Spring Cloud Stream supplier using the following code:
@Bean
fun sseSupplier(sseSupplierPub: Publisher<Message<Any>>): Supplier<Flux<Message<Any>>> {
return Supplier {
Flux.from(sseSupplierPub)
}
}
@Bean
fun sseSupplierPub(
sseSupplierProperties: sseSupplierProperties
): Publisher<Message<Any>> {
return IntegrationFlow.from(channelsConfiguration.trigger())
.enrichHeaders {
it.header(CONTENT_TYPE, "application/json")
it.header(ACCEPT, "text/event-stream")
}
.handle(sseMessageHandlerSpec())
.log()
.toReactivePublisher(true)
}
fun webClient(): WebClient {
val client = HttpClient.create(provider)
.keepAlive(true)
val clientRegistryRepo = InMemoryReactiveClientRegistrationRepository(
ClientRegistration
.withRegistrationId("clientId")
.tokenUri("https://openid-connect/token")
.clientId("clientId")
.clientSecret("clientSecret")
.authorizationGrantType(AuthorizationGrantType.CLIENT_CREDENTIALS)
.build())
val clientService = InMemoryReactiveOAuth2AuthorizedClientService(clientRegistryRepo)
val authorizedClientManager =
AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager(clientRegistryRepo, clientService)
val oauthFilter = ServerOAuth2AuthorizedClientExchangeFilterFunction(authorizedClientManager)
oauthFilter.setDefaultClientRegistrationId("clientId")
return WebClient.builder()
.filter(oauthFilter)
.clientConnector(ReactorClientHttpConnector(client))
.exchangeStrategies(strategies)
.build()
}
fun sseMessageHandlerSpec(): WebFluxMessageHandlerSpec {
return WebFlux.outboundGateway(
"https://api/src", webClient())
.httpMethod(HttpMethod.POST)
.replyPayloadToFlux(true)
.expectedResponseType(typeReference<ServerSentEvent<String>>())
}
@Bean
fun commandLineRunner(ctx: ApplicationContext?): CommandLineRunner {
return CommandLineRunner { _ ->
val message = MessageBuilder.withPayload("{}").build()
logger.info { "Trigger SSE supplier" }
channelsConfiguration.trigger().send(message)
}
}
I'm able to get the authorisation token and connected to https://api/src
successfully but I'm not able to get any response except for this message
{
"scanAvailable": true,
"prefetch": -1
}
when I'm connected. After receiving the above message, I will not receive any more messages. What could be the problem?
I managed to receive data but I need to subscribe to
body
in the response (see the updated code below). The problem I'm facing now is how to map DataBuffer to ServerSentEvent.