Interval Task leads to IDLE Connection Exhaustion in R2DBC


I am using Reactor (Java) to run a periodic task against Postgres through R2DBC, as follows:

Flux.interval(Duration.ofMillis(1000)).doOnNext(i -> {
    System.out.print("TIME HAS TICKED\n");
    Flux.range(0, 10).flatMap(j -> {
        return service.getJob(this.consumerQueueName, this.filter).then();
    }).subscribe();
}).subscribe();

After about 5 minutes it stops processing jobs, and when I check Postgres, the connections are all idle:


select datname as database_name,
       client_addr as client_address,
       application_name,
       backend_start,
       state,
       state_change
from pg_stat_activity;

integrity_service   10.0.73.1   r2dbc-postgresql    2020-09-18 04:11:07.786098  idle    2020-09-18 04:11:40.471893
integrity_service   10.0.73.1   r2dbc-postgresql    2020-09-18 04:11:07.785822  idle    2020-09-18 04:12:01.196558
integrity_service   10.0.73.1   r2dbc-postgresql    2020-09-18 04:11:07.785598  idle    2020-09-18 04:11:50.971738
integrity_service   10.0.73.1   r2dbc-postgresql    2020-09-18 04:11:07.785317  idle    2020-09-18 04:11:30.506207
integrity_service   10.0.73.1   r2dbc-postgresql    2020-09-18 04:11:07.665800  idle    2020-09-18 04:11:20.570714

How can I use R2DBC and databaseClient to fetch data from a table periodically without running into this problem?

//ConnectionFactory Settings:

ConnectionFactories.get(
                ConnectionFactoryOptions.builder()
                        .option(Option.valueOf("driver"), "pool")
                        .option(Option.valueOf("protocol"), "postgresql")
                        //.option(ConnectionFactoryOptions.DRIVER, "postgresql")
                        .option(ConnectionFactoryOptions.HOST, "localhost")
                        .option(ConnectionFactoryOptions.PORT, 5432)  // optional, defaults to 5432
                        .option(ConnectionFactoryOptions.USER, "db")
                        .option(ConnectionFactoryOptions.DATABASE, "integrity_service")
                        .option(MAX_SIZE, 5)
                        .build());

private final String fetchJobFormat =
            " WITH cte AS ( SELECT id FROM %s WHERE chain_id='%s' and is_complete=%b  ORDER BY id ASC LIMIT 1\n" +
                    "    )\n" +
                    "                UPDATE queue q\n" +
                    "                SET timestamp = extract(epoch from now()),\n" +
                    "                is_complete = TRUE\n" +
                    "                FROM cte WHERE q.id  = cte.id\n" +
                    "                RETURNING q.id, q.chain_id,q.timestamp, q.is_complete, q.payload";

 public Mono<Queue> getJob(String queue, String filter){

        return databaseClient.execute(String.format(fetchJobFormat,queue,filter,false))
                .fetch().all()
                .flatMap((v) -> {
                    System.out.println("retrieved result" + v.get("id").toString());
                    Queue q = this.objectMapper.convertValue(v, Queue.class);
                    return Mono.just(q);
                }).last();
    }
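
One thing worth checking, independent of R2DBC itself: the interval snippet starts ten getJob calls on every tick without waiting for the previous batch, against a pool capped at five connections. Below is a plain-Java sketch of the alternative (deliberately without Reactor, so it runs stand-alone): each tick waits for its batch to finish, and a semaphore caps in-flight jobs at the pool size. The names `BoundedPoller`, `fetchJob` and `runTicks` are hypothetical stand-ins and are not part of the code above; `fetchJob` only simulates holding a connection for a few milliseconds. This illustrates the idea and is not a confirmed diagnosis of the stall.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPoller {

    // Hypothetical stand-in for service.getJob(...): pretends to hold a
    // connection briefly and records how many jobs are running at once.
    static void fetchJob(AtomicInteger inFlight, AtomicInteger maxSeen) {
        int now = inFlight.incrementAndGet();
        maxSeen.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            inFlight.decrementAndGet();
        }
    }

    // Runs `ticks` batches of `jobsPerTick` jobs and returns the highest
    // number of jobs that were ever in flight at the same time.
    static int runTicks(int ticks, int jobsPerTick, int poolSize) {
        ExecutorService workers = Executors.newFixedThreadPool(jobsPerTick);
        Semaphore permits = new Semaphore(poolSize); // one permit per pooled connection
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        try {
            for (int t = 0; t < ticks; t++) {
                List<CompletableFuture<Void>> batch = new ArrayList<>();
                for (int j = 0; j < jobsPerTick; j++) {
                    batch.add(CompletableFuture.runAsync(() -> {
                        permits.acquireUninterruptibly();
                        try {
                            fetchJob(inFlight, maxSeen);
                        } finally {
                            permits.release(); // always hand the "connection" back
                        }
                    }, workers));
                }
                // Wait for the whole batch before the next tick starts,
                // so ticks never pile up on top of each other.
                CompletableFuture.allOf(batch.toArray(new CompletableFuture[0])).join();
            }
        } finally {
            workers.shutdown();
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("max in flight: " + runTicks(3, 10, 5));
    }
}
```

In Reactor the same shape would usually be written by composing the inner work into the interval chain (for example `concatMap`, or `flatMap` with a concurrency argument) instead of calling a nested `subscribe()` inside `doOnNext`. Separately, `Flux.last()` signals a `NoSuchElementException` when its source is empty, so getJob as written errors whenever no row matches, and the bare `subscribe()` calls attach no error handler.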