diff --git a/blockingconcurrentqueue.h b/blockingconcurrentqueue.h index 205a4db7..c68b0fd8 100644 --- a/blockingconcurrentqueue.h +++ b/blockingconcurrentqueue.h @@ -56,18 +56,18 @@ class BlockingConcurrentQueue // includes making the memory effects of construction visible, possibly with a // memory barrier). explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE) - : inner(capacity), sema(create(0, (int)Traits::MAX_SEMA_SPINS), &BlockingConcurrentQueue::template destroy) + : inner(capacity), sema(create(0, static_cast(Traits::MAX_SEMA_SPINS)), &BlockingConcurrentQueue::template destroy) { - assert(reinterpret_cast((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member"); + assert(reinterpret_cast(reinterpret_cast(1)) == &(reinterpret_cast(1))->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member"); if (!sema) { MOODYCAMEL_THROW(std::bad_alloc()); } } BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers) - : inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create(0, (int)Traits::MAX_SEMA_SPINS), &BlockingConcurrentQueue::template destroy) + : inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create(0, static_cast(Traits::MAX_SEMA_SPINS)), &BlockingConcurrentQueue::template destroy) { - assert(reinterpret_cast((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member"); + assert(reinterpret_cast(reinterpret_cast(1)) == &(reinterpret_cast(1))->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member"); if (!sema) { MOODYCAMEL_THROW(std::bad_alloc()); } @@ -179,7 +179,7 @@ class BlockingConcurrentQueue inline bool enqueue_bulk(It itemFirst, size_t count) { if ((details::likely)(inner.enqueue_bulk(std::forward(itemFirst), count))) { - 
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count); + sema->signal(static_cast(count)); return true; } return false; @@ -195,7 +195,7 @@ class BlockingConcurrentQueue inline bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count) { if ((details::likely)(inner.enqueue_bulk(token, std::forward(itemFirst), count))) { - sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count); + sema->signal(static_cast(count)); return true; } return false; @@ -264,7 +264,7 @@ class BlockingConcurrentQueue inline bool try_enqueue_bulk(It itemFirst, size_t count) { if (inner.try_enqueue_bulk(std::forward(itemFirst), count)) { - sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count); + sema->signal(static_cast(count)); return true; } return false; @@ -279,7 +279,7 @@ class BlockingConcurrentQueue inline bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count) { if (inner.try_enqueue_bulk(token, std::forward(itemFirst), count)) { - sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count); + sema->signal(static_cast(count)); return true; } return false; @@ -327,7 +327,7 @@ class BlockingConcurrentQueue inline size_t try_dequeue_bulk(It itemFirst, size_t max) { size_t count = 0; - max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max); + max = static_cast(sema->tryWaitMany(static_cast(max))); while (count != max) { count += inner.template try_dequeue_bulk(itemFirst, max - count); } @@ -343,7 +343,7 @@ class BlockingConcurrentQueue inline size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max) { size_t count = 0; - max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max); + max = static_cast(sema->tryWaitMany(static_cast(max))); while (count != max) { count += inner.template try_dequeue_bulk(token, itemFirst, max - count); } @@ -447,7 +447,7 @@ class BlockingConcurrentQueue inline size_t wait_dequeue_bulk(It itemFirst, size_t max) { size_t count = 0; - max = 
(size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max); + max = static_cast(sema->waitMany(static_cast(max))); while (count != max) { count += inner.template try_dequeue_bulk(itemFirst, max - count); } @@ -465,7 +465,7 @@ class BlockingConcurrentQueue inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::int64_t timeout_usecs) { size_t count = 0; - max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs); + max = static_cast(sema->waitMany(static_cast(max), timeout_usecs)); while (count != max) { count += inner.template try_dequeue_bulk(itemFirst, max - count); } @@ -492,7 +492,7 @@ class BlockingConcurrentQueue inline size_t wait_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max) { size_t count = 0; - max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max); + max = static_cast(sema->waitMany(static_cast(max))); while (count != max) { count += inner.template try_dequeue_bulk(token, itemFirst, max - count); } @@ -510,7 +510,7 @@ class BlockingConcurrentQueue inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::int64_t timeout_usecs) { size_t count = 0; - max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs); + max = static_cast(sema->waitMany(static_cast(max), timeout_usecs)); while (count != max) { count += inner.template try_dequeue_bulk(token, itemFirst, max - count); } @@ -537,7 +537,7 @@ class BlockingConcurrentQueue // Thread-safe. inline size_t size_approx() const { - return (size_t)sema->availableApprox(); + return static_cast(sema->availableApprox()); } diff --git a/concurrentqueue.h b/concurrentqueue.h index 9d00070f..eba9bde1 100644 --- a/concurrentqueue.h +++ b/concurrentqueue.h @@ -355,6 +355,12 @@ struct ConcurrentQueueDefaultTraits // How many full blocks can be expected for a single implicit producer? This should // reflect that number's maximum for optimal performance. 
Must be a power of 2. + // Note: This controls the maximum number of elements that a single implicit + // producer can hold at once when using try_enqueue (which does not allocate). + // The limit is BLOCK_SIZE * IMPLICIT_INITIAL_INDEX_SIZE elements; beyond that, + // the block index needs to grow, which requires allocation, causing try_enqueue + // to fail. Increase this value (or use enqueue(), which can allocate) if you need + // more capacity per implicit producer. See also issue #418. static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32; // The initial size of the hash table mapping thread IDs to implicit producers. @@ -469,8 +475,15 @@ namespace details static inline size_t hash_thread_id(thread_id_t id) { static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values"); +#if defined(__GNUC__) && !defined(__clang__) +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wuseless-cast" +#endif return static_cast(hash_32_or_64::thread_id_hash_t)>::hash( thread_id_converter::prehash(id))); +#if defined(__GNUC__) && !defined(__clang__) +#pragma GCC diagnostic pop +#endif } template @@ -1056,6 +1069,11 @@ class ConcurrentQueue // Does not allocate memory. Fails if not enough room to enqueue (or implicit // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE // is 0). + // Note: For implicit producers (no token), the maximum number of elements that + // can be held is BLOCK_SIZE * IMPLICIT_INITIAL_INDEX_SIZE before the block + // index must grow (which requires allocation and causes try_enqueue to fail). + // Pre-allocating blocks via the constructor does not increase the index size. + // Increase IMPLICIT_INITIAL_INDEX_SIZE in your traits, or use enqueue() instead. // Thread-safe. 
inline bool try_enqueue(T const& item) { diff --git a/lightweightsemaphore.h b/lightweightsemaphore.h index 3ae8100d..c103068f 100644 --- a/lightweightsemaphore.h +++ b/lightweightsemaphore.h @@ -234,8 +234,8 @@ class Semaphore #else clock_gettime(CLOCK_REALTIME, &ts); #endif - ts.tv_sec += (time_t)(usecs / usecs_in_1_sec); - ts.tv_nsec += (long)(usecs % usecs_in_1_sec) * 1000; + ts.tv_sec += static_cast(usecs / usecs_in_1_sec); + ts.tv_nsec += static_cast(usecs % usecs_in_1_sec) * 1000; // sem_timedwait bombs if you have more than 1e9 in tv_nsec // so we have to clean things up before passing it in if (ts.tv_nsec >= nsecs_in_1_sec) { @@ -306,7 +306,7 @@ class LightweightSemaphore if (m_sema.wait()) return true; } - if (timeout_usecs > 0 && m_sema.timed_wait((std::uint64_t)timeout_usecs)) + if (timeout_usecs > 0 && m_sema.timed_wait(static_cast(timeout_usecs))) return true; // At this point, we've timed out waiting for the semaphore, but the // count is still decremented indicating we may still be waiting on @@ -342,7 +342,7 @@ class LightweightSemaphore oldCount = m_count.fetch_sub(1, std::memory_order_acquire); if (oldCount <= 0) { - if ((timeout_usecs == 0) || (timeout_usecs < 0 && !m_sema.wait()) || (timeout_usecs > 0 && !m_sema.timed_wait((std::uint64_t)timeout_usecs))) + if ((timeout_usecs == 0) || (timeout_usecs < 0 && !m_sema.wait()) || (timeout_usecs > 0 && !m_sema.timed_wait(static_cast(timeout_usecs)))) { while (true) { @@ -425,7 +425,7 @@ class LightweightSemaphore ssize_t toRelease = -oldCount < count ? 
-oldCount : count; if (toRelease > 0) { - m_sema.signal((int)toRelease); + m_sema.signal(static_cast(toRelease)); } } diff --git a/tests/unittests/unittests.cpp b/tests/unittests/unittests.cpp index b4c4d9cc..78f86338 100644 --- a/tests/unittests/unittests.cpp +++ b/tests/unittests/unittests.cpp @@ -136,6 +136,18 @@ struct LargeTraits : public MallocTrackingTraits static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 128; }; +struct SmallImplicitIndexTraits : public MallocTrackingTraits +{ + static const size_t BLOCK_SIZE = 4; + static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 4; +}; + +struct LargerImplicitIndexTraits : public MallocTrackingTraits +{ + static const size_t BLOCK_SIZE = 4; + static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 16; +}; + // Note: Not thread safe! struct Foo { @@ -375,6 +387,7 @@ class ConcurrentQueueTests : public TestClass REGISTER_TEST(explicit_strings_threaded); REGISTER_TEST(large_traits); + REGISTER_TEST(implicit_producer_index_limit); } bool postTest(bool testSucceeded) override @@ -5031,6 +5044,68 @@ class ConcurrentQueueTests : public TestClass return true; } + bool implicit_producer_index_limit() + { + // Issue #418: try_enqueue() fails around BLOCK_SIZE * IMPLICIT_INITIAL_INDEX_SIZE + // elements even when blocks have been pre-allocated, because the block index + // (not the blocks themselves) needs to grow, which requires allocation that + // try_enqueue refuses to do. 
+ + // SmallImplicitIndexTraits: BLOCK_SIZE=4, IMPLICIT_INITIAL_INDEX_SIZE=4, limit=16 + // LargerImplicitIndexTraits: BLOCK_SIZE=4, IMPLICIT_INITIAL_INDEX_SIZE=16, limit=64 + + { + // Demonstrate the limit: try_enqueue fails at BLOCK_SIZE * IMPLICIT_INITIAL_INDEX_SIZE + const int limit = (int)(SmallImplicitIndexTraits::BLOCK_SIZE * SmallImplicitIndexTraits::IMPLICIT_INITIAL_INDEX_SIZE); + ConcurrentQueue q(limit + 64); // Pre-allocate plenty of blocks + + int successCount = 0; + for (int i = 0; i < limit + 64; ++i) { + if (!q.try_enqueue(i)) + break; + ++successCount; + } + // try_enqueue should stop succeeding at the index limit + ASSERT_OR_FAIL(successCount == limit); + } + + { + // Fix #1: enqueue() (which can allocate) works past the limit + const int limit = (int)(SmallImplicitIndexTraits::BLOCK_SIZE * SmallImplicitIndexTraits::IMPLICIT_INITIAL_INDEX_SIZE); + ConcurrentQueue q(limit + 64); + + for (int i = 0; i < limit + 64; ++i) { + ASSERT_OR_FAIL(q.enqueue(i)); + } + // Verify all elements are dequeued correctly + int item; + for (int i = 0; i < limit + 64; ++i) { + ASSERT_OR_FAIL(q.try_dequeue(item)); + ASSERT_OR_FAIL(item == i); + } + ASSERT_OR_FAIL(!q.try_dequeue(item)); + } + + { + // Fix #2: larger IMPLICIT_INITIAL_INDEX_SIZE allows more try_enqueue calls + const int small_limit = (int)(SmallImplicitIndexTraits::BLOCK_SIZE * SmallImplicitIndexTraits::IMPLICIT_INITIAL_INDEX_SIZE); // 16 + const int large_limit = (int)(LargerImplicitIndexTraits::BLOCK_SIZE * LargerImplicitIndexTraits::IMPLICIT_INITIAL_INDEX_SIZE); // 64 + ConcurrentQueue q(large_limit + 64); + + int successCount = 0; + for (int i = 0; i < large_limit + 64; ++i) { + if (!q.try_enqueue(i)) + break; + ++successCount; + } + // Should succeed well past the old small limit + ASSERT_OR_FAIL(successCount > small_limit); + ASSERT_OR_FAIL(successCount == large_limit); + } + + return true; + } + bool large_traits() { union Elem { uint32_t x; char dummy[156]; };