Are accumulators thread-safe?


I'm using accumulators and wanted to know if these objects are thread-safe?

accumInt is an Accumulator<Integer>.

// Current value accumInt -> 6
AccumulatorThread t1 = new AccumulatorThread();
t1.setAccum(accumInt); 
t1.setValueToAdd(5);

AccumulatorThread t2 = new AccumulatorThread();
t2.setAccum(accumInt);
t2.setValueToAdd(7);

new Thread(t1).start();
new Thread(t2).start();

System.out.println(accumInt.value()); // 11 or 13 or 18

AccumulatorThread class:

class AccumulatorThread implements Runnable {
    Accumulator<Integer> accum;
    Integer              valueToAdd;

    public Integer getValueToAdd() {
        return valueToAdd;
    }


    public void setValueToAdd(Integer valueToAdd) {
        this.valueToAdd = valueToAdd;
    }

    public Accumulator<Integer> getAccum() {
        return accum;
    }


    public void setAccum(Accumulator<Integer> accum) {
        this.accum = accum;
    }

    public void run() {
        System.out.println("Value to Add in Thread : "+valueToAdd);
        accum.add(valueToAdd);
    }
}

This behavior suggests that accumulators are not thread-safe. Am I missing something?


There are 4 best solutions below

0
On BEST ANSWER

Out of curiosity, why are you both setting and reading the accumulator in the same program? Accumulators are generally added to by the worker threads and may only be read by the driver thread.

Worker1:   accumulator.add(increment)
Worker2:   accumulator.add(someOtherIncrement)

Driver:  println(accumulator.value)

Now you are asking about multithreading for setting and reading values in different threads on the driver. To what purpose? In that case, just use a local JVM AtomicInteger or AtomicLong.
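As a minimal sketch of the AtomicInteger suggestion, assuming the goal is only to add values from several threads inside one JVM and read the total afterwards: the class and method names here (AtomicCounterDemo, addConcurrently) are hypothetical and not part of Spark. Note that the sketch joins the threads before reading, which the code in the question does not do.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCounterDemo {

    // Adds each increment from its own thread and returns the final total.
    // Hypothetical helper for illustration only; it does not use Spark.
    static int addConcurrently(int initial, int... increments) {
        AtomicInteger counter = new AtomicInteger(initial);
        Thread[] threads = new Thread[increments.length];

        for (int i = 0; i < increments.length; i++) {
            final int increment = increments[i];
            // addAndGet is atomic, so concurrent additions are never lost.
            threads[i] = new Thread(() -> counter.addAndGet(increment));
            threads[i].start();
        }

        // Wait for every thread to finish before reading the value.
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(addConcurrently(6, 5, 7)); // prints 18
    }
}
```

Without the join calls, the read could run before either thread has added its value, so a varying result would not by itself say anything about the thread safety of the counter.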

Accumulators are variables that are only “added” to through an associative operation and can therefore be efficiently supported in parallel.

1
On

Accumulators are not thread-safe. Only SparkContext can be used in multiple threads.

2
On

To expand on the other two great answers from @javadba and @zsxwing:

My understanding of Apache Spark is that accumulators may or may not be thread-safe; it does not really matter. Since the driver is "far away" from its workers (they usually talk to each other over the network, or at least between JVMs, unless it's local mode), all updates to an accumulator arrive in messages that are processed one by one, which ensures that the accumulator is updated by a single thread.

0
On

Accumulators are not thread-safe; in fact, they do not need to be. For executors, accumulators are write-only variables: executors can add to them, and only the driver can read them. The driver uses the DAGScheduler.updateAccumulators method to update accumulator values after a task completes, and this method is called only from the thread that runs the scheduling loop. Only one task completion event is handled at a time, which is why accumulators do not need to be thread-safe.
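The reasoning above can be illustrated with a simplified model. This is not Spark's actual code: the class and method names (SingleThreadedMergeDemo, mergeUpdates) are hypothetical. Several "task" threads post their updates to a queue, and a single loop on the calling thread applies them one at a time to a plain, unsynchronized int.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SingleThreadedMergeDemo {

    // Simplified model: task threads only send updates as messages;
    // one thread (the caller) merges them, so the total needs no locking.
    static int mergeUpdates(int initial, int... increments) {
        BlockingQueue<Integer> completionEvents = new LinkedBlockingQueue<>();

        // Each "task" reports its update as a message instead of
        // touching the shared total directly.
        for (int i = 0; i < increments.length; i++) {
            final int increment = increments[i];
            new Thread(() -> completionEvents.add(increment)).start();
        }

        // Plain local variable, read and written only by this thread.
        int total = initial;
        for (int handled = 0; handled < increments.length; handled++) {
            try {
                // One completion event is handled at a time.
                total += completionEvents.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while merging", e);
            }
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(mergeUpdates(6, 5, 7)); // prints 18
    }
}
```

Only the queue has to be thread-safe in this model; the value being accumulated is confined to a single thread, which mirrors the argument that the accumulator itself does not need to be.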