update query in a for loop using batch updates and threads

487 Views Asked by At

i have the following code. it fetch data from a table in database.then for each item in that data list it checks a condition and base on that updates a column in that table. i know batch updates will help in such situations. and also async programming will be useful. so i created 10 threads by a ThreadPoolExecutor and did batch updates using them:


    @Inject
    private EntityManager em;


    @Resource
    UserTransaction utx;

   
    public Response determineOwnership() throws SystemException, NotSupportedException, HeuristicRollbackException, HeuristicMixedException, RollbackException {

        List<Object[]> list = em.createNativeQuery("SELECT u.usr_mobile,u.national_code from usr_mobile_verification u").getResultList();
        utx.begin();
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
        List<Future<Object>> futures = new ArrayList<>(0);
        AtomicInteger i = new AtomicInteger();
        list.forEach(t -> {
            i.getAndIncrement();
            UpdateCallable callable = null;
            try {
                callable = new UpdateCallable(String.valueOf(t[0]), String.valueOf(t[1]), infoService);
            } catch (NamingException e) {
                e.printStackTrace();
            }
            if ((i.get() % 30) == 0) {
                em.flush();
                em.clear();
            }
            Future<Object> result = executor.submit(callable);
            futures.add(result);
        });
        executor.shutdown();
        utx.commit();
        for (Future f : futures) {
            try {
                f.get();
            } catch (Exception ex) {
                ex.printStackTrace();
                return ResponseHelper.serverError();
            }
        }
        return ResponseHelper.ok("operation finished");
}

the UpdateCallable class:

public class UpdateCallable implements Callable<Object> {

    private String mobile;
    private String nid;
    private EntityManager em =  Persistence.createEntityManagerFactory("primary").createEntityManager();
    private MobileIdentityInfoService infoService;
    private UserTransaction utx = ((UserTransaction) new InitialContext().lookup("java:comp/UserTransaction"));


    public UpdateCallable(String mobile, String nid, MobileIdentityInfoService infoService) throws NamingException {
        this.mobile = mobile;
        this.nid = nid;
        this.infoService = infoService;
    }

    public UpdateCallable() throws NamingException {
    }


    @Override
    public Exception call() throws Exception {
        try {
            try {
                utx.begin();
                em.joinTransaction();
                if (isContactInfoValid(mobile, nid))
                    em.createNativeQuery("update usr_mobile_verification m set m.is_owner='1' where m.national_code=?").setParameter(1, nid).executeUpdate();
                else
                    em.createNativeQuery("update usr_mobile_verification m set m.is_owner='0' where m.national_code=?").setParameter(1, nid).executeUpdate();
            } catch (RegistrationException e) {
                e.printStackTrace();
            }
        } catch (Exception ex) {
            return ex;
        }
        return null;
    }

with this approach i reduced time from 1 hour to 2 minutes!!

but the problem is at the end most of the records in the table don't get updated. i guess same iterations are getting processed by more than one thread and overlapped. what should i do? how can i ensure that iterations are not repeated and thus all records are updated

0

There are 0 best solutions below