I need to build a background process that will periodically process all the elements of a database table. Since I can not load all the elements in memory, I need to divide the database in subportions. Unfortunately, I can not use JPQL Stream return type, since the query uses a complex filter based logic which is not achievable by writing a JPQL query.
Thus, I have built the following criteria query, to return only a page of Products. After I have processed a page, how can I iterate over next one until I have processed them all?
import org.springframework.data.jpa.repository.query.QueryUtils;
@Repository
@AllArgsConstructor
public class ProductRepositoryImpl implements CustomProductRepository {
private final EntityManager entityManager;
@Override
public Page<Product> getProducts(Pageable pageable, /* fields used to filter the results */) {
CriteriaBuilder criteriaBuilder = entityManager.getCriteriaBuilder();
CriteriaQuery<Product> criteriaQuery = criteriaBuilder.createQuery(Product.class);
Root<Product> root = criteriaQuery.from(Product.class);
List<Predicate> predicates = getPredicates(/* fields used to filter the results */);
criteriaQuery.where(combinePredicatesWithAndStatement(criteriaBuilder, predicates))
.orderBy(QueryUtils.toOrders(pageable.getSort(), root, criteriaBuilder));
List<Product> result = entityManager.createQuery(criteriaQuery)
.setFirstResult((int) pageable.getOffset())
.setMaxResults(pageable.getPageSize())
.getResultList();
CriteriaBuilder criteriaBuilderCount = entityManager.getCriteriaBuilder();
CriteriaQuery<Long> countQuery = criteriaBuilderCount.createQuery(Long.class);
Root<Product> rootCount = countQuery.from(Product.class);
List<Predicate> predicatesCount = getPredicates(/* fields used to filter the results */);
countQuery.select(criteriaBuilderCount.count(rootCount))
.where(combinePredicatesWithAndStatement(criteriaBuilderCount, predicatesCount));
Long totalElements = entityManager.createQuery(countQuery).getSingleResult();
return new PageImpl<>(result, pageable, totalElements);
}
private List<Product> getPredicates(/* fields used to filter the results */) {
List<Predicate> predicates = new ArrayList<Predicate>();
// assemble predicates based on some complex conditions not replicable with JPQL
return predicates;
}
private Predicate combinePredicatesWithAndStatement(CriteriaBuilder criteriaBuilder, List<Predicate> predicates) {
return criteriaBuilder.and(predicates.stream().filter(Objects::nonNull).toArray(Predicate[]::new));
}
}
To iterate over all the pages you can create a PageStreamer class as follows:
Then you can use it as follows: