Future.get() throws java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException

292 Views Asked by At

I have a list of recodes called filterRecords -> List<Tuple<String, GenericRecord>> filterRecords. I am splitting this list into sub lists and giving each sublists to a separate thread for parallel execution. My Call() method implementation returns Tuple<List<FixedLengthRecord>, BatchDataPoints>. But when I am doing future.get(), I am getting ConcurrentModificationException.

Below is the code snippet.

    ConcurrentLinkedQueue <Future<Tuple<List<FixedLengthRecord>, BatchDataPoints>>> futures = new ConcurrentLinkedQueue <>();
    List<List<Tuple<String, GenericRecord>>> partitions = new ArrayList<>();

        int partitionSize = (filterRecords.size() / fixedLengthRecordEnrichmentThreadSize);
        for (int i = 0; i < filterRecords.size(); i += partitionSize) {
            List<Tuple<String, GenericRecord>> sublist = (filterRecords.subList(i, Math.min(i + partitionSize, filterRecords.size())));
            partitions.add(sublist);
        }

        for(int fixedLengthRecordEnrichmentThread = 0; fixedLengthRecordEnrichmentThread < this.fixedLengthRecordEnrichmentThreadSize; fixedLengthRecordEnrichmentThread++) {
            futures.offer(service.submit(new FixedLengthRecordWrapper(partitions.get(fixedLengthRecordEnrichmentThread),dataEnricher)));
        }


Iterator<Future<Tuple<List<FixedLengthRecord>, BatchDataPoints>>> futureIterator = futures.iterator();
while (futureIterator.hasNext()) {
      Future<Tuple<List<FixedLengthRecord>, BatchDataPoints>> future = futureIterator.next();
      if (future.isDone() || future.isCancelled()) {
            Tuple<List<FixedLengthRecord>, BatchDataPoints> fixedLengthRecordsTuple = future.get();  // ConcurrentModificationException

      }
 }

I also Tried other options, like:

 while(!futures.isEmpty()){
   Future<Tuple<List<FixedLengthRecord>, BatchDataPoints>> future = futures.poll();
   if (future.isDone() || future.isCancelled()) {
        Tuple<List<FixedLengthRecord>, BatchDataPoints> fixedLengthRecordsTuple = future.get();
   }
 }

Bewlow is the stackTrace:

 java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.data.driver.BatchCallable.processBatch(BatchCallable.java:161)
at com.data.driver.BatchCallable.call(BatchCallable.java:102)
at com.data.driver.BatchCallable.call(BatchCallable.java:38)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
 Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1241)
at java.util.ArrayList$SubList.size(ArrayList.java:1050)
at java.util.AbstractCollection.toArray(AbstractCollection.java:136)
at java.util.ArrayList.<init>(ArrayList.java:178)
at com.data.tranhist.fixedlength.FixedLengthRecordWrapper.getEnrichedFixedLengthRecordsFromGenericRecords(FixedLengthRecordWrapper.java:93)
at com.data.tranhist.fixedlength.FixedLengthRecordWrapper.call(FixedLengthRecordWrapper.java:63)
at com.data.tranhist.fixedlength.FixedLengthRecordWrapper.call(FixedLengthRecordWrapper.java:22)
... 5 more

Any help is appreciated. Thanks in advance.

1

There are 1 best solutions below

0
Sagar On

The subList method returns the view of the list and does not create actual list,so any structural changes in backing list will throw concurrent modification exception.

From java doc : The semantics of the list returned by this method become undefined if the backing list (i.e., this list) is structurally modified in any way other than via the returned list. (Structural modifications are those that change the size of this list, or otherwise perturb it in such a fashion that iterations in progress may yield incorrect results.) List subList(int fromIndex, int toIndex);

In code snippet mentioned above,since the future is trying to modify the backing list (could be in dataenricher) at later point in time,it throws exception during get call