ASPiK SDK
readerwriterqueue.h
// ©2013-2016 Cameron Desrochers.
// Distributed under the simplified BSD license (see the license file that
// should have come with this header).

#pragma once

#include "atomicops.h"
#include <type_traits>
#include <utility>
#include <cassert>
#include <stdexcept>
#include <new>
#include <cstdint>
#include <cstdlib> // For malloc/free/abort & size_t
#if __cplusplus > 199711L || _MSC_VER >= 1700 // C++11 or VS2012
#include <chrono>
#endif


// A lock-free queue for a single-consumer, single-producer architecture.
// The queue is also wait-free in the common path (except if more memory
// needs to be allocated, in which case malloc is called).
// Allocates memory sparingly (O(lg(n)) times, amortized), and only once if
// the original maximum size estimate is never exceeded.
// Tested on x86/x64 processors, but semantics should be correct for all
// architectures (given the right implementations in atomicops.h), provided
// that aligned integer and pointer accesses are naturally atomic.
// Note that there should only be one consumer thread and one producer thread;
// switching roles of the threads, or using multiple consecutive threads for
// one role, is not safe unless properly synchronized.
// Using the queue exclusively from one thread is fine, though a bit silly.
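//
// A minimal usage sketch (illustrative only, not part of the original header;
// the producer and consumer threads are assumed to be created by the caller):
//
//     moodycamel::ReaderWriterQueue<int> q(100);   // reserve room for ~100 elements up front
//
//     // Producer thread:
//     q.enqueue(17);                  // may allocate a new block if the queue is full
//     bool ok = q.try_enqueue(18);    // never allocates; returns false if the queue is full
//
//     // Consumer thread:
//     int item;
//     if (q.try_dequeue(item)) {
//         // use item
//     }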

#ifndef MOODYCAMEL_CACHE_LINE_SIZE
#define MOODYCAMEL_CACHE_LINE_SIZE 64
#endif

#ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
#if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
#define MOODYCAMEL_EXCEPTIONS_ENABLED
#endif
#endif

#ifndef MOODYCAMEL_HAS_EMPLACE
#if !defined(_MSC_VER) || _MSC_VER >= 1800 // variadic templates: either a non-MS compiler or VS >= 2013
#define MOODYCAMEL_HAS_EMPLACE 1
#endif
#endif

#ifdef AE_VCPP
#pragma warning(push)
#pragma warning(disable: 4324) // structure was padded due to __declspec(align())
#pragma warning(disable: 4820) // padding was added
#pragma warning(disable: 4127) // conditional expression is constant
#endif

namespace moodycamel {

template<typename T, size_t MAX_BLOCK_SIZE = 512>
class ReaderWriterQueue
{
    // Design: Based on a queue-of-queues. The low-level queues are just
    // circular buffers with front and tail indices indicating where the
    // next element to dequeue is and where the next element can be enqueued,
    // respectively. Each low-level queue is called a "block". Each block
    // wastes exactly one element's worth of space to keep the design simple
    // (if front == tail then the queue is empty, and can't be full).
    // The high-level queue is a circular linked list of blocks; again there
    // is a front and tail, but this time they are pointers to the blocks.
    // The front block is where the next element to be dequeued is, provided
    // the block is not empty. The back block is where elements are to be
    // enqueued, provided the block is not full.
    // The producer thread owns all the tail indices/pointers. The consumer
    // thread owns all the front indices/pointers. Both threads read each
    // other's variables, but only the owning thread updates them. E.g. After
    // the consumer reads the producer's tail, the tail may change before the
    // consumer is done dequeuing an object, but the consumer knows the tail
    // will never go backwards, only forwards.
    // If there is no room to enqueue an object, an additional block (of
    // equal size to the last block) is added. Blocks are never removed.
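    //
    // Illustrative sketch of the per-block index arithmetic described above
    // (not part of the implementation; sizeMask == blockSize - 1, and blockSize
    // is always a power of two):
    //
    //     empty       : front == tail
    //     used slots  : (tail - front) & sizeMask
    //     next tail   : (tail + 1) & sizeMask    // block is "full" when this equals front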

public:
    typedef T value_type;

    // Constructs a queue that can hold maxSize elements without further
    // allocations. If more than MAX_BLOCK_SIZE elements are requested,
    // then several blocks of MAX_BLOCK_SIZE each are reserved (including
    // at least one extra buffer block).
    explicit ReaderWriterQueue(size_t maxSize = 15)
#ifndef NDEBUG
        : enqueuing(false)
        , dequeuing(false)
#endif
    {
        assert(maxSize > 0);
        assert(MAX_BLOCK_SIZE == ceilToPow2(MAX_BLOCK_SIZE) && "MAX_BLOCK_SIZE must be a power of 2");
        assert(MAX_BLOCK_SIZE >= 2 && "MAX_BLOCK_SIZE must be at least 2");

        Block* firstBlock = nullptr;

        largestBlockSize = ceilToPow2(maxSize + 1); // We need a spare slot to fit maxSize elements in the block
        if (largestBlockSize > MAX_BLOCK_SIZE * 2) {
            // We need a spare block in case the producer is writing to a different block the consumer is reading from, and
            // wants to enqueue the maximum number of elements. We also need a spare element in each block to avoid the ambiguity
            // between front == tail meaning "empty" and "full".
            // So the effective number of slots that are guaranteed to be usable at any time is the block size - 1 times the
            // number of blocks - 1. Solving for maxSize and applying a ceiling to the division gives us (after simplifying):
            size_t initialBlockCount = (maxSize + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1);
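            // Worked example (illustrative, not from the original source): with
            // maxSize = 2000 and MAX_BLOCK_SIZE = 512, this gives
            // (2000 + 1021) / 511 == 5 blocks, which guarantees
            // (512 - 1) * (5 - 1) = 2044 >= 2000 usable slots at any time.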
            largestBlockSize = MAX_BLOCK_SIZE;
            Block* lastBlock = nullptr;
            for (size_t i = 0; i != initialBlockCount; ++i) {
                auto block = make_block(largestBlockSize);
                if (block == nullptr) {
#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
                    throw std::bad_alloc();
#else
                    abort();
#endif
                }
                if (firstBlock == nullptr) {
                    firstBlock = block;
                }
                else {
                    lastBlock->next = block;
                }
                lastBlock = block;
                block->next = firstBlock;
            }
        }
        else {
            firstBlock = make_block(largestBlockSize);
            if (firstBlock == nullptr) {
#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
                throw std::bad_alloc();
#else
                abort();
#endif
            }
            firstBlock->next = firstBlock;
        }
        frontBlock = firstBlock;
        tailBlock = firstBlock;

        // Make sure the reader/writer threads will have the initialized memory setup above:
        fence(memory_order_sync);
    }

    // Note: The queue should not be accessed concurrently while it's
    // being moved. It's up to the user to synchronize this.
    ReaderWriterQueue(ReaderWriterQueue&& other)
        : frontBlock(other.frontBlock.load()),
          tailBlock(other.tailBlock.load()),
          largestBlockSize(other.largestBlockSize)
#ifndef NDEBUG
        , enqueuing(false)
        , dequeuing(false)
#endif
    {
        other.largestBlockSize = 32;
        Block* b = other.make_block(other.largestBlockSize);
        if (b == nullptr) {
#ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
            throw std::bad_alloc();
#else
            abort();
#endif
        }
        b->next = b;
        other.frontBlock = b;
        other.tailBlock = b;
    }

    // Note: The queue should not be accessed concurrently while it's
    // being moved. It's up to the user to synchronize this.
    ReaderWriterQueue& operator=(ReaderWriterQueue&& other)
    {
        Block* b = frontBlock.load();
        frontBlock = other.frontBlock.load();
        other.frontBlock = b;
        b = tailBlock.load();
        tailBlock = other.tailBlock.load();
        other.tailBlock = b;
        std::swap(largestBlockSize, other.largestBlockSize);
        return *this;
    }

    // Note: The queue should not be accessed concurrently while it's
    // being deleted. It's up to the user to synchronize this.
    ~ReaderWriterQueue()
    {
        // Make sure we get the latest version of all variables from other CPUs:
        fence(memory_order_sync);

        // Destroy any remaining objects in queue and free memory
        Block* frontBlock_ = frontBlock;
        Block* block = frontBlock_;
        do {
            Block* nextBlock = block->next;
            size_t blockFront = block->front;
            size_t blockTail = block->tail;

            for (size_t i = blockFront; i != blockTail; i = (i + 1) & block->sizeMask) {
                auto element = reinterpret_cast<T*>(block->data + i * sizeof(T));
                element->~T();
                (void)element;
            }

            auto rawBlock = block->rawThis;
            block->~Block();
            std::free(rawBlock);
            block = nextBlock;
        } while (block != frontBlock_);
    }


    // Enqueues a copy of element if there is room in the queue.
    // Returns true if the element was enqueued, false otherwise.
    // Does not allocate memory.
    AE_FORCEINLINE bool try_enqueue(T const& element)
    {
        return inner_enqueue<CannotAlloc>(element);
    }

    // Enqueues a moved copy of element if there is room in the queue.
    // Returns true if the element was enqueued, false otherwise.
    // Does not allocate memory.
    AE_FORCEINLINE bool try_enqueue(T&& element)
    {
        return inner_enqueue<CannotAlloc>(std::forward<T>(element));
    }

#if MOODYCAMEL_HAS_EMPLACE
    // Like try_enqueue() but with emplace semantics (i.e. construct-in-place).
    template<typename... Args>
    AE_FORCEINLINE bool try_emplace(Args&&... args)
    {
        return inner_enqueue<CannotAlloc>(std::forward<Args>(args)...);
    }
#endif

    // Enqueues a copy of element on the queue.
    // Allocates an additional block of memory if needed.
    // Only fails (returns false) if memory allocation fails.
    AE_FORCEINLINE bool enqueue(T const& element)
    {
        return inner_enqueue<CanAlloc>(element);
    }

    // Enqueues a moved copy of element on the queue.
    // Allocates an additional block of memory if needed.
    // Only fails (returns false) if memory allocation fails.
    AE_FORCEINLINE bool enqueue(T&& element)
    {
        return inner_enqueue<CanAlloc>(std::forward<T>(element));
    }

#if MOODYCAMEL_HAS_EMPLACE
    // Like enqueue() but with emplace semantics (i.e. construct-in-place).
    template<typename... Args>
    AE_FORCEINLINE bool emplace(Args&&... args)
    {
        return inner_enqueue<CanAlloc>(std::forward<Args>(args)...);
    }
#endif

    // Attempts to dequeue an element; if the queue is empty,
    // returns false instead. If the queue has at least one element,
    // moves front to result using operator=, then returns true.
    template<typename U>
    bool try_dequeue(U& result)
    {
#ifndef NDEBUG
        ReentrantGuard guard(this->dequeuing);
#endif

        // High-level pseudocode:
        // Remember where the tail block is
        // If the front block has an element in it, dequeue it
        // Else
        //     If front block was the tail block when we entered the function, return false
        //     Else advance to next block and dequeue the item there

        // Note that we have to use the value of the tail block from before we check if the front
        // block is full or not, in case the front block is empty and then, before we check if the
        // tail block is at the front block or not, the producer fills up the front block *and
        // moves on*, which would make us skip a filled block. Seems unlikely, but was consistently
        // reproducible in practice.
        // In order to avoid overhead in the common case, though, we do a double-checked pattern
        // where we have the fast path if the front block is not empty, then read the tail block,
        // then re-read the front block and check if it's not empty again, then check if the tail
        // block has advanced.

        Block* frontBlock_ = frontBlock.load();
        size_t blockTail = frontBlock_->localTail;
        size_t blockFront = frontBlock_->front.load();

        if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
            fence(memory_order_acquire);

        non_empty_front_block:
            // Front block not empty, dequeue from here
            auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
            result = std::move(*element);
            element->~T();

            blockFront = (blockFront + 1) & frontBlock_->sizeMask;

            fence(memory_order_release);
            frontBlock_->front = blockFront;
        }
        else if (frontBlock_ != tailBlock.load()) {
            fence(memory_order_acquire);

            frontBlock_ = frontBlock.load();
            blockTail = frontBlock_->localTail = frontBlock_->tail.load();
            blockFront = frontBlock_->front.load();
            fence(memory_order_acquire);

            if (blockFront != blockTail) {
                // Oh look, the front block isn't empty after all
                goto non_empty_front_block;
            }

            // Front block is empty but there's another block ahead, advance to it
            Block* nextBlock = frontBlock_->next;
            // Don't need an acquire fence here since next can only ever be set on the tailBlock,
            // and we're not the tailBlock, and we did an acquire earlier after reading tailBlock which
            // ensures next is up-to-date on this CPU in case we recently were at tailBlock.

            size_t nextBlockFront = nextBlock->front.load();
            size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
            fence(memory_order_acquire);

            // Since the tailBlock is only ever advanced after being written to,
            // we know there's for sure an element to dequeue on it
            assert(nextBlockFront != nextBlockTail);
            AE_UNUSED(nextBlockTail);

            // We're done with this block, let the producer use it if it needs
            fence(memory_order_release); // Expose possibly pending changes to frontBlock->front from last dequeue
            frontBlock = frontBlock_ = nextBlock;

            compiler_fence(memory_order_release); // Not strictly needed

            auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));

            result = std::move(*element);
            element->~T();

            nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;

            fence(memory_order_release);
            frontBlock_->front = nextBlockFront;
        }
        else {
            // No elements in current block and no other block to advance to
            return false;
        }

        return true;
    }


    // Returns a pointer to the front element in the queue (the one that
    // would be removed next by a call to `try_dequeue` or `pop`). If the
    // queue appears empty at the time the method is called, nullptr is
    // returned instead.
    // Must be called only from the consumer thread.
    T* peek()
    {
#ifndef NDEBUG
        ReentrantGuard guard(this->dequeuing);
#endif
        // See try_dequeue() for reasoning

        Block* frontBlock_ = frontBlock.load();
        size_t blockTail = frontBlock_->localTail;
        size_t blockFront = frontBlock_->front.load();

        if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
            fence(memory_order_acquire);
        non_empty_front_block:
            return reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
        }
        else if (frontBlock_ != tailBlock.load()) {
            fence(memory_order_acquire);
            frontBlock_ = frontBlock.load();
            blockTail = frontBlock_->localTail = frontBlock_->tail.load();
            blockFront = frontBlock_->front.load();
            fence(memory_order_acquire);

            if (blockFront != blockTail) {
                goto non_empty_front_block;
            }

            Block* nextBlock = frontBlock_->next;

            size_t nextBlockFront = nextBlock->front.load();
            fence(memory_order_acquire);

            assert(nextBlockFront != nextBlock->tail.load());
            return reinterpret_cast<T*>(nextBlock->data + nextBlockFront * sizeof(T));
        }

        return nullptr;
    }

    // Removes the front element from the queue, if any, without returning it.
    // Returns true on success, or false if the queue appeared empty at the time
    // `pop` was called.
    bool pop()
    {
#ifndef NDEBUG
        ReentrantGuard guard(this->dequeuing);
#endif
        // See try_dequeue() for reasoning

        Block* frontBlock_ = frontBlock.load();
        size_t blockTail = frontBlock_->localTail;
        size_t blockFront = frontBlock_->front.load();

        if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
            fence(memory_order_acquire);

        non_empty_front_block:
            auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
            element->~T();

            blockFront = (blockFront + 1) & frontBlock_->sizeMask;

            fence(memory_order_release);
            frontBlock_->front = blockFront;
        }
        else if (frontBlock_ != tailBlock.load()) {
            fence(memory_order_acquire);
            frontBlock_ = frontBlock.load();
            blockTail = frontBlock_->localTail = frontBlock_->tail.load();
            blockFront = frontBlock_->front.load();
            fence(memory_order_acquire);

            if (blockFront != blockTail) {
                goto non_empty_front_block;
            }

            // Front block is empty but there's another block ahead, advance to it
            Block* nextBlock = frontBlock_->next;

            size_t nextBlockFront = nextBlock->front.load();
            size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
            fence(memory_order_acquire);

            assert(nextBlockFront != nextBlockTail);
            AE_UNUSED(nextBlockTail);

            fence(memory_order_release);
            frontBlock = frontBlock_ = nextBlock;

            compiler_fence(memory_order_release);

            auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
            element->~T();

            nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;

            fence(memory_order_release);
            frontBlock_->front = nextBlockFront;
        }
        else {
            // No elements in current block and no other block to advance to
            return false;
        }

        return true;
    }

    // Returns the approximate number of items currently in the queue.
    // Safe to call from both the producer and consumer threads.
    inline size_t size_approx() const
    {
        size_t result = 0;
        Block* frontBlock_ = frontBlock.load();
        Block* block = frontBlock_;
        do {
            fence(memory_order_acquire);
            size_t blockFront = block->front.load();
            size_t blockTail = block->tail.load();
            result += (blockTail - blockFront) & block->sizeMask;
            block = block->next.load();
        } while (block != frontBlock_);
        return result;
    }


private:
    enum AllocationMode { CanAlloc, CannotAlloc };

#if MOODYCAMEL_HAS_EMPLACE
    template<AllocationMode canAlloc, typename... Args>
    bool inner_enqueue(Args&&... args)
#else
    template<AllocationMode canAlloc, typename U>
    bool inner_enqueue(U&& element)
#endif
    {
#ifndef NDEBUG
        ReentrantGuard guard(this->enqueuing);
#endif

        // High-level pseudocode (assuming we're allowed to alloc a new block):
        // If room in tail block, add to tail
        // Else check next block
        //     If next block is not the head block, enqueue on next block
        //     Else create a new block and enqueue there
        // Advance tail to the block we just enqueued to

        Block* tailBlock_ = tailBlock.load();
        size_t blockFront = tailBlock_->localFront;
        size_t blockTail = tailBlock_->tail.load();

        size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask;
        if (nextBlockTail != blockFront || nextBlockTail != (tailBlock_->localFront = tailBlock_->front.load())) {
            fence(memory_order_acquire);
            // This block has room for at least one more element
            char* location = tailBlock_->data + blockTail * sizeof(T);
#if MOODYCAMEL_HAS_EMPLACE
            new (location) T(std::forward<Args>(args)...);
#else
            new (location) T(std::forward<U>(element));
#endif

            fence(memory_order_release);
            tailBlock_->tail = nextBlockTail;
        }
        else {
            fence(memory_order_acquire);
            if (tailBlock_->next.load() != frontBlock) {
                // Note that the reason we can't advance to the frontBlock and start adding new entries there
                // is because if we did, then dequeue would stay in that block, eventually reading the new values,
                // instead of advancing to the next full block (whose values were enqueued first and so should be
                // consumed first).

                fence(memory_order_acquire); // Ensure we get latest writes if we got the latest frontBlock

                // tailBlock is full, but there's a free block ahead, use it
                Block* tailBlockNext = tailBlock_->next.load();
                size_t nextBlockFront = tailBlockNext->localFront = tailBlockNext->front.load();
                nextBlockTail = tailBlockNext->tail.load();
                fence(memory_order_acquire);

                // This block must be empty since it's not the head block and we
                // go through the blocks in a circle
                assert(nextBlockFront == nextBlockTail);
                tailBlockNext->localFront = nextBlockFront;

                char* location = tailBlockNext->data + nextBlockTail * sizeof(T);
#if MOODYCAMEL_HAS_EMPLACE
                new (location) T(std::forward<Args>(args)...);
#else
                new (location) T(std::forward<U>(element));
#endif

                tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask;

                fence(memory_order_release);
                tailBlock = tailBlockNext;
            }
            else if (canAlloc == CanAlloc) {
                // tailBlock is full and there's no free block ahead; create a new block
                auto newBlockSize = largestBlockSize >= MAX_BLOCK_SIZE ? largestBlockSize : largestBlockSize * 2;
                auto newBlock = make_block(newBlockSize);
                if (newBlock == nullptr) {
                    // Could not allocate a block!
                    return false;
                }
                largestBlockSize = newBlockSize;

#if MOODYCAMEL_HAS_EMPLACE
                new (newBlock->data) T(std::forward<Args>(args)...);
#else
                new (newBlock->data) T(std::forward<U>(element));
#endif
                assert(newBlock->front == 0);
                newBlock->tail = newBlock->localTail = 1;

                newBlock->next = tailBlock_->next.load();
                tailBlock_->next = newBlock;

                // Might be possible for the dequeue thread to see the new tailBlock->next
                // *without* seeing the new tailBlock value, but this is OK since it can't
                // advance to the next block until tailBlock is set anyway (because the only
                // case where it could try to read the next is if it's already at the tailBlock,
                // and it won't advance past tailBlock in any circumstance).

                fence(memory_order_release);
                tailBlock = newBlock;
            }
            else if (canAlloc == CannotAlloc) {
                // Would have had to allocate a new block to enqueue, but not allowed
                return false;
            }
            else {
                assert(false && "Should be unreachable code");
                return false;
            }
        }

        return true;
    }


    // Disable copying
    ReaderWriterQueue(ReaderWriterQueue const&) { }

    // Disable assignment
    ReaderWriterQueue& operator=(ReaderWriterQueue const&) { }



    AE_FORCEINLINE static size_t ceilToPow2(size_t x)
    {
        // From http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
        --x;
        x |= x >> 1;
        x |= x >> 2;
        x |= x >> 4;
        for (size_t i = 1; i < sizeof(size_t); i <<= 1) {
            x |= x >> (i << 3);
        }
        ++x;
        return x;
    }

    template<typename U>
    static AE_FORCEINLINE char* align_for(char* ptr)
    {
        const std::size_t alignment = std::alignment_of<U>::value;
        return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
    }
private:
#ifndef NDEBUG
    struct ReentrantGuard
    {
        ReentrantGuard(bool& _inSection)
            : inSection(_inSection)
        {
            assert(!inSection && "ReaderWriterQueue does not support enqueuing or dequeuing elements from other elements' ctors and dtors");
            inSection = true;
        }

        ~ReentrantGuard() { inSection = false; }

    private:
        ReentrantGuard& operator=(ReentrantGuard const&);

    private:
        bool& inSection;
    };
#endif

    struct Block
    {
        // Avoid false-sharing by putting highly contended variables on their own cache lines
        weak_atomic<size_t> front; // (Atomic) Elements are read from here
        size_t localTail;          // An uncontended shadow copy of tail, owned by the consumer

        char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)];
        weak_atomic<size_t> tail;  // (Atomic) Elements are enqueued here
        size_t localFront;         // An uncontended shadow copy of front, owned by the producer

        char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)]; // next isn't very contended, but we don't want it on the same cache line as tail (which is)
        weak_atomic<Block*> next;  // (Atomic)

        char* data; // Contents (on heap) are aligned to T's alignment

        const size_t sizeMask;


        // size must be a power of two (and greater than 0)
        Block(size_t const& _size, char* _rawThis, char* _data)
            : front(0), localTail(0), tail(0), localFront(0), next(nullptr), data(_data), sizeMask(_size - 1), rawThis(_rawThis)
        {
        }

    private:
        // C4512 - Assignment operator could not be generated
        Block& operator=(Block const&);

    public:
        char* rawThis;
    };


    static Block* make_block(size_t capacity)
    {
        // Allocate enough memory for the block itself, as well as all the elements it will contain
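        // Illustrative layout of the single allocation (a sketch, not from the original
        // source; the actual padding sizes depend on where malloc places the buffer):
        //
        //     [ up to alignof(Block)-1 pad ][ Block ][ up to alignof(T)-1 pad ][ capacity * sizeof(T) data ]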
        auto size = sizeof(Block) + std::alignment_of<Block>::value - 1;
        size += sizeof(T) * capacity + std::alignment_of<T>::value - 1;
        auto newBlockRaw = static_cast<char*>(std::malloc(size));
        if (newBlockRaw == nullptr) {
            return nullptr;
        }

        auto newBlockAligned = align_for<Block>(newBlockRaw);
        auto newBlockData = align_for<T>(newBlockAligned + sizeof(Block));
        return new (newBlockAligned) Block(capacity, newBlockRaw, newBlockData);
    }

private:
    weak_atomic<Block*> frontBlock; // (Atomic) Elements are dequeued from this block

    char cachelineFiller[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<Block*>)];
    weak_atomic<Block*> tailBlock;  // (Atomic) Elements are enqueued to this block

    size_t largestBlockSize;

#ifndef NDEBUG
    bool enqueuing;
    bool dequeuing;
#endif
};

// Like ReaderWriterQueue, but also provides blocking operations
template<typename T, size_t MAX_BLOCK_SIZE = 512>
class BlockingReaderWriterQueue
{
private:
    typedef ::moodycamel::ReaderWriterQueue<T, MAX_BLOCK_SIZE> ReaderWriterQueue;

public:
    explicit BlockingReaderWriterQueue(size_t maxSize = 15)
        : inner(maxSize)
    { }


    // Enqueues a copy of element if there is room in the queue.
    // Returns true if the element was enqueued, false otherwise.
    // Does not allocate memory.
    AE_FORCEINLINE bool try_enqueue(T const& element)
    {
        if (inner.try_enqueue(element)) {
            sema.signal();
            return true;
        }
        return false;
    }

    // Enqueues a moved copy of element if there is room in the queue.
    // Returns true if the element was enqueued, false otherwise.
    // Does not allocate memory.
    AE_FORCEINLINE bool try_enqueue(T&& element)
    {
        if (inner.try_enqueue(std::forward<T>(element))) {
            sema.signal();
            return true;
        }
        return false;
    }


    // Enqueues a copy of element on the queue.
    // Allocates an additional block of memory if needed.
    // Only fails (returns false) if memory allocation fails.
    AE_FORCEINLINE bool enqueue(T const& element)
    {
        if (inner.enqueue(element)) {
            sema.signal();
            return true;
        }
        return false;
    }

    // Enqueues a moved copy of element on the queue.
    // Allocates an additional block of memory if needed.
    // Only fails (returns false) if memory allocation fails.
    AE_FORCEINLINE bool enqueue(T&& element)
    {
        if (inner.enqueue(std::forward<T>(element))) {
            sema.signal();
            return true;
        }
        return false;
    }


    // Attempts to dequeue an element; if the queue is empty,
    // returns false instead. If the queue has at least one element,
    // moves front to result using operator=, then returns true.
    template<typename U>
    bool try_dequeue(U& result)
    {
        if (sema.tryWait()) {
            bool success = inner.try_dequeue(result);
            assert(success);
            AE_UNUSED(success);
            return true;
        }
        return false;
    }


    // Attempts to dequeue an element; if the queue is empty,
    // waits until an element is available, then dequeues it.
    template<typename U>
    void wait_dequeue(U& result)
    {
        sema.wait();
        bool success = inner.try_dequeue(result);
        AE_UNUSED(result);
        assert(success);
        AE_UNUSED(success);
    }


    // Attempts to dequeue an element; if the queue is empty,
    // waits until an element is available up to the specified timeout,
    // then dequeues it and returns true, or returns false if the timeout
    // expires before an element can be dequeued.
    // Using a negative timeout indicates an indefinite timeout,
    // and is thus functionally equivalent to calling wait_dequeue.
    template<typename U>
    bool wait_dequeue_timed(U& result, std::int64_t timeout_usecs)
    {
        if (!sema.wait(timeout_usecs)) {
            return false;
        }
        bool success = inner.try_dequeue(result);
        AE_UNUSED(result);
        assert(success);
        AE_UNUSED(success);
        return true;
    }


#if __cplusplus > 199711L || _MSC_VER >= 1700
    // Attempts to dequeue an element; if the queue is empty,
    // waits until an element is available up to the specified timeout,
    // then dequeues it and returns true, or returns false if the timeout
    // expires before an element can be dequeued.
    // Using a negative timeout indicates an indefinite timeout,
    // and is thus functionally equivalent to calling wait_dequeue.
    template<typename U, typename Rep, typename Period>
    inline bool wait_dequeue_timed(U& result, std::chrono::duration<Rep, Period> const& timeout)
    {
        return wait_dequeue_timed(result, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
    }
#endif
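
    // Illustrative consumer-side sketch of the blocking operations above (not part
    // of the original header; assumes the queue and a producer thread are set up
    // elsewhere):
    //
    //     moodycamel::BlockingReaderWriterQueue<int> q;
    //     int item;
    //     q.wait_dequeue(item);   // blocks until an item has been enqueued
    //     if (q.wait_dequeue_timed(item, std::chrono::milliseconds(5))) {
    //         // got an item within 5 ms
    //     }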


    // Returns a pointer to the front element in the queue (the one that
    // would be removed next by a call to `try_dequeue` or `pop`). If the
    // queue appears empty at the time the method is called, nullptr is
    // returned instead.
    // Must be called only from the consumer thread.
    AE_FORCEINLINE T* peek()
    {
        return inner.peek();
    }

    // Removes the front element from the queue, if any, without returning it.
    // Returns true on success, or false if the queue appeared empty at the time
    // `pop` was called.
    AE_FORCEINLINE bool pop()
    {
        if (sema.tryWait()) {
            bool result = inner.pop();
            assert(result);
            AE_UNUSED(result);
            return true;
        }
        return false;
    }

    // Returns the approximate number of items currently in the queue.
    // Safe to call from both the producer and consumer threads.
    AE_FORCEINLINE size_t size_approx() const
    {
        return sema.availableApprox();
    }


private:
    // Disable copying & assignment
    BlockingReaderWriterQueue(ReaderWriterQueue const&) { }
    BlockingReaderWriterQueue& operator=(ReaderWriterQueue const&) { }

private:
    ReaderWriterQueue inner;
    spsc_sema::LightweightSemaphore sema;
};

} // end namespace moodycamel

#ifdef AE_VCPP
#pragma warning(pop)
#endif