concurrent_vector vs vector with mutex, thread issues with push_back

1.6k Views Asked by At

I have an object that is being processed on by multiple tasks. I copy this object multiple times and store in a vector for a task to retrieve it's own copy to work on in a parallel_for loop. Below is the code with a standard vector.

The idea is I start out with a vector of size 0 and grow it according to the number of tasks that are launched in parallel and need their own copy. I use an atomic "_poolIndex" to track the global index per run.

    Object& GetObject()
    {
        if (_poolIndex >= _objectPool.size())
        {
            lock_guard<mutex> lock(_mutex);
            Object copy(_original);
            _objectPool.push_back(move(copy));
        }

        int taskIndex = _poolIndex.fetch_add(1);

        return _objectPool[taskIndex];
    }

I get index out of bounds in the below code in the vector class, even though position < size when the debugger breaks:

reference operator[](size_type _Pos)
{   // subscript mutable sequence
        #if _ITERATOR_DEBUG_LEVEL == 2
if (size() <= _Pos)
    {   // report error
    _DEBUG_ERROR("vector subscript out of range");
    _SCL_SECURE_OUT_OF_RANGE;
    }

So obviously the part that retrieves size() <= _Pos has evaluated something different... I'm confused because I have a lock around pushing onto the vector.

I then tried concurrent_vector and the push_back was giving me compile issues, here are the Visual Studio 2013 errors:

Error 35 error C2059: syntax error : '&' c:\program files (x86)\microsoft visual studio 12.0\vc\include\concurrent_vector.h 1492 1 UnitTests

Error 36 error C2143: syntax error : missing ';' before ')' c:\program files (x86)\microsoft visual studio 12.0\vc\include\concurrent_vector.h 1492 1 UnitTests

And in the concurrent_vector class, here is the code giving the problem when I switch _objectPool to be of concurrent_vector from vector:

    void _Init(const void *_Src)
    {
        for(; _I < _N; ++_I)
            new( &_My_array[_I] ) _Ty(*static_cast<const _Ty*>(_Src));
    }

If anyone can give guidance on the two issues above that'd be great.

I'm also trying to minimize the critical sections for efficiency. The idea is after starting up the algorithm and running it many times, the _objectPool will have most, if not all, of the copies already pushed onto the vector.

1

There are 1 best solutions below

2
On BEST ANSWER

Issues

First, there is a data-race because two values read from _poolIndex (in if and taskIndex) are not synchronized. Swap them and use taskIndex in condition instead of reading shared state yet another time.

Object& GetObject()
{
    int taskIndex = _poolIndex.fetch_add(1);
    if (taskIndex >= _objectPool.size())    // issue #2: size() is not thread-safe
    {
        lock_guard<mutex> lock(_mutex);
        //This: Object copy(_original);
        //      _objectPool.push_back(move(copy));
        // can be simplified to:
        _objectPool.push_back(_original); // issue #3: it can push at different index
    }

    return _objectPool[taskIndex];
}

The second issue might not be visible with std::vector in some conditions. But it definitely breaks usage of concurrent_vector (see why).

The third issue is that the taskIndex is not synchronized with the order of locking thus it can construct one object but return not yet constructed or allocated (out of range).

The right way

If I understand your intention correctly, you want to reuse objects that are created on the first pass for the second and create more objects if needed. I'll try to fix the issues in the code below:

Object& GetObject()
{
    int taskIndex = _poolIndex.fetch_add(1);                // get current index in the pool
    if (taskIndex >= _objectPoolSize.load(memory_order_acquire)) // atomic<size_t>
    {
        lock_guard<mutex> lock(_mutex);
        size_t sz = _objectPoolSize.load(memory_order_relaxed);
        if (taskIndex >= sz) {                              // double-check under the lock
            sz *= 2;                  // or any other factor, get a bunch of new objects at once
            _objectPool.resize(sz, _original);              // construct new copies of _original
            _objectPoolSize.store(sz, memory_order_release);// protect from reorder with resize
        }
    }
    return _objectPool[taskIndex];
}

concurrent_vector

As for concurrent_vector (available in both and ), you might want to use grow_to_at_least in order to eliminate the lock.. but:

Object& GetObject()
{
    int taskIndex = _poolIndex.fetch_add(1); // get current index in the pool
    // construct new copies of _original if free objects is about to ran out
    _objectPool.grow_to_at_least(taskIndex+10/*or other*/, _original );
    return _objectPool[taskIndex];  // ISSUE: it might not be constructed yet in other thread
}

It suffers from the same issue as size(). So, it either needs some synchronization with the _objectPoolSize or per-item synchronization with constructors based on zero-filling allocation as described in the same blog.