Service is using org.springframework.r2dbc.core.DatabaseClient
with reactor-pool and r2dbc-mysql driver.
I'm doing inserts in the database every 5-10 seconds (50-100 insert statements) and randomly after every 2-3 hours I'm getting reactor.pool.PoolShutdownException: Pool has been shut down
, what might be the reason for this behavior?
Dependecy versions:
- r2dbc-pool: 0.8.8.RELEASE
- r2dbc-mysql: 0.8.2
- spring-r2dbc: 5.3.15
Stacktrace:
org.springframework.dao.DataAccessResourceFailureException: Failed to obtain R2DBC Connection; nested exception is reactor.pool.PoolShutdownException: Pool has been shut down
at org.springframework.r2dbc.connection.ConnectionFactoryUtils.lambda$getConnection$0(ConnectionFactoryUtils.java:88)
at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3733)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
at reactor.core.publisher.FluxRetry$RetrySubscriber.onError(FluxRetry.java:95)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
at reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:477)
at reactor.pool.SimpleDequePool.doAcquire(SimpleDequePool.java:264)
at reactor.pool.AbstractPool$Borrower.request(AbstractPool.java:432)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110)
at reactor.pool.SimpleDequePool$QueueBorrowerMono.subscribe(SimpleDequePool.java:676)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.FluxRetry$RetrySubscriber.resubscribe(FluxRetry.java:117)
at reactor.core.publisher.MonoRetry.subscribeOrReturn(MonoRetry.java:50)
at reactor.core.publisher.Mono.subscribe(Mono.java:4385)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132)
at reactor.core.publisher.Operators.error(Operators.java:198)
at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:104)
at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251)
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoCompletionStage.lambda$subscribe$0(MonoCompletionStage.java:83)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(Unknown Source)
at reactor.core.publisher.MonoCompletionStage.subscribe(MonoCompletionStage.java:58)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:126)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Basically, it happens when you have too many pending acquired connections
In this situation, reactor-pool disposes connection pool. So to avoid such an issue, I’m controlling the number of parallel executions.
Actually I see two ways to handle this case:
In this case, you won't have more parallel executions than your connection pool can afford.
delayUntil
operator, which delays elements until there are unused connections (you can use connection pool metrics to retrieve that). I wouldn't recommend this approach because you can end up with of memory exception if you are not controlling back-pressure, or you will have to drop some items if the buffer overflows.Method that delays message until there are available connections
Usage: