Creating a Happens Before Relationship with AtomicBoolean


I am reading AsyncSubscriber.java, where the author uses an AtomicBoolean to create a happens-before relationship. I would like to know:

1_ Is this equivalent to using a synchronized block? It looks as if the line if (on.get()) doesn't ensure that the block

    try {
      final Signal s = inboundSignals.poll(); // We take a signal off the queue
      if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8
        // Below we simply unpack the `Signal`s and invoke the corresponding methods
        if (s instanceof OnNext<?>)
          handleOnNext(((OnNext<T>)s).next);
        else if (s instanceof OnSubscribe)
          handleOnSubscribe(((OnSubscribe)s).subscription);
        else if (s instanceof OnError) // We are always able to handle OnError, obeying rule 2.10
          handleOnError(((OnError)s).error);
        else if (s == OnComplete.Instance) // We are always able to handle OnComplete, obeying rule 2.9
          handleOnComplete();
      }
    }

will be executed by only one thread at a time.

Indeed, when on.get() returns true, what prevents another thread from entering the critical section?

2_ Is it more efficient than a synchronized block (given that AtomicBoolean uses a volatile variable)?

Here is the relevant part of the code:

  // We are using this `AtomicBoolean` to make sure that this `Subscriber` doesn't run concurrently with itself,
  // obeying rule 2.7 and 2.11
  private final AtomicBoolean on = new AtomicBoolean(false);

  @SuppressWarnings("unchecked")
  @Override public final void run() {
    if(on.get()) { // establishes a happens-before relationship with the end of the previous run
      try {
        final Signal s = inboundSignals.poll(); // We take a signal off the queue
        if (!done) { // If we're done, we shouldn't process any more signals, obeying rule 2.8
          // Below we simply unpack the `Signal`s and invoke the corresponding methods
          if (s instanceof OnNext<?>)
            handleOnNext(((OnNext<T>)s).next);
          else if (s instanceof OnSubscribe)
            handleOnSubscribe(((OnSubscribe)s).subscription);
          else if (s instanceof OnError) // We are always able to handle OnError, obeying rule 2.10
            handleOnError(((OnError)s).error);
          else if (s == OnComplete.Instance) // We are always able to handle OnComplete, obeying rule 2.9
            handleOnComplete();
        }
      } finally {
        on.set(false); // establishes a happens-before relationship with the beginning of the next run
        if(!inboundSignals.isEmpty()) // If we still have signals to process
          tryScheduleToExecute(); // Then we try to schedule ourselves to execute again
      }
    }
  }

  // What `signal` does is that it sends signals to the `Subscription` asynchronously
  private void signal(final Signal signal) {
    if (inboundSignals.offer(signal)) // No need to null-check here as ConcurrentLinkedQueue does this for us
      tryScheduleToExecute(); // Then we try to schedule it for execution, if it isn't already
  }

  // This method makes sure that this `Subscriber` is only executing on one Thread at a time
  private final void tryScheduleToExecute() {
    if(on.compareAndSet(false, true)) {
      try {
        executor.execute(this);
      } catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully and not violate rule 2.13
        if (!done) {
          try {
            done(); // First of all, this failure is not recoverable, so we need to cancel our subscription
          } finally {
            inboundSignals.clear(); // We're not going to need these anymore
            // This subscription is cancelled by now, but letting the Subscriber become schedulable again means
            // that we can drain the inboundSignals queue if anything arrives after clearing
            on.set(false);
          }
        }
      }
    }
  }

3_ Is it safe?

4_ Is it commonly used for this purpose (creating a happens-before relationship)?

Best answer

Yes, writing to and then reading from an AtomicBoolean establishes a happens-before relationship:

compareAndSet and all other read-and-update operations such as getAndIncrement have the memory effects of both reading and writing volatile variables.
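
As a minimal sketch of what that guarantee means in practice (the class, field, and method names below are hypothetical and not taken from AsyncSubscriber.java): a plain, non-volatile field written before set(true) is visible to any thread that later observes true through get().

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of AsyncSubscriber.java.
class HappensBeforeDemo {
    // Deliberately a plain (non-volatile) field: its visibility relies on the flag below.
    static int payload = 0;
    static final AtomicBoolean ready = new AtomicBoolean(false);

    // Publishes a value through the flag and returns what the reader thread observed.
    static int publishAndRead() {
        payload = 0;
        ready.set(false);
        final int[] seen = new int[1];

        Thread reader = new Thread(() -> {
            while (!ready.get()) {   // get() has the memory effects of a volatile read
                Thread.yield();
            }
            seen[0] = payload;       // ordered after the writer's payload = 42
        });
        reader.start();

        payload = 42;                // plain write ...
        ready.set(true);             // ... published by a write with volatile semantics

        try {
            reader.join();           // join() also makes seen[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(publishAndRead()); // prints 42
    }
}
```

Note that this only shows visibility. The flag by itself does not give mutual exclusion; that requires an atomic read-and-update such as compareAndSet.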

Since you didn't post the entire code and we don't know exactly how it is used, it is hard to say whether it is thread-safe or not, but:

ad 1. It is not equivalent to a synchronized block: threads do not wait. A thread that loses the compareAndSet(false, true) in tryScheduleToExecute simply returns instead of blocking, and it is that compare-and-set, not on.get(), that allows only one run to be scheduled at a time.

ad 2. Yes, it could be more efficient, but compareAndSet is not obliged to be backed by a volatile variable; that is an implementation detail.

ad 3. Hard to say, but the fact that run is a public method leaves room for errors, e.g. if two threads invoke run directly while on has the value true. From my perspective it would be better to do the compareAndSet directly in the run method, but I don't know all the requirements, so it is just a suggestion.

ad 4. Yes, AtomicBoolean is commonly used.
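
To make points ad 1 and ad 3 concrete, here is a rough sketch of a task that performs the compareAndSet inside run itself. Everything in it (SelfGuardingTask, hammer, the counters) is hypothetical and only illustrates the suggestion. It is not how AsyncSubscriber.java is written, and it may not fit that class's requirements.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: the guard is claimed inside run(), so direct concurrent calls are safe.
class SelfGuardingTask implements Runnable {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger inside = new AtomicInteger();  // threads currently in the guarded section
    final AtomicInteger maxInside = new AtomicInteger();       // highest concurrency ever observed
    final AtomicInteger completed = new AtomicInteger();       // calls that did the work
    final AtomicInteger skipped = new AtomicInteger();         // calls that found the guard taken

    @Override public void run() {
        if (!running.compareAndSet(false, true)) {
            skipped.incrementAndGet(); // unlike synchronized, the caller does not wait
            return;
        }
        try {
            int now = inside.incrementAndGet();
            maxInside.accumulateAndGet(now, Math::max);
            completed.incrementAndGet(); // stand-in for the real signal handling
            inside.decrementAndGet();
        } finally {
            running.set(false); // release; also publishes this run's writes to the next run
        }
    }

    // Calls run() directly from several threads and returns the task for inspection.
    static SelfGuardingTask hammer(int threads, int callsPerThread) {
        SelfGuardingTask task = new SelfGuardingTask();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < callsPerThread; j++) task.run();
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return task;
    }

    public static void main(String[] args) {
        SelfGuardingTask t = hammer(4, 10_000);
        System.out.println("max concurrent entries: " + t.maxInside.get());
        System.out.println("completed: " + t.completed.get() + ", skipped: " + t.skipped.get());
    }
}
```

The trade-off is that a skipped call does no work at all, so real code still needs some way to retry, such as the re-scheduling in the finally block of the original run method.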