Atomic counter increment in etcd transaction

1.7k Views Asked by At

I'm using go with etcd as DB and trying to have transactions for atomic counter. I need to have a transaction with 2 pre-conditions:

  1. some key exists i.e: clientv3.Compare(clientv3.Version(wantedKey), "!=", 0)
  2. counter is less than some maxVal i.e: clientv3.Compare(clientv3.Value(counterKey), "<", maxVal)

The problem is at the If() call, I would like to increment / decrement that counter on the same transactions, instead of having 1st getCounterVal txn and 2nd inc/decCounterVal (you know.. like compareAndSet) txn..

Does anybody knows how to do it? is 2 txn's with loops on compareAndSet() is the only way?

2

There are 2 best solutions below

0
On BEST ANSWER

Yes unfortunately (and surprisingly) etcd does not seem to provide any atomic inc/dec operations. It seems the only way is to do two seperate transactions, first one to read the current value and then a transaction which has the two comparisons plus a check on the version of the key and if all 3 checks pass, put the incremented value.

And as you said, you will need to do this in a loop because concurrent clients might have changed the value between the first transaction (Read) and the second (CompareAndSet)

It's a quite inefficient way to handle counters. An Inc or Dec op would make many things much easier but there is only Put and it can't reference anything, just carry a specified value.

See also this Github issue: https://github.com/etcd-io/etcd/issues/9714

0
On

There ia a possible way to atomic increase value by using the Revision of a key

this is a demo. I'm not sure it 100% correctly.

    wg := sync.WaitGroup{}
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            ctx, cancel := context.WithTimeout(context.Background(), timeout)
            resp, err := cli.Put(ctx, "/pub/sample_key", "sample_value")
            cancel()
            if err != nil {
                t.Error(err)
            }
            t.Log(resp.Header.Revision)
        }()
    }
    wg.Wait()