R2DBC MSSQL r2dbc.mssql.client.ReactorNettyClient : Connection has been closed by peer

I have started working on a Spring WebFlux and R2DBC project. For the most part my code works fine, but after some elements have been processed I receive this warning:

r2dbc.mssql.client.ReactorNettyClient : Connection has been closed by peer

After this warning I get the following exception, and the program stops reading from the Flux whose source is the R2DBC driver:

ReactorNettyClient$MssqlConnectionClosedException: Connection unexpectedly closed

My main pipeline looks like this:

Sinks.Empty<Void> completionSink = Sinks.empty();

Flux<Event> events = service.getPairs(
        taskProperties.A,
        taskProperties.B);

events
    .flatMap(some operation)
    .doOnComplete(() -> {
        log.info("Finished Job");
        completionSink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
    })
    .subscribe();
completionSink.asMono().block();

When it runs, flatMap requests 256 elements by default; once those have been fetched, it issues request(1) for each subsequent signal.

Somewhere between the 280th and the 320th element, the error above occurs. It is not deterministic: sometimes it reads 280 elements, sometimes 303, 315, etc.

I think it might be network-related, but I am not sure and cannot find the cause. Do I need a connection pool or something else?

Sorry if I missed anything; if you need more details, I will update the question. Thank you in advance.

I have tried changing the flatMap request size to unbounded, adding a scheduler, and using the default R2DBC pool, but so far I have no clue.

There are 1 best solutions below

It turned out to be related to the cursor settings. After we changed preferCursoredExecution to true, the problem was fixed.

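import io.r2dbc.mssql.MssqlConnectionConfiguration;
import io.r2dbc.mssql.MssqlConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;

// host, port, database, username and password come from your own configuration.
ConnectionFactory factory = new MssqlConnectionFactory(
        MssqlConnectionConfiguration.builder()
                .host(host)
                .port(port)
                .database(database)
                .username(username)
                .password(password)
                .tcpKeepAlive(true)
                // The setting that fixed the issue: prefer cursors over direct execution.
                .preferCursoredExecution(true)
                .build()
);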
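
If the connection is configured through an R2DBC URL rather than the builder (for example via spring.r2dbc.url), the same two settings can, as far as I know, be passed as query parameters, since the r2dbc-mssql documentation lists tcpKeepAlive and preferCursoredExecution among its connection options. That documentation also describes cursors as needing more round trips but being more backpressure-friendly, which would fit a pipeline where flatMap throttles demand. Below is a minimal, stdlib-only sketch that just assembles such a URL. The class MssqlR2dbcUrlSketch, the method buildUrl, and the host/database values are hypothetical names for illustration, not part of the driver, and I have not verified the URL against every driver version.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper (not part of r2dbc-mssql): it only builds a URL string.
class MssqlR2dbcUrlSketch {

    // Joins the options as query parameters in insertion order.
    // Assumption: option values need no URL encoding (true/false here).
    static String buildUrl(String host, int port, String database, Map<String, String> options) {
        String base = "r2dbc:mssql://" + host + ":" + port + "/" + database;
        if (options.isEmpty()) {
            return base;
        }
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return base + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("tcpKeepAlive", "true");
        options.put("preferCursoredExecution", "true");
        // Placeholder host and database; credentials are better supplied separately.
        System.out.println(buildUrl("localhost", 1433, "mydb", options));
    }
}
```

The resulting string could then be handed to ConnectionFactories.get(url) from the R2DBC SPI, or set as spring.r2dbc.url with the username and password configured separately.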