ASPiK SDK
readerwriterqueue.h
1 // ©2013-2016 Cameron Desrochers.
2 // Distributed under the simplified BSD license (see the license file that
3 // should have come with this header).
4 
5 #pragma once
6 
7 #include "atomicops.h"
8 #include <type_traits>
9 #include <utility>
10 #include <cassert>
11 #include <stdexcept>
12 #include <new>
13 #include <cstdint>
14 #include <cstdlib> // For malloc/free/abort & size_t
15 #if __cplusplus > 199711L || _MSC_VER >= 1700 // C++11 or VS2012
16 #include <chrono>
17 #endif
18 
19 
20 // A lock-free queue for a single-consumer, single-producer architecture.
21 // The queue is also wait-free in the common path (except if more memory
22 // needs to be allocated, in which case malloc is called).
23 // Allocates memory sparingly (O(lg(n) times, amortized), and only once if
24 // the original maximum size estimate is never exceeded.
25 // Tested on x86/x64 processors, but semantics should be correct for all
26 // architectures (given the right implementations in atomicops.h), provided
27 // that aligned integer and pointer accesses are naturally atomic.
28 // Note that there should only be one consumer thread and producer thread;
29 // Switching roles of the threads, or using multiple consecutive threads for
30 // one role, is not safe unless properly synchronized.
31 // Using the queue exclusively from one thread is fine, though a bit silly.
32 
33 #ifndef MOODYCAMEL_CACHE_LINE_SIZE
34 #define MOODYCAMEL_CACHE_LINE_SIZE 64
35 #endif
36 
37 #ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
38 #if (defined(_MSC_VER) && defined(_CPPUNWIND)) || (defined(__GNUC__) && defined(__EXCEPTIONS)) || (!defined(_MSC_VER) && !defined(__GNUC__))
39 #define MOODYCAMEL_EXCEPTIONS_ENABLED
40 #endif
41 #endif
42 
43 #ifndef MOODYCAMEL_HAS_EMPLACE
44 #if !defined(_MSC_VER) || _MSC_VER >= 1800 // variadic templates: either a non-MS compiler or VS >= 2013
45 #define MOODYCAMEL_HAS_EMPLACE 1
46 #endif
47 #endif
48 
49 #ifdef AE_VCPP
50 #pragma warning(push)
51 #pragma warning(disable: 4324) // structure was padded due to __declspec(align())
52 #pragma warning(disable: 4820) // padding was added
53 #pragma warning(disable: 4127) // conditional expression is constant
54 #endif
55 
56 namespace moodycamel {
57 
58 template<typename T, size_t MAX_BLOCK_SIZE = 512>
60 {
61  // Design: Based on a queue-of-queues. The low-level queues are just
62  // circular buffers with front and tail indices indicating where the
63  // next element to dequeue is and where the next element can be enqueued,
64  // respectively. Each low-level queue is called a "block". Each block
65  // wastes exactly one element's worth of space to keep the design simple
66  // (if front == tail then the queue is empty, and can't be full).
67  // The high-level queue is a circular linked list of blocks; again there
68  // is a front and tail, but this time they are pointers to the blocks.
69  // The front block is where the next element to be dequeued is, provided
70  // the block is not empty. The back block is where elements are to be
71  // enqueued, provided the block is not full.
72  // The producer thread owns all the tail indices/pointers. The consumer
73  // thread owns all the front indices/pointers. Both threads read each
74  // other's variables, but only the owning thread updates them. E.g. After
75  // the consumer reads the producer's tail, the tail may change before the
76  // consumer is done dequeuing an object, but the consumer knows the tail
77  // will never go backwards, only forwards.
78  // If there is no room to enqueue an object, an additional block (of
79  // equal size to the last block) is added. Blocks are never removed.
80 
81 public:
82  typedef T value_type;
83 
84  // Constructs a queue that can hold maxSize elements without further
85  // allocations. If more than MAX_BLOCK_SIZE elements are requested,
86  // then several blocks of MAX_BLOCK_SIZE each are reserved (including
87  // at least one extra buffer block).
88  explicit ReaderWriterQueue(size_t maxSize = 15)
89 #ifndef NDEBUG
90  : enqueuing(false)
91  ,dequeuing(false)
92 #endif
93  {
94  assert(maxSize > 0);
95  assert(MAX_BLOCK_SIZE == ceilToPow2(MAX_BLOCK_SIZE) && "MAX_BLOCK_SIZE must be a power of 2");
96  assert(MAX_BLOCK_SIZE >= 2 && "MAX_BLOCK_SIZE must be at least 2");
97 
98  Block* firstBlock = nullptr;
99 
100  largestBlockSize = ceilToPow2(maxSize + 1); // We need a spare slot to fit maxSize elements in the block
101  if (largestBlockSize > MAX_BLOCK_SIZE * 2) {
102  // We need a spare block in case the producer is writing to a different block the consumer is reading from, and
103  // wants to enqueue the maximum number of elements. We also need a spare element in each block to avoid the ambiguity
104  // between front == tail meaning "empty" and "full".
105  // So the effective number of slots that are guaranteed to be usable at any time is the block size - 1 times the
106  // number of blocks - 1. Solving for maxSize and applying a ceiling to the division gives us (after simplifying):
107  size_t initialBlockCount = (maxSize + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1);
108  largestBlockSize = MAX_BLOCK_SIZE;
109  Block* lastBlock = nullptr;
110  for (size_t i = 0; i != initialBlockCount; ++i) {
111  auto block = make_block(largestBlockSize);
112  if (block == nullptr) {
113 #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
114  throw std::bad_alloc();
115 #else
116  abort();
117 #endif
118  }
119  if (firstBlock == nullptr) {
120  firstBlock = block;
121  }
122  else {
123  lastBlock->next = block;
124  }
125  lastBlock = block;
126  block->next = firstBlock;
127  }
128  }
129  else {
130  firstBlock = make_block(largestBlockSize);
131  if (firstBlock == nullptr) {
132 #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
133  throw std::bad_alloc();
134 #else
135  abort();
136 #endif
137  }
138  firstBlock->next = firstBlock;
139  }
140  frontBlock = firstBlock;
141  tailBlock = firstBlock;
142 
143  // Make sure the reader/writer threads will have the initialized memory setup above:
144  fence(memory_order_sync);
145  }
146 
147  // Note: The queue should not be accessed concurrently while it's
148  // being moved. It's up to the user to synchronize this.
150  : frontBlock(other.frontBlock.load()),
151  tailBlock(other.tailBlock.load()),
152  largestBlockSize(other.largestBlockSize)
153 #ifndef NDEBUG
154  ,enqueuing(false)
155  ,dequeuing(false)
156 #endif
157  {
158  other.largestBlockSize = 32;
159  Block* b = other.make_block(other.largestBlockSize);
160  if (b == nullptr) {
161 #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
162  throw std::bad_alloc();
163 #else
164  abort();
165 #endif
166  }
167  b->next = b;
168  other.frontBlock = b;
169  other.tailBlock = b;
170  }
171 
172  // Note: The queue should not be accessed concurrently while it's
173  // being moved. It's up to the user to synchronize this.
174  ReaderWriterQueue& operator=(ReaderWriterQueue&& other)
175  {
176  Block* b = frontBlock.load();
177  frontBlock = other.frontBlock.load();
178  other.frontBlock = b;
179  b = tailBlock.load();
180  tailBlock = other.tailBlock.load();
181  other.tailBlock = b;
182  std::swap(largestBlockSize, other.largestBlockSize);
183  return *this;
184  }
185 
186  // Note: The queue should not be accessed concurrently while it's
187  // being deleted. It's up to the user to synchronize this.
189  {
190  // Make sure we get the latest version of all variables from other CPUs:
191  fence(memory_order_sync);
192 
193  // Destroy any remaining objects in queue and free memory
194  Block* frontBlock_ = frontBlock;
195  Block* block = frontBlock_;
196  do {
197  Block* nextBlock = block->next;
198  size_t blockFront = block->front;
199  size_t blockTail = block->tail;
200 
201  for (size_t i = blockFront; i != blockTail; i = (i + 1) & block->sizeMask) {
202  auto element = reinterpret_cast<T*>(block->data + i * sizeof(T));
203  element->~T();
204  (void)element;
205  }
206 
207  auto rawBlock = block->rawThis;
208  block->~Block();
209  std::free(rawBlock);
210  block = nextBlock;
211  } while (block != frontBlock_);
212  }
213 
214 
215  // Enqueues a copy of element if there is room in the queue.
216  // Returns true if the element was enqueued, false otherwise.
217  // Does not allocate memory.
218  AE_FORCEINLINE bool try_enqueue(T const& element)
219  {
220  return inner_enqueue<CannotAlloc>(element);
221  }
222 
223  // Enqueues a moved copy of element if there is room in the queue.
224  // Returns true if the element was enqueued, false otherwise.
225  // Does not allocate memory.
226  AE_FORCEINLINE bool try_enqueue(T&& element)
227  {
228  return inner_enqueue<CannotAlloc>(std::forward<T>(element));
229  }
230 
231 #if MOODYCAMEL_HAS_EMPLACE
232  // Like try_enqueue() but with emplace semantics (i.e. construct-in-place).
233  template<typename... Args>
234  AE_FORCEINLINE bool try_emplace(Args&&... args)
235  {
236  return inner_enqueue<CannotAlloc>(std::forward<Args>(args)...);
237  }
238 #endif
239 
240  // Enqueues a copy of element on the queue.
241  // Allocates an additional block of memory if needed.
242  // Only fails (returns false) if memory allocation fails.
243  AE_FORCEINLINE bool enqueue(T const& element)
244  {
245  return inner_enqueue<CanAlloc>(element);
246  }
247 
248  // Enqueues a moved copy of element on the queue.
249  // Allocates an additional block of memory if needed.
250  // Only fails (returns false) if memory allocation fails.
251  AE_FORCEINLINE bool enqueue(T&& element)
252  {
253  return inner_enqueue<CanAlloc>(std::forward<T>(element));
254  }
255 
256 #if MOODYCAMEL_HAS_EMPLACE
257  // Like enqueue() but with emplace semantics (i.e. construct-in-place).
258  template<typename... Args>
259  AE_FORCEINLINE bool emplace(Args&&... args)
260  {
261  return inner_enqueue<CanAlloc>(std::forward<Args>(args)...);
262  }
263 #endif
264 
265  // Attempts to dequeue an element; if the queue is empty,
266  // returns false instead. If the queue has at least one element,
267  // moves front to result using operator=, then returns true.
268  template<typename U>
269  bool try_dequeue(U& result)
270  {
271 #ifndef NDEBUG
272  ReentrantGuard guard(this->dequeuing);
273 #endif
274 
275  // High-level pseudocode:
276  // Remember where the tail block is
277  // If the front block has an element in it, dequeue it
278  // Else
279  // If front block was the tail block when we entered the function, return false
280  // Else advance to next block and dequeue the item there
281 
282  // Note that we have to use the value of the tail block from before we check if the front
283  // block is full or not, in case the front block is empty and then, before we check if the
284  // tail block is at the front block or not, the producer fills up the front block *and
285  // moves on*, which would make us skip a filled block. Seems unlikely, but was consistently
286  // reproducible in practice.
287  // In order to avoid overhead in the common case, though, we do a double-checked pattern
288  // where we have the fast path if the front block is not empty, then read the tail block,
289  // then re-read the front block and check if it's not empty again, then check if the tail
290  // block has advanced.
291 
292  Block* frontBlock_ = frontBlock.load();
293  size_t blockTail = frontBlock_->localTail;
294  size_t blockFront = frontBlock_->front.load();
295 
296  if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
297  fence(memory_order_acquire);
298 
299  non_empty_front_block:
300  // Front block not empty, dequeue from here
301  auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
302  result = std::move(*element);
303  element->~T();
304 
305  blockFront = (blockFront + 1) & frontBlock_->sizeMask;
306 
307  fence(memory_order_release);
308  frontBlock_->front = blockFront;
309  }
310  else if (frontBlock_ != tailBlock.load()) {
311  fence(memory_order_acquire);
312 
313  frontBlock_ = frontBlock.load();
314  blockTail = frontBlock_->localTail = frontBlock_->tail.load();
315  blockFront = frontBlock_->front.load();
316  fence(memory_order_acquire);
317 
318  if (blockFront != blockTail) {
319  // Oh look, the front block isn't empty after all
320  goto non_empty_front_block;
321  }
322 
323  // Front block is empty but there's another block ahead, advance to it
324  Block* nextBlock = frontBlock_->next;
325  // Don't need an acquire fence here since next can only ever be set on the tailBlock,
326  // and we're not the tailBlock, and we did an acquire earlier after reading tailBlock which
327  // ensures next is up-to-date on this CPU in case we recently were at tailBlock.
328 
329  size_t nextBlockFront = nextBlock->front.load();
330  size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
331  fence(memory_order_acquire);
332 
333  // Since the tailBlock is only ever advanced after being written to,
334  // we know there's for sure an element to dequeue on it
335  assert(nextBlockFront != nextBlockTail);
336  AE_UNUSED(nextBlockTail);
337 
338  // We're done with this block, let the producer use it if it needs
339  fence(memory_order_release); // Expose possibly pending changes to frontBlock->front from last dequeue
340  frontBlock = frontBlock_ = nextBlock;
341 
342  compiler_fence(memory_order_release); // Not strictly needed
343 
344  auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
345 
346  result = std::move(*element);
347  element->~T();
348 
349  nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
350 
351  fence(memory_order_release);
352  frontBlock_->front = nextBlockFront;
353  }
354  else {
355  // No elements in current block and no other block to advance to
356  return false;
357  }
358 
359  return true;
360  }
361 
362 
363  // Returns a pointer to the front element in the queue (the one that
364  // would be removed next by a call to `try_dequeue` or `pop`). If the
365  // queue appears empty at the time the method is called, nullptr is
366  // returned instead.
367  // Must be called only from the consumer thread.
368  T* peek()
369  {
370 #ifndef NDEBUG
371  ReentrantGuard guard(this->dequeuing);
372 #endif
373  // See try_dequeue() for reasoning
374 
375  Block* frontBlock_ = frontBlock.load();
376  size_t blockTail = frontBlock_->localTail;
377  size_t blockFront = frontBlock_->front.load();
378 
379  if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
380  fence(memory_order_acquire);
381  non_empty_front_block:
382  return reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
383  }
384  else if (frontBlock_ != tailBlock.load()) {
385  fence(memory_order_acquire);
386  frontBlock_ = frontBlock.load();
387  blockTail = frontBlock_->localTail = frontBlock_->tail.load();
388  blockFront = frontBlock_->front.load();
389  fence(memory_order_acquire);
390 
391  if (blockFront != blockTail) {
392  goto non_empty_front_block;
393  }
394 
395  Block* nextBlock = frontBlock_->next;
396 
397  size_t nextBlockFront = nextBlock->front.load();
398  fence(memory_order_acquire);
399 
400  assert(nextBlockFront != nextBlock->tail.load());
401  return reinterpret_cast<T*>(nextBlock->data + nextBlockFront * sizeof(T));
402  }
403 
404  return nullptr;
405  }
406 
407  // Removes the front element from the queue, if any, without returning it.
408  // Returns true on success, or false if the queue appeared empty at the time
409  // `pop` was called.
410  bool pop()
411  {
412 #ifndef NDEBUG
413  ReentrantGuard guard(this->dequeuing);
414 #endif
415  // See try_dequeue() for reasoning
416 
417  Block* frontBlock_ = frontBlock.load();
418  size_t blockTail = frontBlock_->localTail;
419  size_t blockFront = frontBlock_->front.load();
420 
421  if (blockFront != blockTail || blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
422  fence(memory_order_acquire);
423 
424  non_empty_front_block:
425  auto element = reinterpret_cast<T*>(frontBlock_->data + blockFront * sizeof(T));
426  element->~T();
427 
428  blockFront = (blockFront + 1) & frontBlock_->sizeMask;
429 
430  fence(memory_order_release);
431  frontBlock_->front = blockFront;
432  }
433  else if (frontBlock_ != tailBlock.load()) {
434  fence(memory_order_acquire);
435  frontBlock_ = frontBlock.load();
436  blockTail = frontBlock_->localTail = frontBlock_->tail.load();
437  blockFront = frontBlock_->front.load();
438  fence(memory_order_acquire);
439 
440  if (blockFront != blockTail) {
441  goto non_empty_front_block;
442  }
443 
444  // Front block is empty but there's another block ahead, advance to it
445  Block* nextBlock = frontBlock_->next;
446 
447  size_t nextBlockFront = nextBlock->front.load();
448  size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
449  fence(memory_order_acquire);
450 
451  assert(nextBlockFront != nextBlockTail);
452  AE_UNUSED(nextBlockTail);
453 
454  fence(memory_order_release);
455  frontBlock = frontBlock_ = nextBlock;
456 
457  compiler_fence(memory_order_release);
458 
459  auto element = reinterpret_cast<T*>(frontBlock_->data + nextBlockFront * sizeof(T));
460  element->~T();
461 
462  nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
463 
464  fence(memory_order_release);
465  frontBlock_->front = nextBlockFront;
466  }
467  else {
468  // No elements in current block and no other block to advance to
469  return false;
470  }
471 
472  return true;
473  }
474 
475  // Returns the approximate number of items currently in the queue.
476  // Safe to call from both the producer and consumer threads.
477  inline size_t size_approx() const
478  {
479  size_t result = 0;
480  Block* frontBlock_ = frontBlock.load();
481  Block* block = frontBlock_;
482  do {
483  fence(memory_order_acquire);
484  size_t blockFront = block->front.load();
485  size_t blockTail = block->tail.load();
486  result += (blockTail - blockFront) & block->sizeMask;
487  block = block->next.load();
488  } while (block != frontBlock_);
489  return result;
490  }
491 
492 
493 private:
494  enum AllocationMode { CanAlloc, CannotAlloc };
495 
496 #if MOODYCAMEL_HAS_EMPLACE
497  template<AllocationMode canAlloc, typename... Args>
498  bool inner_enqueue(Args&&... args)
499 #else
500  template<AllocationMode canAlloc, typename U>
501  bool inner_enqueue(U&& element)
502 #endif
503  {
504 #ifndef NDEBUG
505  ReentrantGuard guard(this->enqueuing);
506 #endif
507 
508  // High-level pseudocode (assuming we're allowed to alloc a new block):
509  // If room in tail block, add to tail
510  // Else check next block
511  // If next block is not the head block, enqueue on next block
512  // Else create a new block and enqueue there
513  // Advance tail to the block we just enqueued to
514 
515  Block* tailBlock_ = tailBlock.load();
516  size_t blockFront = tailBlock_->localFront;
517  size_t blockTail = tailBlock_->tail.load();
518 
519  size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask;
520  if (nextBlockTail != blockFront || nextBlockTail != (tailBlock_->localFront = tailBlock_->front.load())) {
521  fence(memory_order_acquire);
522  // This block has room for at least one more element
523  char* location = tailBlock_->data + blockTail * sizeof(T);
524 #if MOODYCAMEL_HAS_EMPLACE
525  new (location) T(std::forward<Args>(args)...);
526 #else
527  new (location) T(std::forward<U>(element));
528 #endif
529 
530  fence(memory_order_release);
531  tailBlock_->tail = nextBlockTail;
532  }
533  else {
534  fence(memory_order_acquire);
535  if (tailBlock_->next.load() != frontBlock) {
536  // Note that the reason we can't advance to the frontBlock and start adding new entries there
537  // is because if we did, then dequeue would stay in that block, eventually reading the new values,
538  // instead of advancing to the next full block (whose values were enqueued first and so should be
539  // consumed first).
540 
541  fence(memory_order_acquire); // Ensure we get latest writes if we got the latest frontBlock
542 
543  // tailBlock is full, but there's a free block ahead, use it
544  Block* tailBlockNext = tailBlock_->next.load();
545  size_t nextBlockFront = tailBlockNext->localFront = tailBlockNext->front.load();
546  nextBlockTail = tailBlockNext->tail.load();
547  fence(memory_order_acquire);
548 
549  // This block must be empty since it's not the head block and we
550  // go through the blocks in a circle
551  assert(nextBlockFront == nextBlockTail);
552  tailBlockNext->localFront = nextBlockFront;
553 
554  char* location = tailBlockNext->data + nextBlockTail * sizeof(T);
555 #if MOODYCAMEL_HAS_EMPLACE
556  new (location) T(std::forward<Args>(args)...);
557 #else
558  new (location) T(std::forward<U>(element));
559 #endif
560 
561  tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask;
562 
563  fence(memory_order_release);
564  tailBlock = tailBlockNext;
565  }
566  else if (canAlloc == CanAlloc) {
567  // tailBlock is full and there's no free block ahead; create a new block
568  auto newBlockSize = largestBlockSize >= MAX_BLOCK_SIZE ? largestBlockSize : largestBlockSize * 2;
569  auto newBlock = make_block(newBlockSize);
570  if (newBlock == nullptr) {
571  // Could not allocate a block!
572  return false;
573  }
574  largestBlockSize = newBlockSize;
575 
576 #if MOODYCAMEL_HAS_EMPLACE
577  new (newBlock->data) T(std::forward<Args>(args)...);
578 #else
579  new (newBlock->data) T(std::forward<U>(element));
580 #endif
581  assert(newBlock->front == 0);
582  newBlock->tail = newBlock->localTail = 1;
583 
584  newBlock->next = tailBlock_->next.load();
585  tailBlock_->next = newBlock;
586 
587  // Might be possible for the dequeue thread to see the new tailBlock->next
588  // *without* seeing the new tailBlock value, but this is OK since it can't
589  // advance to the next block until tailBlock is set anyway (because the only
590  // case where it could try to read the next is if it's already at the tailBlock,
591  // and it won't advance past tailBlock in any circumstance).
592 
593  fence(memory_order_release);
594  tailBlock = newBlock;
595  }
596  else if (canAlloc == CannotAlloc) {
597  // Would have had to allocate a new block to enqueue, but not allowed
598  return false;
599  }
600  else {
601  assert(false && "Should be unreachable code");
602  return false;
603  }
604  }
605 
606  return true;
607  }
608 
609 
610  // Disable copying
612 
613  // Disable assignment
614  ReaderWriterQueue& operator=(ReaderWriterQueue const&) { }
615 
616 
617 
618  AE_FORCEINLINE static size_t ceilToPow2(size_t x)
619  {
620  // From http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
621  --x;
622  x |= x >> 1;
623  x |= x >> 2;
624  x |= x >> 4;
625  for (size_t i = 1; i < sizeof(size_t); i <<= 1) {
626  x |= x >> (i << 3);
627  }
628  ++x;
629  return x;
630  }
631 
632  template<typename U>
633  static AE_FORCEINLINE char* align_for(char* ptr)
634  {
635  const std::size_t alignment = std::alignment_of<U>::value;
636  return ptr + (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) % alignment;
637  }
638 private:
639 #ifndef NDEBUG
640  struct ReentrantGuard
641  {
642  ReentrantGuard(bool& _inSection)
643  : inSection(_inSection)
644  {
645  assert(!inSection && "ReaderWriterQueue does not support enqueuing or dequeuing elements from other elements' ctors and dtors");
646  inSection = true;
647  }
648 
649  ~ReentrantGuard() { inSection = false; }
650 
651  private:
652  ReentrantGuard& operator=(ReentrantGuard const&);
653 
654  private:
655  bool& inSection;
656  };
657 #endif
658 
659  struct Block
660  {
661  // Avoid false-sharing by putting highly contended variables on their own cache lines
662  weak_atomic<size_t> front; // (Atomic) Elements are read from here
663  size_t localTail; // An uncontended shadow copy of tail, owned by the consumer
664 
665  char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)];
666  weak_atomic<size_t> tail; // (Atomic) Elements are enqueued here
667  size_t localFront;
668 
669  char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<size_t>) - sizeof(size_t)]; // next isn't very contended, but we don't want it on the same cache line as tail (which is)
670  weak_atomic<Block*> next; // (Atomic)
671 
672  char* data; // Contents (on heap) are aligned to T's alignment
673 
674  const size_t sizeMask;
675 
676 
677  // size must be a power of two (and greater than 0)
678  Block(size_t const& _size, char* _rawThis, char* _data)
679  : front(0), localTail(0), tail(0), localFront(0), next(nullptr), data(_data), sizeMask(_size - 1), rawThis(_rawThis)
680  {
681  }
682 
683  private:
684  // C4512 - Assignment operator could not be generated
685  Block& operator=(Block const&);
686 
687  public:
688  char* rawThis;
689  };
690 
691 
692  static Block* make_block(size_t capacity)
693  {
694  // Allocate enough memory for the block itself, as well as all the elements it will contain
695  auto size = sizeof(Block) + std::alignment_of<Block>::value - 1;
696  size += sizeof(T) * capacity + std::alignment_of<T>::value - 1;
697  auto newBlockRaw = static_cast<char*>(std::malloc(size));
698  if (newBlockRaw == nullptr) {
699  return nullptr;
700  }
701 
702  auto newBlockAligned = align_for<Block>(newBlockRaw);
703  auto newBlockData = align_for<T>(newBlockAligned + sizeof(Block));
704  return new (newBlockAligned) Block(capacity, newBlockRaw, newBlockData);
705  }
706 
707 private:
708  weak_atomic<Block*> frontBlock; // (Atomic) Elements are enqueued to this block
709 
710  char cachelineFiller[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(weak_atomic<Block*>)];
711  weak_atomic<Block*> tailBlock; // (Atomic) Elements are dequeued from this block
712 
713  size_t largestBlockSize;
714 
715 #ifndef NDEBUG
716  bool enqueuing;
717  bool dequeuing;
718 #endif
719 };
720 
721 // Like ReaderWriterQueue, but also providees blocking operations
722 template<typename T, size_t MAX_BLOCK_SIZE = 512>
724 {
725 private:
726  typedef ::moodycamel::ReaderWriterQueue<T, MAX_BLOCK_SIZE> ReaderWriterQueue;
727 
728 public:
729  explicit BlockingReaderWriterQueue(size_t maxSize = 15)
730  : inner(maxSize)
731  { }
732 
733 
734  // Enqueues a copy of element if there is room in the queue.
735  // Returns true if the element was enqueued, false otherwise.
736  // Does not allocate memory.
737  AE_FORCEINLINE bool try_enqueue(T const& element)
738  {
739  if (inner.try_enqueue(element)) {
740  sema.signal();
741  return true;
742  }
743  return false;
744  }
745 
746  // Enqueues a moved copy of element if there is room in the queue.
747  // Returns true if the element was enqueued, false otherwise.
748  // Does not allocate memory.
749  AE_FORCEINLINE bool try_enqueue(T&& element)
750  {
751  if (inner.try_enqueue(std::forward<T>(element))) {
752  sema.signal();
753  return true;
754  }
755  return false;
756  }
757 
758 
759  // Enqueues a copy of element on the queue.
760  // Allocates an additional block of memory if needed.
761  // Only fails (returns false) if memory allocation fails.
762  AE_FORCEINLINE bool enqueue(T const& element)
763  {
764  if (inner.enqueue(element)) {
765  sema.signal();
766  return true;
767  }
768  return false;
769  }
770 
771  // Enqueues a moved copy of element on the queue.
772  // Allocates an additional block of memory if needed.
773  // Only fails (returns false) if memory allocation fails.
774  AE_FORCEINLINE bool enqueue(T&& element)
775  {
776  if (inner.enqueue(std::forward<T>(element))) {
777  sema.signal();
778  return true;
779  }
780  return false;
781  }
782 
783 
784  // Attempts to dequeue an element; if the queue is empty,
785  // returns false instead. If the queue has at least one element,
786  // moves front to result using operator=, then returns true.
787  template<typename U>
788  bool try_dequeue(U& result)
789  {
790  if (sema.tryWait()) {
791  bool success = inner.try_dequeue(result);
792  assert(success);
793  AE_UNUSED(success);
794  return true;
795  }
796  return false;
797  }
798 
799 
800  // Attempts to dequeue an element; if the queue is empty,
801  // waits until an element is available, then dequeues it.
802  template<typename U>
803  void wait_dequeue(U& result)
804  {
805  sema.wait();
806  bool success = inner.try_dequeue(result);
807  AE_UNUSED(result);
808  assert(success);
809  AE_UNUSED(success);
810  }
811 
812 
813  // Attempts to dequeue an element; if the queue is empty,
814  // waits until an element is available up to the specified timeout,
815  // then dequeues it and returns true, or returns false if the timeout
816  // expires before an element can be dequeued.
817  // Using a negative timeout indicates an indefinite timeout,
818  // and is thus functionally equivalent to calling wait_dequeue.
819  template<typename U>
820  bool wait_dequeue_timed(U& result, std::int64_t timeout_usecs)
821  {
822  if (!sema.wait(timeout_usecs)) {
823  return false;
824  }
825  bool success = inner.try_dequeue(result);
826  AE_UNUSED(result);
827  assert(success);
828  AE_UNUSED(success);
829  return true;
830  }
831 
832 
833 #if __cplusplus > 199711L || _MSC_VER >= 1700
834  // Attempts to dequeue an element; if the queue is empty,
835  // waits until an element is available up to the specified timeout,
836  // then dequeues it and returns true, or returns false if the timeout
837  // expires before an element can be dequeued.
838  // Using a negative timeout indicates an indefinite timeout,
839  // and is thus functionally equivalent to calling wait_dequeue.
840  template<typename U, typename Rep, typename Period>
841  inline bool wait_dequeue_timed(U& result, std::chrono::duration<Rep, Period> const& timeout)
842  {
843  return wait_dequeue_timed(result, std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
844  }
845 #endif
846 
847 
848  // Returns a pointer to the front element in the queue (the one that
849  // would be removed next by a call to `try_dequeue` or `pop`). If the
850  // queue appears empty at the time the method is called, nullptr is
851  // returned instead.
852  // Must be called only from the consumer thread.
853  AE_FORCEINLINE T* peek()
854  {
855  return inner.peek();
856  }
857 
858  // Removes the front element from the queue, if any, without returning it.
859  // Returns true on success, or false if the queue appeared empty at the time
860  // `pop` was called.
861  AE_FORCEINLINE bool pop()
862  {
863  if (sema.tryWait()) {
864  bool result = inner.pop();
865  assert(result);
866  AE_UNUSED(result);
867  return true;
868  }
869  return false;
870  }
871 
872  // Returns the approximate number of items currently in the queue.
873  // Safe to call from both the producer and consumer threads.
874  AE_FORCEINLINE size_t size_approx() const
875  {
876  return sema.availableApprox();
877  }
878 
879 
880 private:
881  // Disable copying & assignment
883  BlockingReaderWriterQueue& operator=(ReaderWriterQueue const&) { }
884 
885 private:
886  ReaderWriterQueue inner;
888 };
889 
890 } // end namespace moodycamel
891 
892 #ifdef AE_VCPP
893 #pragma warning(pop)
894 #endif
Definition: atomicops.h:70
Definition: readerwriterqueue.h:59
Definition: readerwriterqueue.h:723