I'm trying to implement cursor-based pagination with a custom implementation using R2dbc. Forward pagination works fine, but backward pagination isn't working correctly.
The query for forward pagination would be something like this, depending on the cursor fields (in this case, we use createdAt):
SELECT SUBSCRIBERS.* FROM SUBSCRIBERS WHERE SUBSCRIBERS.CREATED_AT > $1 ORDER BY SUBSCRIBERS.CREATED_AT ASC LIMIT 10 + 1; -- SIZE + 1
And the query for backward pagination would be a subquery within the main query:
-- Backward page: select the rows just before the cursor, then present them in ascending order.
SELECT
*
FROM
(
-- Inner query: up to SIZE + 1 rows whose CREATED_AT is strictly before the cursor value ($1),
-- ordered newest first so that LIMIT keeps the rows closest to the cursor.
SELECT
SUBSCRIBERS.*
FROM
SUBSCRIBERS
WHERE
SUBSCRIBERS.CREATED_AT < $1
ORDER BY
SUBSCRIBERS.CREATED_AT DESC
LIMIT 10 + 1
) AS "DATA"
-- Outer query: re-sort the limited window oldest first.
ORDER BY
CREATED_AT ASC;
My question is, how can I use R2dbcEntityTemplate or another class to build the query with its subquery?
I've tried implementing it like this, but it's executing two separate queries:
// Filtered query: caller criteria AND cursor criteria, first page of `pageSize` rows,
// ordered by the caller's sort followed by the cursor's sort.
val subQuery = query(criteria.and(cursorCriteriaParsed))
.with(PageRequest.of(0, pageSize, sort.and(criteriaSpringSort)))
// Names of the properties the cursor sorts on.
val properties = cursor.getSort().orders.map { it.property }
// Independent query: no criteria and no limit, ordered ascending by the cursor properties.
@Suppress("SpreadOperator")
val mainQuery: Query = query(Criteria.empty())
.sort(Sort.by(Sort.Direction.ASC, *properties.toTypedArray()))
// `thenMany` waits for the first select to complete, ignores its elements, and then emits the
// second select's rows - so two unrelated statements run and only `mainQuery` results are kept.
r2dbcTemplate.select(domainType.java).matching(subQuery).all()
.thenMany(r2dbcTemplate.select(domainType.java).matching(mainQuery).all()).collectList()
Is there a way to use criteria and sorts to implement queries with subqueries?
package com.lyra.spring.boot.repository
import com.lyra.common.domain.presentation.pagination.Cursor
import com.lyra.common.domain.presentation.pagination.PageResponse
import com.lyra.common.domain.presentation.sort.Direction
import com.lyra.spring.boot.presentation.sort.toSpringSort
import kotlin.reflect.KClass
import org.springframework.data.domain.Page
import org.springframework.data.domain.PageImpl
import org.springframework.data.domain.PageRequest
import org.springframework.data.domain.Pageable
import org.springframework.data.domain.Sort
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate
import org.springframework.data.relational.core.query.Criteria
import org.springframework.data.relational.core.query.Query
import org.springframework.data.relational.core.query.Query.query
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.util.function.Tuple2
/**
 * [ReactiveSearchRepository] implementation that executes criteria queries through an
 * [R2dbcEntityTemplate], offering plain, offset-paged and cursor-paged lookups.
 *
 * @param T The entity type handled by this repository.
 * @property r2dbcTemplate The template used to execute every query.
 */
