Quarkus blocking grpc vert.x eventloop error


I'm trying to use gRPC for communication between microservices with Quarkus 1.12.1.Final. When I access my entity from the service method, I get the following error on vert.x-eventloop-thread-0:

(vert.x-eventloop-thread-0) Exception while executing runnable io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@8e09d21: java.lang.IllegalStateException: You have attempted to perform a blocking operation on a IO thread. This is not allowed, as blocking the IO thread will cause major performance issues with your application. If you want to perform blocking EntityManager operations make sure you are doing it from a worker thread.

Supposedly, adding @Blocking to my method should fix this, but it doesn't seem to work.

@Singleton
public class FillProductService extends SourceGrpc.SourceImplBase {

    @Override
    @Blocking
    public void fillProductsWithSourceInfo(FillProductsRequest request, StreamObserver<FillProductsReply> responseObserver) {
        List<SourceInfo> sourceInfo = request.getProductIdsList().stream().map(productId -> {
            List<SourceEntity> sources = SourceEntity.find("productId = ?1", productId, Sort.descending("upvotes")).list();
            double bestPrice = !sources.isEmpty() ? sources.get(0).price : 0.00;
            return SourceInfo.newBuilder().setSources(sources.size())
                        .setBestPrice(bestPrice).setProductId(productId).build();
        }).collect(Collectors.toList());

        responseObserver.onNext(FillProductsReply.newBuilder().addAllSourceInfo(sourceInfo).build());
        responseObserver.onCompleted();
    }
}

I've also tried using Mutiny, but it seems to give the same error.

Best answer

Using Mutiny with runSubscriptionOn does give me a response now:

@Singleton
public class FillProductService extends MutinySourceGrpc.SourceImplBase {

    @Override
    public Uni<FillProductsReply> fillProductsWithSourceInfo(FillProductsRequest request) {
        return Uni.createFrom().item(() -> {
            List<SourceInfo> sourceInfo = request.getProductIdsList().stream().map(productId -> {
                List<SourceEntity> sources = SourceEntity.find("productId = ?1", productId).list();
                double bestPrice = !sources.isEmpty() ? sources.get(0).price : 0.00;
                return SourceInfo.newBuilder().setSources(sources.size())
                            .setBestPrice(bestPrice).setProductId(productId).build();
            }).collect(Collectors.toList());
            return FillProductsReply.newBuilder().addAllSourceInfo(sourceInfo).build();
        })
        // Run the blocking Panache query on a worker thread instead of the Vert.x event loop.
        .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
        .onFailure().invoke(t -> System.out.println("Oh no! We received a failure: " + t.getMessage()));
    }
}
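
For anyone wondering why this helps: as far as I understand it, runSubscriptionOn moves the subscription, and with it the supplier that runs the blocking query, onto the given executor, so the event loop thread never executes the EntityManager call. Below is a rough plain-Java analogy of that idea using CompletableFuture instead of Mutiny. It is only a sketch: WorkerOffloadSketch, WORKER_POOL, runOnWorker and blockingLookup are made-up names for illustration, not Quarkus or Mutiny APIs, and the "lookup" just reports which thread ran it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class WorkerOffloadSketch {

    // Hypothetical stand-in for a worker pool (assumption: not the Quarkus pool itself).
    public static final ExecutorService WORKER_POOL = Executors.newFixedThreadPool(2, runnable -> {
        Thread thread = new Thread(runnable, "worker-thread");
        thread.setDaemon(true); // do not keep the JVM alive for this sketch
        return thread;
    });

    // Runs the (possibly blocking) supplier on the worker pool, not on the caller's thread.
    public static <T> CompletableFuture<T> runOnWorker(Supplier<T> blockingCall) {
        return CompletableFuture.supplyAsync(blockingCall, WORKER_POOL);
    }

    // Hypothetical blocking lookup; it only returns the name of the thread that executed it.
    public static String blockingLookup() {
        return Thread.currentThread().getName();
    }

    public static void main(String[] args) {
        String executedOn = runOnWorker(WorkerOffloadSketch::blockingLookup).join();
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("lookup ran on: " + executedOn);
    }
}
```

The caller only receives a future and stays free while the work runs elsewhere, which is roughly the role the returned Uni plays in the gRPC method above.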