Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 29 additions & 19 deletions blockingconcurrentqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,18 @@ class BlockingConcurrentQueue
// includes making the memory effects of construction visible, possibly with a
// memory barrier).
explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
: inner(capacity), sema(create<LightweightSemaphore, ssize_t, int>(0, (int)Traits::MAX_SEMA_SPINS), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
: inner(capacity), sema(create<LightweightSemaphore, ssize_t, int>(0, static_cast<int>(Traits::MAX_SEMA_SPINS)), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
{
assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
assert(reinterpret_cast<ConcurrentQueue*>(reinterpret_cast<BlockingConcurrentQueue*>(1)) == &(reinterpret_cast<BlockingConcurrentQueue*>(1))->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
if (!sema) {
MOODYCAMEL_THROW(std::bad_alloc());
}
}

BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
: inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create<LightweightSemaphore, ssize_t, int>(0, (int)Traits::MAX_SEMA_SPINS), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
: inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create<LightweightSemaphore, ssize_t, int>(0, static_cast<int>(Traits::MAX_SEMA_SPINS)), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
{
assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
assert(reinterpret_cast<ConcurrentQueue*>(reinterpret_cast<BlockingConcurrentQueue*>(1)) == &(reinterpret_cast<BlockingConcurrentQueue*>(1))->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
if (!sema) {
MOODYCAMEL_THROW(std::bad_alloc());
}
Expand Down Expand Up @@ -179,12 +179,13 @@ class BlockingConcurrentQueue
inline bool enqueue_bulk(It itemFirst, size_t count)
{
if ((details::likely)(inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
assert(static_cast<ssize_t>(count) >= 0);
sema->signal(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(count)));
return true;
}
return false;
}

// Enqueues several items using an explicit producer token.
// Allocates memory if required. Only fails if memory allocation fails
// (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
Expand All @@ -195,7 +196,8 @@ class BlockingConcurrentQueue
inline bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
{
if ((details::likely)(inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
assert(static_cast<ssize_t>(count) >= 0);
sema->signal(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(count)));
return true;
}
return false;
Expand Down Expand Up @@ -264,12 +266,13 @@ class BlockingConcurrentQueue
inline bool try_enqueue_bulk(It itemFirst, size_t count)
{
if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
assert(static_cast<ssize_t>(count) >= 0);
sema->signal(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(count)));
return true;
}
return false;
}

// Enqueues several items using an explicit producer token.
// Does not allocate memory. Fails if not enough room to enqueue.
// Note: Use std::make_move_iterator if the elements should be moved
Expand All @@ -279,7 +282,8 @@ class BlockingConcurrentQueue
inline bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
{
if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
assert(static_cast<ssize_t>(count) >= 0);
sema->signal(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(count)));
return true;
}
return false;
Expand Down Expand Up @@ -327,13 +331,14 @@ class BlockingConcurrentQueue
inline size_t try_dequeue_bulk(It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
assert(static_cast<ssize_t>(max) >= 0);
max = static_cast<size_t>(sema->tryWaitMany(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(max))));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
}
return count;
}

// Attempts to dequeue several elements from the queue using an explicit consumer token.
// Returns the number of items actually dequeued.
// Returns 0 if all producer streams appeared empty at the time they
Expand All @@ -343,7 +348,8 @@ class BlockingConcurrentQueue
inline size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
assert(static_cast<ssize_t>(max) >= 0);
max = static_cast<size_t>(sema->tryWaitMany(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(max))));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
}
Expand Down Expand Up @@ -447,13 +453,14 @@ class BlockingConcurrentQueue
inline size_t wait_dequeue_bulk(It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
assert(static_cast<ssize_t>(max) >= 0);
max = static_cast<size_t>(sema->waitMany(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(max))));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
}
return count;
}

// Attempts to dequeue several elements from the queue.
// Returns the number of items actually dequeued, which can
// be 0 if the timeout expires while waiting for elements,
Expand All @@ -465,7 +472,8 @@ class BlockingConcurrentQueue
inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::int64_t timeout_usecs)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
assert(static_cast<ssize_t>(max) >= 0);
max = static_cast<size_t>(sema->waitMany(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(max)), timeout_usecs));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
}
Expand All @@ -492,7 +500,8 @@ class BlockingConcurrentQueue
inline size_t wait_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
assert(static_cast<ssize_t>(max) >= 0);
max = static_cast<size_t>(sema->waitMany(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(max))));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
}
Expand All @@ -510,7 +519,8 @@ class BlockingConcurrentQueue
inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::int64_t timeout_usecs)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
assert(static_cast<ssize_t>(max) >= 0);
max = static_cast<size_t>(sema->waitMany(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(max)), timeout_usecs));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
}
Expand All @@ -537,7 +547,7 @@ class BlockingConcurrentQueue
// Thread-safe.
inline size_t size_approx() const
{
return (size_t)sema->availableApprox();
return static_cast<size_t>(sema->availableApprox());
}


Expand Down
7 changes: 7 additions & 0 deletions concurrentqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,15 @@ namespace details
static inline size_t hash_thread_id(thread_id_t id)
{
static_assert(sizeof(thread_id_t) <= 8, "Expected a platform where thread IDs are at most 64-bit values");
#if defined(__GNUC__) && !defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wuseless-cast"
#endif
return static_cast<size_t>(hash_32_or_64<sizeof(thread_id_converter<thread_id_t>::thread_id_hash_t)>::hash(
thread_id_converter<thread_id_t>::prehash(id)));
#if defined(__GNUC__) && !defined(__clang__)
#pragma GCC diagnostic pop
#endif
}

template<typename T>
Expand Down
10 changes: 5 additions & 5 deletions lightweightsemaphore.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ class Semaphore
#else
clock_gettime(CLOCK_REALTIME, &ts);
#endif
ts.tv_sec += (time_t)(usecs / usecs_in_1_sec);
ts.tv_nsec += (long)(usecs % usecs_in_1_sec) * 1000;
ts.tv_sec += static_cast<time_t>(usecs / usecs_in_1_sec);
ts.tv_nsec += static_cast<long>(usecs % usecs_in_1_sec) * 1000;
// sem_timedwait bombs if you have more than 1e9 in tv_nsec
// so we have to clean things up before passing it in
if (ts.tv_nsec >= nsecs_in_1_sec) {
Expand Down Expand Up @@ -306,7 +306,7 @@ class LightweightSemaphore
if (m_sema.wait())
return true;
}
if (timeout_usecs > 0 && m_sema.timed_wait((std::uint64_t)timeout_usecs))
if (timeout_usecs > 0 && m_sema.timed_wait(static_cast<std::uint64_t>(timeout_usecs)))
return true;
// At this point, we've timed out waiting for the semaphore, but the
// count is still decremented indicating we may still be waiting on
Expand Down Expand Up @@ -342,7 +342,7 @@ class LightweightSemaphore
oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
if (oldCount <= 0)
{
if ((timeout_usecs == 0) || (timeout_usecs < 0 && !m_sema.wait()) || (timeout_usecs > 0 && !m_sema.timed_wait((std::uint64_t)timeout_usecs)))
if ((timeout_usecs == 0) || (timeout_usecs < 0 && !m_sema.wait()) || (timeout_usecs > 0 && !m_sema.timed_wait(static_cast<std::uint64_t>(timeout_usecs))))
{
while (true)
{
Expand Down Expand Up @@ -425,7 +425,7 @@ class LightweightSemaphore
ssize_t toRelease = -oldCount < count ? -oldCount : count;
if (toRelease > 0)
{
m_sema.signal((int)toRelease);
m_sema.signal(static_cast<int>(toRelease));
}
}

Expand Down