class ReactiveSearchRepositoryImpl<T : Any>(
    private val r2dbcTemplate: R2dbcEntityTemplate
) : ReactiveSearchRepository<T> {
    /**
     * Fetches all entities that match the given criteria.
     *
     * @param criteria The criteria to match.
     * @param domainType The class of the entity.
     * @return A Flux of entities that match the criteria.
     */
    override fun findAll(criteria: Criteria, domainType: KClass<T>): Flux<T> {
        return r2dbcTemplate.select(domainType.java)
            .matching(query(criteria))
            .all()
    }

    /**
     * Fetches one offset-based page of entities that match the given criteria.
     *
     * Two statements are issued: one selecting the page content and one counting every row that
     * matches [criteria]; their results are zipped into a single [Page].
     *
     * @param criteria The criteria to match.
     * @param pageable The pagination information.
     * @param domainType The class of the entity.
     * @return A Mono of a Page of entities that match the criteria.
     */
    override fun findAll(
        criteria: Criteria,
        pageable: Pageable,
        domainType: KClass<T>
    ): Mono<Page<T>> {
        val list = r2dbcTemplate.select(domainType.java)
            .matching(query(criteria).with(pageable))
            .all()
            .collectList()
        // The total is counted without the pageable so it reflects every matching row.
        val count = r2dbcTemplate.select(domainType.java)
            .matching(query(criteria))
            .count()
        return Mono.zip<List<T>, Long>(list, count)
            .map { tuple: Tuple2<List<T>, Long> ->
                PageImpl(
                    tuple.t1,
                    pageable,
                    tuple.t2,
                )
            }
    }

    /**
     * Fetches one cursor-based page of entities that match the given criteria.
     *
     * A single statement is executed for both directions: it combines [criteria] with the
     * cursor's own criteria, orders by [sort] followed by the cursor's sort, and asks for
     * `size + 1` rows so the extra row reveals whether more data exists. For a backward
     * ([Direction.DESC]) cursor the fetched window is reversed in memory, which yields the same
     * rows as wrapping the statement in an outer `ORDER BY ... ASC` subquery without a second
     * round trip.
     *
     * @param criteria The criteria to match.
     * @param size The maximum number of entities to return in the page.
     * @param domainType The class of the entity.
     * @param sort The caller-supplied ordering, applied before the cursor's ordering.
     * @param cursor The cursor to use for pagination.
     * @return A Mono of a [PageResponse] of entities that match the criteria.
     */
    override fun findAllByCursor(
        criteria: Criteria,
        size: Int,
        domainType: KClass<T>,
        sort: Sort,
        cursor: Cursor
    ): Mono<PageResponse<T>> {
        // One extra row is requested purely as a "has more" probe; it is never returned.
        val pageSize = size + 1
        val cursorCriteriaParsed = R2DBCCriteriaParser(domainType).parse(cursor.getCriteria())
        val criteriaSpringSort = cursor.getSort().toSpringSort()
        val pageQuery = query(criteria.and(cursorCriteriaParsed))
            .with(PageRequest.of(0, pageSize, sort.and(criteriaSpringSort)))

        return r2dbcTemplate.select(domainType.java)
            .matching(pageQuery)
            .all()
            .collectList()
            .map { fetched -> buildPageResponse(fetched, size, cursor) }
    }

    /**
     * Turns the rows returned by the cursor query into a [PageResponse].
     *
     * @param fetched The rows in the order the database returned them (at most `size + 1`).
     * @param size The maximum number of entities to expose.
     * @param cursor The cursor the rows were fetched with.
     * @return The page content together with its previous/next cursors.
     */
    private fun buildPageResponse(fetched: List<T>, size: Int, cursor: Cursor): PageResponse<T> {
        val cursorDirection = cursor.direction
        val hasNextPage = fetched.size > size
        // Drop the probe row BEFORE reordering: it is the row furthest from the cursor, so
        // trimming after a reversal would discard the row adjacent to the cursor instead.
        val window = fetched.take(size)
        if (window.isEmpty()) {
            return PageResponse(emptyList(), null, null)
        }
        val content = when (cursorDirection) {
            Direction.ASC -> window
            // NOTE(review): assumes a backward cursor makes the query return rows nearest-first
            // (descending), so reversing restores ascending order - confirm against Cursor.getSort().
            Direction.DESC -> window.reversed()
        }
        // NOTE(review): for a backward cursor `hasNextPage` means "more rows further back";
        // the next-cursor rule is kept as it was - confirm it matches the intended contract.
        val nextCursor = if (hasNextPage) cursor.serialize(content.last()) else null
        val previousCursor = getPreviousCursor(cursorDirection, cursor, content, hasNextPage)
        return PageResponse(content, previousCursor, nextCursor)
    }

    /**
     * Decides whether the page gets a "previous" cursor and, if so, returns it.
     *
     * The candidate is always serialized from the first element of [content] with
     * [Direction.DESC]. It is returned for a forward cursor when the cursor is not the default
     * one, and for a backward cursor when more rows were available and the candidate differs
     * from the current cursor's own serialization.
     *
     * @param cursorDirection The direction of the cursor the page was fetched with.
     * @param cursor The cursor the page was fetched with.
     * @param content The non-empty page content.
     * @param hasNextPage Whether the query returned more rows than the page size.
     * @return The serialized previous cursor, or `null` when there is none.
     */
    private fun getPreviousCursor(
        cursorDirection: Direction,
        cursor: Cursor,
        content: List<T>,
        hasNextPage: Boolean
    ): String? {
        val newPreviousCursor = cursor.serialize(content.first(), Direction.DESC)
        val shouldSerialize = when (cursorDirection) {
            Direction.ASC -> !cursor.isDefault()
            Direction.DESC -> hasNextPage && newPreviousCursor != cursor.serialize()
        }
        return if (shouldSerialize) newPreviousCursor else null
    }
}
I'm still looking at your code, but first I want to ask: why not use offset and limit for pagination?