Quarkus Transaction error when running on multiple threads

When I POST to http://localhost:80/poc/copyOne, the code runs perfectly, but when I curl http://localhost:80/poc/streamHPO/2, it fails with this message:

Cannot use the EntityManager/Session because neither a transaction nor a CDI request context is active. Consider adding @Transactional to your method to automatically activate a transaction, or @ActivateRequestContext if you have valid reasons not to use transactions

I'm using MSSQL Server (not sure if that's related), and both HighPriorityOpenEntity and ComPocTtEntity extend PanacheEntityBase.

    @GET
    @Path("/streamHPO/{maxt}")
    public Response streamHPO(@PathParam("maxt") int maxt) throws IOException {
        //get the largest id
        //start on a new thread
        new Thread(() ->
        {
            try {
                streamHPOThread(maxt);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }).start();
        return Response.ok().build();
    }

    @POST
    @Transactional
    @Path("/copyOne")
    public void copyOne(long id)
    {
        ComPocTtEntity ComPocTtEntityn = ComPocTtEntity.findById(id);
        System.out.println("Id: " + ComPocTtEntityn.getId());
        //Copy item to HighPriorityOpenEntity
        HighPriorityOpenEntity highPriorityOpenEntityn = new HighPriorityOpenEntity();
        highPriorityOpenEntityn.setId(ComPocTtEntityn.getId());
        highPriorityOpenEntityn.setName(ComPocTtEntityn.getName());
        highPriorityOpenEntityn.setDescription(ComPocTtEntityn.getDescription());
        highPriorityOpenEntityn.setType(ComPocTtEntityn.getType());
        highPriorityOpenEntityn.setOwner(ComPocTtEntityn.getOwner());
        highPriorityOpenEntityn.setCreated(ComPocTtEntityn.getCreated());
        highPriorityOpenEntityn.setUpdated(ComPocTtEntityn.getUpdated());
        highPriorityOpenEntityn.setDue(ComPocTtEntityn.getDue());
        highPriorityOpenEntityn.setResolved(ComPocTtEntityn.getResolved());
        highPriorityOpenEntityn.setClosed(ComPocTtEntityn.getClosed());
        highPriorityOpenEntityn.setSeverity(ComPocTtEntityn.getSeverity());
        //Persist item and flush
        highPriorityOpenEntityn.persistAndFlush();
    }
    public void streamHPOThread(int maxt) throws IOException {
        //Get all HIGH priority OPEN status items
        List<ComPocTtEntity> ComPocTtEntityList = ComPocTtEntity.listAll();
        List<Long> HPOids = ComPocTtEntityList.stream()
                .filter(ComPocTtEntity -> ComPocTtEntity.getPriority().equals("HIGH"))
                .filter(ComPocTtEntity -> ComPocTtEntity.getStatus().equals("OPEN"))
                .toList().stream().map(ComPocTtEntity::getId).toList();
        System.out.println("HPO size: " + HPOids.size());
        int s = HPOids.size();
        List<Thread> threads = new ArrayList<>();
        for (int c = 0; c < s; c++) {
            while (threads.size() >= maxt)
                for (int t = 0; t < threads.size(); t++)
                    if (!threads.get(t).isAlive())
                        threads.remove(t);
            //Get first item
            long id = HPOids.get(c);
            //Start copyone on a new thread
            threads.add(new Thread(() ->
                copyOne(id)
            ));
            threads.get(c).start();

            System.out.println("HPO: " + c + " of " + s);
        }
    }

There is 1 best solution below

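You should not create threads yourself: a thread started with new Thread() has neither a transaction nor a CDI request context, which is what the error message reports. To use CDI and JTA across multiple threads, inject or build a managed executor. Furthermore, to get the transaction, move your transactional method to another bean.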

@ApplicationScoped
public class Service {

    @Inject
    Logger logger;

    @Transactional
    public void copyOne(long id)
    {
        ComPocTtEntity comPocTtEntityn = ComPocTtEntity.findById(id);
        logger.info("Id: " + comPocTtEntityn.getId());
        //Copy item to HighPriorityOpenEntity
        HighPriorityOpenEntity highPriorityOpenEntityn = new HighPriorityOpenEntity();
        highPriorityOpenEntityn.setId(comPocTtEntityn.getId());
        highPriorityOpenEntityn.setName(comPocTtEntityn.getName());
        highPriorityOpenEntityn.setDescription(comPocTtEntityn.getDescription());
        highPriorityOpenEntityn.setType(comPocTtEntityn.getType());
        highPriorityOpenEntityn.setOwner(comPocTtEntityn.getOwner());
        highPriorityOpenEntityn.setCreated(comPocTtEntityn.getCreated());
        highPriorityOpenEntityn.setUpdated(comPocTtEntityn.getUpdated());
        highPriorityOpenEntityn.setDue(comPocTtEntityn.getDue());
        highPriorityOpenEntityn.setResolved(comPocTtEntityn.getResolved());
        highPriorityOpenEntityn.setClosed(comPocTtEntityn.getClosed());
        highPriorityOpenEntityn.setSeverity(comPocTtEntityn.getSeverity());
        //Persist item and flush
        highPriorityOpenEntityn.persistAndFlush();
    }
}
@Path("/")
public class Endpoint {

    @Inject
    Logger logger;

    @Inject
    Service service;

    @POST
    @Path("/copyOne")
    public void copyOne(long id) {
        service.copyOne(id);
    }

    @GET
    @Path("/streamHPO/{maxt}")
    public Response streamHPO(@PathParam("maxt") int maxt) throws IOException {
        
        ManagedExecutor executor = ManagedExecutor.builder()
            .maxAsync(maxt)
            .propagated(ThreadContext.CDI, ThreadContext.TRANSACTION)
            .build();

        //Get all HIGH priority OPEN status items
        List<ComPocTtEntity> comPocTtEntityList = ComPocTtEntity.listAll();
        List<Long> hpoIdList = comPocTtEntityList.stream()
                .filter(e -> e.getPriority().equals("HIGH"))
                .filter(e -> e.getStatus().equals("OPEN"))
                .map(ComPocTtEntity::getId).toList();
        int s = hpoIdList.size();
        int i = 0;
        logger.info("HPO size: " + s);
        for (long id : hpoIdList) {
            executor.runAsync(() -> {
                service.copyOne(id);
            });

            logger.info("HPO: " + i + " of " + s);
            ++i;
        }
        return Response.ok().build();
    }
}

I'm not completely sure, but I think propagating only the CDI context should be enough, because the transaction is started by the @Transactional method on the other CDI bean.

https://github.com/eclipse/microprofile-context-propagation/tree/main

ManagedExecutor executor = ManagedExecutor.builder()
    .maxAsync(maxt)
    .propagated(ThreadContext.CDI)
    .build();
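
For comparison, here is a minimal plain-JDK sketch of what maxAsync(maxt) replaces: the hand-written while (threads.size() >= maxt) loop from the question. It only illustrates bounding concurrency and waiting for completion. BoundedRunner and runAll are hypothetical names, and because this uses a plain ExecutorService rather than a ManagedExecutor, no CDI or transaction context is propagated, so it is not a drop-in fix for the Quarkus endpoint.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.LongConsumer;

// Hypothetical helper, plain JDK only: no CDI or transaction context is propagated.
public class BoundedRunner {

    // Runs task once per id on at most maxThreads threads and blocks until every run has finished.
    // maxThreads must be positive, otherwise newFixedThreadPool throws IllegalArgumentException.
    public static void runAll(List<Long> ids, int maxThreads, LongConsumer task) {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        try {
            CompletableFuture<?>[] futures = ids.stream()
                    .map(id -> CompletableFuture.runAsync(() -> task.accept(id), pool))
                    .toArray(CompletableFuture[]::new);
            // join() throws a CompletionException if any task failed
            CompletableFuture.allOf(futures).join();
        } finally {
            // No new tasks are accepted; the worker threads exit once idle
            pool.shutdown();
        }
    }
}
```

In the Quarkus version, executor.runAsync also returns a CompletableFuture, so the same CompletableFuture.allOf(...).join() pattern could presumably be used there if the response should only be sent after all copies are done.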