Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions blockingconcurrentqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,18 @@ class BlockingConcurrentQueue
// includes making the memory effects of construction visible, possibly with a
// memory barrier).
explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
: inner(capacity), sema(create<LightweightSemaphore, ssize_t, int>(0, (int)Traits::MAX_SEMA_SPINS), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
: inner(capacity), sema(create<LightweightSemaphore, ssize_t, int>(0, static_cast<int>(Traits::MAX_SEMA_SPINS)), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
{
assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
assert(reinterpret_cast<ConcurrentQueue*>(reinterpret_cast<BlockingConcurrentQueue*>(1)) == &(reinterpret_cast<BlockingConcurrentQueue*>(1))->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
if (!sema) {
MOODYCAMEL_THROW(std::bad_alloc());
}
}

BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
: inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create<LightweightSemaphore, ssize_t, int>(0, (int)Traits::MAX_SEMA_SPINS), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
: inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create<LightweightSemaphore, ssize_t, int>(0, static_cast<int>(Traits::MAX_SEMA_SPINS)), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
{
assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
assert(reinterpret_cast<ConcurrentQueue*>(reinterpret_cast<BlockingConcurrentQueue*>(1)) == &(reinterpret_cast<BlockingConcurrentQueue*>(1))->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
if (!sema) {
MOODYCAMEL_THROW(std::bad_alloc());
}
Expand Down Expand Up @@ -179,7 +179,7 @@ class BlockingConcurrentQueue
inline bool enqueue_bulk(It itemFirst, size_t count)
{
if ((details::likely)(inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
sema->signal(static_cast<LightweightSemaphore::ssize_t>(count));
return true;
}
return false;
Expand All @@ -195,7 +195,7 @@ class BlockingConcurrentQueue
inline bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
{
if ((details::likely)(inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
sema->signal(static_cast<LightweightSemaphore::ssize_t>(count));
return true;
}
return false;
Expand Down Expand Up @@ -264,7 +264,7 @@ class BlockingConcurrentQueue
inline bool try_enqueue_bulk(It itemFirst, size_t count)
{
if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
sema->signal(static_cast<LightweightSemaphore::ssize_t>(count));
return true;
}
return false;
Expand All @@ -279,7 +279,7 @@ class BlockingConcurrentQueue
inline bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
{
if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
sema->signal(static_cast<LightweightSemaphore::ssize_t>(count));
return true;
}
return false;
Expand Down Expand Up @@ -327,7 +327,7 @@ class BlockingConcurrentQueue
inline size_t try_dequeue_bulk(It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
max = static_cast<size_t>(sema->tryWaitMany(static_cast<LightweightSemaphore::ssize_t>(max)));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
}
Expand All @@ -343,7 +343,7 @@ class BlockingConcurrentQueue
inline size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
max = static_cast<size_t>(sema->tryWaitMany(static_cast<LightweightSemaphore::ssize_t>(max)));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
}
Expand Down Expand Up @@ -447,7 +447,7 @@ class BlockingConcurrentQueue
inline size_t wait_dequeue_bulk(It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
max = static_cast<size_t>(sema->waitMany(static_cast<LightweightSemaphore::ssize_t>(max)));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
}
Expand All @@ -465,7 +465,7 @@ class BlockingConcurrentQueue
inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::int64_t timeout_usecs)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
max = static_cast<size_t>(sema->waitMany(static_cast<LightweightSemaphore::ssize_t>(max), timeout_usecs));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
}
Expand All @@ -492,7 +492,7 @@ class BlockingConcurrentQueue
inline size_t wait_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
max = static_cast<size_t>(sema->waitMany(static_cast<LightweightSemaphore::ssize_t>(max)));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
}
Expand All @@ -510,7 +510,7 @@ class BlockingConcurrentQueue
inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::int64_t timeout_usecs)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
max = static_cast<size_t>(sema->waitMany(static_cast<LightweightSemaphore::ssize_t>(max), timeout_usecs));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
}
Expand All @@ -537,7 +537,7 @@ class BlockingConcurrentQueue
// Thread-safe.
inline size_t size_approx() const
{
return (size_t)sema->availableApprox();
return static_cast<size_t>(sema->availableApprox());
}


Expand Down
18 changes: 18 additions & 0 deletions concurrentqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,12 @@ struct ConcurrentQueueDefaultTraits

// How many full blocks can be expected for a single implicit producer? This should
// reflect that number's maximum for optimal performance. Must be a power of 2.
// Note: This controls the maximum number of elements that can be enqueued by a
// single implicit producer when using try_enqueue (which does not allocate).
// The limit is BLOCK_SIZE * IMPLICIT_INITIAL_INDEX_SIZE elements; beyond that,
// the block index needs to grow, which requires allocation, causing try_enqueue
// to fail. Increase this value (or use enqueue(), which can allocate) if you need
// more capacity per implicit producer. See also issue #418.
static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;

// The initial size of the hash table mapping thread IDs to implicit producers.
Expand Down Expand Up @@ -469,8 +475,15 @@ namespace details
// Hashes a thread ID for use as a key in the implicit-producer hash table.
// The ID is first pre-hashed via the platform's thread_id_converter, then run
// through hash_32_or_64, selected by the width of the converted hash type.
static inline size_t hash_thread_id(thread_id_t id)
{
static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
// On platforms where hash() already returns size_t, the static_cast below is a
// no-op and GCC's -Wuseless-cast would fire; suppress it locally (GCC only --
// clang defines __GNUC__ too but does not accept this warning group the same way).
#if defined(__GNUC__) && !defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wuseless-cast"
#endif
return static_cast<size_t>(hash_32_or_64<sizeof(thread_id_converter<thread_id_t>::thread_id_hash_t)>::hash(
thread_id_converter<thread_id_t>::prehash(id)));
// Note: pragmas are preprocessor directives, so placing the pop after the
// return statement is fine -- it closes the suppression region textually.
#if defined(__GNUC__) && !defined(__clang__)
#pragma GCC diagnostic pop
#endif
}

template<typename T>
Expand Down Expand Up @@ -1056,6 +1069,11 @@ class ConcurrentQueue
// Does not allocate memory. Fails if not enough room to enqueue (or implicit
// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
// is 0).
// Note: For implicit producers (no token), the maximum number of elements that
// can be held is BLOCK_SIZE * IMPLICIT_INITIAL_INDEX_SIZE before the block
// index must grow (which requires allocation and causes try_enqueue to fail).
// Pre-allocating blocks via the constructor does not increase the index size.
// Increase IMPLICIT_INITIAL_INDEX_SIZE in your traits, or use enqueue() instead.
// Thread-safe.
inline bool try_enqueue(T const& item)
{
Expand Down
10 changes: 5 additions & 5 deletions lightweightsemaphore.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ class Semaphore
#else
clock_gettime(CLOCK_REALTIME, &ts);
#endif
ts.tv_sec += (time_t)(usecs / usecs_in_1_sec);
ts.tv_nsec += (long)(usecs % usecs_in_1_sec) * 1000;
ts.tv_sec += static_cast<time_t>(usecs / usecs_in_1_sec);
ts.tv_nsec += static_cast<long>(usecs % usecs_in_1_sec) * 1000;
// sem_timedwait bombs if you have more than 1e9 in tv_nsec
// so we have to clean things up before passing it in
if (ts.tv_nsec >= nsecs_in_1_sec) {
Expand Down Expand Up @@ -306,7 +306,7 @@ class LightweightSemaphore
if (m_sema.wait())
return true;
}
if (timeout_usecs > 0 && m_sema.timed_wait((std::uint64_t)timeout_usecs))
if (timeout_usecs > 0 && m_sema.timed_wait(static_cast<std::uint64_t>(timeout_usecs)))
return true;
// At this point, we've timed out waiting for the semaphore, but the
// count is still decremented indicating we may still be waiting on
Expand Down Expand Up @@ -342,7 +342,7 @@ class LightweightSemaphore
oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
if (oldCount <= 0)
{
if ((timeout_usecs == 0) || (timeout_usecs < 0 && !m_sema.wait()) || (timeout_usecs > 0 && !m_sema.timed_wait((std::uint64_t)timeout_usecs)))
if ((timeout_usecs == 0) || (timeout_usecs < 0 && !m_sema.wait()) || (timeout_usecs > 0 && !m_sema.timed_wait(static_cast<std::uint64_t>(timeout_usecs))))
{
while (true)
{
Expand Down Expand Up @@ -425,7 +425,7 @@ class LightweightSemaphore
ssize_t toRelease = -oldCount < count ? -oldCount : count;
if (toRelease > 0)
{
m_sema.signal((int)toRelease);
m_sema.signal(static_cast<int>(toRelease));
}
}

Expand Down
75 changes: 75 additions & 0 deletions tests/unittests/unittests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,18 @@ struct LargeTraits : public MallocTrackingTraits
static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 128;
};

// Traits with a deliberately tiny implicit-producer block index: the implicit
// try_enqueue limit is BLOCK_SIZE * IMPLICIT_INITIAL_INDEX_SIZE = 16 elements.
// Used by implicit_producer_index_limit to reproduce issue #418 cheaply.
struct SmallImplicitIndexTraits : public MallocTrackingTraits
{
static const size_t BLOCK_SIZE = 4;
static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 4;
};

// Same BLOCK_SIZE as SmallImplicitIndexTraits but a 4x larger initial implicit
// block index, raising the implicit try_enqueue limit to 4 * 16 = 64 elements.
// Demonstrates that growing IMPLICIT_INITIAL_INDEX_SIZE is a valid workaround
// for the try_enqueue capacity limit on implicit producers.
struct LargerImplicitIndexTraits : public MallocTrackingTraits
{
static const size_t BLOCK_SIZE = 4;
static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 16;
};

// Note: Not thread safe!
struct Foo
{
Expand Down Expand Up @@ -375,6 +387,7 @@ class ConcurrentQueueTests : public TestClass<ConcurrentQueueTests>

REGISTER_TEST(explicit_strings_threaded);
REGISTER_TEST(large_traits);
REGISTER_TEST(implicit_producer_index_limit);
}

bool postTest(bool testSucceeded) override
Expand Down Expand Up @@ -5031,6 +5044,68 @@ class ConcurrentQueueTests : public TestClass<ConcurrentQueueTests>
return true;
}

// Regression test for issue #418: try_enqueue() on an implicit producer (no
// token) fails at around BLOCK_SIZE * IMPLICIT_INITIAL_INDEX_SIZE elements even
// when blocks have been pre-allocated, because it is the block *index* (not the
// blocks themselves) that must grow, and that growth requires allocation --
// which try_enqueue refuses to perform.
// Returns true if all assertions pass (ASSERT_OR_FAIL returns false otherwise).
bool implicit_producer_index_limit()
{
	// SmallImplicitIndexTraits:  BLOCK_SIZE=4, IMPLICIT_INITIAL_INDEX_SIZE=4,  limit=16
	// LargerImplicitIndexTraits: BLOCK_SIZE=4, IMPLICIT_INITIAL_INDEX_SIZE=16, limit=64
	// Note: static_cast (not C-style casts) throughout, consistent with the
	// cast cleanup applied to the rest of the library in this change.

	{
		// Demonstrate the limit: try_enqueue stops succeeding at exactly
		// BLOCK_SIZE * IMPLICIT_INITIAL_INDEX_SIZE elements.
		const int limit = static_cast<int>(SmallImplicitIndexTraits::BLOCK_SIZE * SmallImplicitIndexTraits::IMPLICIT_INITIAL_INDEX_SIZE);
		ConcurrentQueue<int, SmallImplicitIndexTraits> q(limit + 64);  // Pre-allocate plenty of blocks

		int successCount = 0;
		for (int i = 0; i < limit + 64; ++i) {
			if (!q.try_enqueue(i))
				break;
			++successCount;
		}
		// try_enqueue should stop succeeding at the index limit
		ASSERT_OR_FAIL(successCount == limit);
	}

	{
		// Workaround #1: enqueue() (which may allocate) works past the limit.
		const int limit = static_cast<int>(SmallImplicitIndexTraits::BLOCK_SIZE * SmallImplicitIndexTraits::IMPLICIT_INITIAL_INDEX_SIZE);
		ConcurrentQueue<int, SmallImplicitIndexTraits> q(limit + 64);

		for (int i = 0; i < limit + 64; ++i) {
			ASSERT_OR_FAIL(q.enqueue(i));
		}
		// Verify all elements come back out in FIFO order for this single producer
		int item;
		for (int i = 0; i < limit + 64; ++i) {
			ASSERT_OR_FAIL(q.try_dequeue(item));
			ASSERT_OR_FAIL(item == i);
		}
		ASSERT_OR_FAIL(!q.try_dequeue(item));
	}

	{
		// Workaround #2: a larger IMPLICIT_INITIAL_INDEX_SIZE raises the limit.
		const int small_limit = static_cast<int>(SmallImplicitIndexTraits::BLOCK_SIZE * SmallImplicitIndexTraits::IMPLICIT_INITIAL_INDEX_SIZE);    // 16
		const int large_limit = static_cast<int>(LargerImplicitIndexTraits::BLOCK_SIZE * LargerImplicitIndexTraits::IMPLICIT_INITIAL_INDEX_SIZE);  // 64
		ConcurrentQueue<int, LargerImplicitIndexTraits> q(large_limit + 64);

		int successCount = 0;
		for (int i = 0; i < large_limit + 64; ++i) {
			if (!q.try_enqueue(i))
				break;
			++successCount;
		}
		// Should succeed well past the old small limit, stopping at the new one
		ASSERT_OR_FAIL(successCount > small_limit);
		ASSERT_OR_FAIL(successCount == large_limit);
	}

	return true;
}

bool large_traits()
{
union Elem { uint32_t x; char dummy[156]; };
Expand Down