Spring Data R2DBC - MySQL Stored procedure execution

I'm trying to call a MySQL stored procedure using Spring R2DBC from my API built on Spring Webflux. However, I'm not getting any data back after executing the query.

I'm new to R2DBC. Can anyone help me see where I'm going wrong? I have followed the R2DBC spec, the MySQL R2DBC driver's GitHub page, and the R2DBC 0.9 release notes on parameterized statements and bindings.

My table and stored procedure:

CREATE TABLE cars (make varchar(100), model varchar(100),
                  year int, value decimal(10, 2));

INSERT INTO cars VALUES
('Porsche', '911 GT3', 2017, 169700),('Porsche', 'Cayman GT4', 2017, 118000),('Porsche', 'Panamera', 2017, 113200);

CREATE PROCEDURE get_car_stats_by_year2(
IN year_filter INT,
OUT cars_number INT,
OUT min_value DECIMAL(10,2),
OUT avg_value DECIMAL(10,2),
OUT max_value DECIMAL(10,2)
)
BEGIN
  SELECT COUNT(*), MIN(value), AVG(value), MAX(value)
  INTO cars_number, min_value, avg_value, max_value
  FROM cars
  WHERE year = year_filter ORDER BY make, value DESC;
END;

CALL get_car_stats_by_year2(2017, @number, @min, @avg, @max);

SELECT @number, @min, @avg, @max;

Java code:

@Repository
public class CarsRepository {

    private ConnectionFactory connectionFactory;

    public CarsRepository(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public Flux<CarStats> getCarStatsByYear(int year) {
        return Mono.from(this.connectionFactory.create())
                .flatMapMany(connection -> {
                    return connection.createStatement("CALL get_car_stats_by_year2(?, ?number, ?min, ?avg, ?max)")
                            .bind(0, Parameters.in(R2dbcType.INTEGER, year))
                            .bind("number", Parameters.out(R2dbcType.INTEGER))
                            .bind("min", Parameters.out(R2dbcType.FLOAT))
                            .bind("avg", Parameters.out(R2dbcType.FLOAT))
                            .bind("max", Parameters.out(R2dbcType.FLOAT))
                            .execute();
                })
                .filter(Result.OutSegment.class::isInstance) //No OutSegment found
                .map(segment -> {
                    Result.OutSegment data = (Result.OutSegment) segment;
                    System.err.println("OutParameters: " + data.outParameters().getMetadata());
                    return CarStats.builder()
                            .number(Integer.parseInt(data.outParameters().get(0).toString()))
                            .min(Float.parseFloat(data.outParameters().get(1).toString()))
                            .avg(Float.parseFloat(data.outParameters().get(2).toString()))
                            .max(Float.parseFloat(data.outParameters().get(3).toString()))
                            .build();
                });
    }
}

I added logging to the getCarStatsByYear(int) method to determine which Segment type is being returned, and it is none of the available Segment types.

...
.flatMapMany(connection -> ...)
.doOnNext(result -> {
    System.err.println("Result.OutSegment.class::isInstance: " + Result.OutSegment.class.isInstance(result));
    System.err.println("Result.RowSegment.class::isInstance: " + Result.RowSegment.class.isInstance(result));
    System.err.println("Result.UpdateCount.class::isInstance: " + Result.UpdateCount.class.isInstance(result));
    System.err.println("Result.Message.class::isInstance: " + Result.Message.class.isInstance(result));
})
.filter(Result.OutSegment.class::isInstance)
...

Result.OutSegment.class::isInstance: false
Result.RowSegment.class::isInstance: false
Result.UpdateCount.class::isInstance: false
Result.Message.class::isInstance: false

1 Answer

There are three problems:

  1. Statement.execute() returns a Publisher<Result>. A Result is similar to a JDBC ResultSet; it is not itself a Segment, which is why none of the isInstance checks match. The Segments have to be retrieved from inside each Result; see the Result.flatMap javadoc.
  2. In the text protocol, MySQL does not allow OUT parameters that are not @ user variables, so the binary protocol has to be enabled with useServerPrepareStatement=true.
    • Otherwise, the text protocol fails with the error "argument * for routine **.*** is not a variable or NEW pseudo-variable in BEFORE trigger".
    • See also useServerPrepStmts in mysql-connector-j.
    • Alternatively, with useServerPrepareStatement=false, use CALL get_car_stats_by_year2(?, @number, @min, @avg, @max);SELECT @number, @min, @avg, @max.
  3. MySQL returns OUT parameters as a data row, so for now they can be retrieved as a RowSegment.
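
For the text-protocol alternative in point 2, the statement text could be assembled along these lines. This is only a sketch: CallWithUserVariables is a hypothetical helper, not part of R2DBC or r2dbc-mysql, and it only builds the SQL string. Running it would still go through connection.createStatement(...) with the IN parameter bound by index. The variable names are concatenated into the SQL, so they have to be trusted identifiers.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of R2DBC or r2dbc-mysql.
final class CallWithUserVariables {

    private CallWithUserVariables() {
    }

    // Builds "CALL proc(?, @a, @b); SELECT @a, @b": one "?" per IN parameter,
    // followed by one MySQL user variable per OUT parameter.
    static String build(String procedure, int inCount, List<String> outVariables) {
        if (inCount < 0 || outVariables.isEmpty()) {
            throw new IllegalArgumentException("need inCount >= 0 and at least one OUT variable");
        }
        // Names are concatenated into the SQL text, so pass trusted identifiers only.
        String outPart = outVariables.stream()
                .map(name -> "@" + name)
                .collect(Collectors.joining(", "));
        String inPart = String.join(", ", Collections.nCopies(inCount, "?"));
        String arguments = inPart.isEmpty() ? outPart : inPart + ", " + outPart;
        return "CALL " + procedure + "(" + arguments + "); SELECT " + outPart;
    }

    public static void main(String[] args) {
        System.out.println(build("get_car_stats_by_year2", 1, List.of("number", "min", "avg", "max")));
    }
}
```

With this variant the OUT values come back as the row of the trailing SELECT, so the columns are named after the user variables rather than the procedure's parameter names.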

Try this on io.asyncer:r2dbc-mysql:1.1.2:

// Connect with property useServerPrepareStatement=true
Mono.from(this.connectionFactory.create())
    .flatMapMany(connection -> connection.createStatement("CALL get_car_stats_by_year2(?, ?number, ?min, ?avg, ?max)")
        .bind(0, Parameters.in(R2dbcType.INTEGER, 2017))
        .bind("number", Parameters.out(R2dbcType.INTEGER))
        .bind("min", Parameters.out(R2dbcType.FLOAT))
        .bind("avg", Parameters.out(R2dbcType.FLOAT))
        .bind("max", Parameters.out(R2dbcType.FLOAT))
        .execute())
    .flatMap(result -> result.flatMap(segment -> {
        // Retrieve the Segments inside the Result.
        // Result.map((Readable r) -> ...) would usually be the better choice,
        // but this example needs the row metadata, so it uses Result.flatMap.
        if (segment instanceof Result.Message) {
            return Mono.error(((Result.Message) segment).exception());
        } else if (segment instanceof Result.UpdateCount) {
            return Mono.empty();
        }

        Result.RowSegment data = (Result.RowSegment) segment;
        Row row = data.row();

        return Mono.just(row.getMetadata().getColumnMetadatas()
            .stream()
            .map(it -> Map.entry(it.getName(), Objects.requireNonNull(row.get(it.getName()))))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
    }))
    .subscribe(System.out::println); // subscribe just for example

Output: {min_value=113200.00, cars_number=3, avg_value=133633.33, max_value=169700.00}
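
The question ultimately wants a CarStats object rather than a map. Below is a minimal sketch of that last step, assuming the column names match the procedure's OUT parameter names as in the output above. The CarStats class here is a hypothetical stand-in for the asker's class (which uses a builder), and BigDecimal is an assumed choice for the DECIMAL(10,2) values. Values are converted through toString() so the sketch does not depend on the exact Java types the driver returns.

```java
import java.math.BigDecimal;
import java.util.Map;

// Hypothetical stand-in for the question's CarStats class.
final class CarStats {
    final int number;
    final BigDecimal min;
    final BigDecimal avg;
    final BigDecimal max;

    CarStats(int number, BigDecimal min, BigDecimal avg, BigDecimal max) {
        this.number = number;
        this.min = min;
        this.avg = avg;
        this.max = max;
    }

    // Converts a column-name-to-value map keyed by the OUT parameter names.
    static CarStats fromColumns(Map<String, ?> columns) {
        return new CarStats(
                Integer.parseInt(columns.get("cars_number").toString()),
                toDecimal(columns.get("min_value")),
                toDecimal(columns.get("avg_value")),
                toDecimal(columns.get("max_value")));
    }

    // Accepts a BigDecimal as-is and parses anything else from its string form.
    private static BigDecimal toDecimal(Object value) {
        return value instanceof BigDecimal ? (BigDecimal) value : new BigDecimal(value.toString());
    }

    @Override
    public String toString() {
        return "CarStats{number=" + number + ", min=" + min + ", avg=" + avg + ", max=" + max + "}";
    }
}
```

In the pipeline above, this could be applied with .map(CarStats::fromColumns) after the outer flatMap and before subscribing.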