From fab634154f8625d4c88deece625386595d936672 Mon Sep 17 00:00:00 2001
From: Josh Baldwin
Date: Sat, 27 Feb 2021 12:33:42 -0700
Subject: [PATCH] Update README with section links (#70)

* Update README with section links

* add # to links

* try event instead of coro::event

* Update section names to remove "::" since markdown doesn't seem to link properly with them
---
 .githooks/readme-template.md  | 51 +++++++++++++++++----------------
 README.md                     | 53 ++++++++++++++++++-----------------
 examples/coro_ring_buffer.cpp |  2 +-
 inc/coro/ring_buffer.hpp      |  4 +--
 test/test_ring_buffer.cpp     |  2 +-
 5 files changed, 59 insertions(+), 53 deletions(-)

diff --git a/.githooks/readme-template.md b/.githooks/readme-template.md
index ad9f887..fd0e348 100644
--- a/.githooks/readme-template.md
+++ b/.githooks/readme-template.md
@@ -14,24 +14,27 @@
 * C++20 coroutines!
 * Modern Safe C++20 API
 * Higher level coroutine constructs
-    - coro::task
-    - coro::generator
-    - coro::event
-    - coro::latch
-    - coro::mutex
+    - [coro::task](#task)
+    - [coro::generator](#generator)
+    - [coro::event](#event)
+    - [coro::latch](#latch)
+    - [coro::mutex](#mutex)
+    - [coro::shared_mutex](#shared_mutex)
+    - [coro::semaphore](#semaphore)
+    - [coro::ring_buffer](#ring_buffer)
     - coro::sync_wait(awaitable)
     - coro::when_all(awaitable...) -> awaitable
 * Schedulers
-    - coro::thread_pool for coroutine cooperative multitasking
-    - coro::io_scheduler for driving i/o events, uses thread_pool for coroutine execution upon triggered events
-        - epoll driver
-        - io_uring driver (Future, will be required for async file i/o)
+    - [coro::thread_pool](#thread_pool) for coroutine cooperative multitasking
+    - [coro::io_scheduler](#io_scheduler) for driving i/o events, uses thread_pool for coroutine execution upon triggered events
+        - Currently uses an epoll driver
+    - [coro::task_container](#task_container) for dynamic task lifetimes
 * Coroutine Networking
     - coro::net::dns_resolver for async dns
         - Uses libc-ares
-    - coro::net::tcp_client
+    - [coro::net::tcp_client](#io_scheduler)
        - Supports SSL/TLS via OpenSSL
-    - coro::net::tcp_server
+    - [coro::net::tcp_server](#io_scheduler)
        - Supports SSL/TLS via OpenSSL
    - coro::net::udp_peer

@@ -40,7 +43,7 @@
 ### A note on co_await
 It's important to note with coroutines that, depending on the construct used, _any_ `co_await` has the potential to switch the thread that is executing the currently running coroutine. In general this shouldn't affect the way any user of the library would write code except for `thread_local`. `thread_local` should be used with extreme care and _never_ across any `co_await` boundary due to thread switching and work stealing on thread pools.

-### coro::task
+### task
 The `coro::task` is the main coroutine building block within `libcoro`. Use task to create your coroutines and `co_await` or `co_yield` tasks within tasks to perform asynchronous operations, lazy evaluation, or even spreading work out across a `coro::thread_pool`. Tasks are lightweight and only begin execution upon awaiting them. If their return type is not `void` then the value can be returned by const reference or by moving (r-value reference).

@@ -59,7 +62,7 @@ expensive_struct() move constructor called
 Answer to everything = 42
 ```

-### coro::generator
+### generator
 The `coro::generator` construct is a coroutine which can generate one or more values.

 ```C++
@@ -72,7 +75,7 @@ $ ./examples/coro_generator
 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100,
 ```

-### coro::event
+### event
 The `coro::event` is a thread safe async tool to have one or more waiters suspend until an event is set before proceeding. The current implementation of event will resume execution of all waiters on the thread that sets the event. If the event is already set when a waiter goes to wait, it will simply continue executing with no suspend or wait time incurred.

 ```C++
@@ -91,7 +94,7 @@ task 2 event triggered, now resuming.
 task 1 event triggered, now resuming.
 ```

-### coro::latch
+### latch
 The `coro::latch` is a thread safe async tool to have one waiter suspend until all outstanding events have completed before proceeding.

 ```C++
@@ -115,7 +118,7 @@ worker task 5 is done, counting down on the latch
 latch task dependency tasks completed, resuming.
 ```

-### coro::mutex
+### mutex
 The `coro::mutex` is a thread safe async tool to protect critical sections and only allow a single thread to execute the critical section at any given time. Mutexes that are uncontended are a simple CAS operation with a memory fence 'acquire' to behave similarly to `std::mutex`. If the lock is contended then the thread will add itself to a FIFO queue of waiters and yield execution to allow another coroutine to process on that thread while it waits to acquire the lock. It's important to note that upon releasing the mutex, that thread will immediately start processing the next waiter in line for the `coro::mutex`; the mutex is only unlocked/released once all waiters have been processed. This guarantees fair execution in a FIFO manner, but it also means all coroutines that stack up in the waiter queue will end up shifting to the single thread that is executing all of the waiting coroutines. It is possible to reschedule after the critical section onto a thread pool to redistribute the work. Perhaps an auto-reschedule onto a given thread pool is a good feature to implement in the future to prevent this behavior, so that the post-critical-section work in the coroutines is redistributed amongst all available thread pool threads.

 ```C++
@@ -131,7 +134,7 @@ $ ./examples/coro_mutex
 1, 2, 3, 4, 5, 6, 7, 8, 10, 9, 12, 11, 13, 14, 15, 16, 17, 18, 19, 21, 22, 20, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 47, 48, 49, 46, 50, 51, 52, 53, 54, 55, 57, 58, 59, 56, 60, 62, 61, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100,
 ```

-### coro::shared_mutex
+### shared_mutex
 The `coro::shared_mutex` is a thread safe async tool to allow for multiple shared users at once but also exclusive access. The lock is acquired strictly in a FIFO manner in that if the lock is currently held by shared users and an exclusive waiter attempts to lock, the exclusive waiter will suspend until all of the _current_ shared users finish using the lock. Any new users that attempt to lock the mutex in a shared state once there is an exclusive waiter will also wait behind the exclusive waiter. This prevents the exclusive waiter from being starved.
@@ -166,7 +169,7 @@ shared task 6 unlock_shared()
 ```

-### coro::semaphore
+### semaphore
 The `coro::semaphore` is a thread safe async tool to protect a limited number of resources by only allowing a set number of consumers to acquire the resources at any given time. The `coro::semaphore` also has a maximum number of resources denoted by its constructor. This means if a resource is produced or released when the semaphore is at its maximum resource availability then the release operation will wait for space to become available. This is useful for a ring buffer type situation where resources are produced and then consumed, but it has no effect on a semaphore's usage if there is a known quantity of resources to start with which are acquired and then released back.

 ```C++
@@ -179,10 +182,10 @@ $ ./examples/coro_semaphore
 1, 23, 25, 24, 22, 27, 28, 29, 21, 20, 19, 18, 17, 14, 31, 30, 33, 32, 41, 40, 37, 39, 38, 36, 35, 34, 43, 46, 47, 48, 45, 42, 44, 26, 16, 15, 13, 52, 54, 55, 53, 49, 51, 57, 58, 50, 62, 63, 61, 60, 59, 56, 12, 11, 8, 10, 9, 7, 6, 5, 4, 3, 642, , 66, 67, 6568, , 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100,
 ```

-### coro::ring_buffer
-The `coro::ring_buffer` is thread safe async multi-producer multi-consumer statically sized ring buffer. Producers will that try to produce a value when the ring buffer is full will suspend until space is available. Consumers that try to consume a value when the ring buffer is empty will suspend until space is available. All waiters on the ring buffer for producing or consuming are resumed in a LIFO manner when their respective operation becomes available.
+### ring_buffer
+The `coro::ring_buffer` is a thread safe async multi-producer multi-consumer statically sized ring buffer. Producers that try to produce a value when the ring buffer is full will suspend until space is available. Consumers that try to consume a value when the ring buffer is empty will suspend until an element is available. All waiters on the ring buffer for producing or consuming are resumed in a LIFO manner when their respective operation becomes available.

-The `coro::ring_buffer` also works with `coro::stop_signal` in that if the ring buffers `stop_
+The `coro::ring_buffer` also works with `coro::stop_signal` in that if the ring buffer's `stop_signal_notify_waiters()` function is called then any producers or consumers that are suspended and waiting will be awoken by throwing a `coro::stop_signal`. This can be useful for long running daemons that should always suspend when data cannot be produced or consumed, but need to break out of the suspend upon shutdown.

 ```C++
 ${EXAMPLE_CORO_RING_BUFFER_CPP}
 ```
@@ -199,7 +202,7 @@ consumer 2 shutting down, stop signal received
 consumer 3 shutting down, stop signal received
 ```

-### coro::thread_pool
+### thread_pool
 `coro::thread_pool` is a statically sized pool of worker threads to execute scheduled coroutines from a FIFO queue. To schedule a coroutine on a thread pool the pool's `schedule()` function should be `co_awaited` to transfer execution from the current thread to a thread pool worker thread. It's important to note that scheduling will first place the coroutine into the FIFO queue, where it will be picked up by the first available thread in the pool, e.g. there could be a delay if there is a lot of work queued up.

 ```C++
@@ -230,7 +233,7 @@ thread pool worker 3 is shutting down.
 thread pool worker 0 is shutting down.
 ```

-### coro::io_scheduler
+### io_scheduler
 `coro::io_scheduler` is an i/o event scheduler that uses a statically sized pool (`coro::thread_pool`) to process the events that are ready. The `coro::io_scheduler` can use a dedicated spawned thread for processing events that are ready, or it can be manually driven via its `process_events()` function for integration into existing event loops. If using the dedicated thread to process i/o events, the dedicated thread does not execute any of the tasks itself; it simply schedules them to be executed on the next available worker thread in its embedded `coro::thread_pool`. Inline execution of tasks on the i/o dedicated thread is not supported since it can introduce poor latency when an expensive task is executing.

 The example provided here shows an i/o scheduler that spins up a basic `coro::net::tcp_server` and a `coro::net::tcp_client` that will connect to each other and then send a request and a response.

@@ -252,7 +255,7 @@ io_scheduler::thread_pool worker 1 stopping
 io_scheduler::process event thread stop
 ```

-### coro::task_container
+### task_container
 `coro::task_container` is a special container type that will maintain the lifetime of tasks that do not have a known lifetime. This is extremely useful for tasks that hold open connections to clients and possibly process multiple requests from that client before shutting down. The task doesn't know how long it will be alive, but at some point in the future it will complete and need to have its resources cleaned up. The `coro::task_container` does this by wrapping the user's task into another coroutine task that will mark itself for deletion upon completing within the parent task container. The task container should then run garbage collection periodically, or by default when a new task is added, to prune completed tasks from the container.

 All tasks that are stored within a `coro::task_container` must have a `void` return type since their result cannot be accessed due to the task's lifetime being indeterminate.

diff --git a/README.md b/README.md
index 76e3c84..9a962bf 100644
--- a/README.md
+++ b/README.md
@@ -14,24 +14,27 @@
 * C++20 coroutines!
 * Modern Safe C++20 API
 * Higher level coroutine constructs
-    - coro::task
-    - coro::generator
-    - coro::event
-    - coro::latch
-    - coro::mutex
+    - [coro::task](#task)
+    - [coro::generator](#generator)
+    - [coro::event](#event)
+    - [coro::latch](#latch)
+    - [coro::mutex](#mutex)
+    - [coro::shared_mutex](#shared_mutex)
+    - [coro::semaphore](#semaphore)
+    - [coro::ring_buffer](#ring_buffer)
     - coro::sync_wait(awaitable)
     - coro::when_all(awaitable...) -> awaitable
 * Schedulers
-    - coro::thread_pool for coroutine cooperative multitasking
-    - coro::io_scheduler for driving i/o events, uses thread_pool for coroutine execution upon triggered events
-        - epoll driver
-        - io_uring driver (Future, will be required for async file i/o)
+    - [coro::thread_pool](#thread_pool) for coroutine cooperative multitasking
+    - [coro::io_scheduler](#io_scheduler) for driving i/o events, uses thread_pool for coroutine execution upon triggered events
+        - Currently uses an epoll driver
+    - [coro::task_container](#task_container) for dynamic task lifetimes
 * Coroutine Networking
     - coro::net::dns_resolver for async dns
         - Uses libc-ares
-    - coro::net::tcp_client
+    - [coro::net::tcp_client](#io_scheduler)
        - Supports SSL/TLS via OpenSSL
-    - coro::net::tcp_server
+    - [coro::net::tcp_server](#io_scheduler)
        - Supports SSL/TLS via OpenSSL
    - coro::net::udp_peer

@@ -40,7 +43,7 @@
 ### A note on co_await
 It's important to note with coroutines that, depending on the construct used, _any_ `co_await` has the potential to switch the thread that is executing the currently running coroutine. In general this shouldn't affect the way any user of the library would write code except for `thread_local`. `thread_local` should be used with extreme care and _never_ across any `co_await` boundary due to thread switching and work stealing on thread pools.

-### coro::task
+### task
 The `coro::task` is the main coroutine building block within `libcoro`. Use task to create your coroutines and `co_await` or `co_yield` tasks within tasks to perform asynchronous operations, lazy evaluation, or even spreading work out across a `coro::thread_pool`. Tasks are lightweight and only begin execution upon awaiting them. If their return type is not `void` then the value can be returned by const reference or by moving (r-value reference).

@@ -134,7 +137,7 @@ expensive_struct() move constructor called
 Answer to everything = 42
 ```

-### coro::generator
+### generator
 The `coro::generator` construct is a coroutine which can generate one or more values.

 ```C++
@@ -177,7 +180,7 @@ $ ./examples/coro_generator
 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100,
 ```

-### coro::event
+### event
 The `coro::event` is a thread safe async tool to have one or more waiters suspend until an event is set before proceeding. The current implementation of event will resume execution of all waiters on the thread that sets the event. If the event is already set when a waiter goes to wait, it will simply continue executing with no suspend or wait time incurred.

 ```C++
@@ -221,7 +224,7 @@ task 2 event triggered, now resuming.
 task 1 event triggered, now resuming.
 ```

-### coro::latch
+### latch
 The `coro::latch` is a thread safe async tool to have one waiter suspend until all outstanding events have completed before proceeding.

 ```C++
@@ -298,7 +301,7 @@ worker task 5 is done, counting down on the latch
 latch task dependency tasks completed, resuming.
 ```

-### coro::mutex
+### mutex
 The `coro::mutex` is a thread safe async tool to protect critical sections and only allow a single thread to execute the critical section at any given time. Mutexes that are uncontended are a simple CAS operation with a memory fence 'acquire' to behave similarly to `std::mutex`. If the lock is contended then the thread will add itself to a FIFO queue of waiters and yield execution to allow another coroutine to process on that thread while it waits to acquire the lock. It's important to note that upon releasing the mutex, that thread will immediately start processing the next waiter in line for the `coro::mutex`; the mutex is only unlocked/released once all waiters have been processed. This guarantees fair execution in a FIFO manner, but it also means all coroutines that stack up in the waiter queue will end up shifting to the single thread that is executing all of the waiting coroutines. It is possible to reschedule after the critical section onto a thread pool to redistribute the work. Perhaps an auto-reschedule onto a given thread pool is a good feature to implement in the future to prevent this behavior, so that the post-critical-section work in the coroutines is redistributed amongst all available thread pool threads.

 ```C++
@@ -351,7 +354,7 @@ $ ./examples/coro_mutex
 1, 2, 3, 4, 5, 6, 7, 8, 10, 9, 12, 11, 13, 14, 15, 16, 17, 18, 19, 21, 22, 20, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 47, 48, 49, 46, 50, 51, 52, 53, 54, 55, 57, 58, 59, 56, 60, 62, 61, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100,
 ```

-### coro::shared_mutex
+### shared_mutex
 The `coro::shared_mutex` is a thread safe async tool to allow for multiple shared users at once but also exclusive access. The lock is acquired strictly in a FIFO manner in that if the lock is currently held by shared users and an exclusive waiter attempts to lock, the exclusive waiter will suspend until all of the _current_ shared users finish using the lock. Any new users that attempt to lock the mutex in a shared state once there is an exclusive waiter will also wait behind the exclusive waiter. This prevents the exclusive waiter from being starved.

 ```C++
@@ -440,7 +443,7 @@ shared task 6 unlock_shared()
 ```

-### coro::semaphore
+### semaphore
 The `coro::semaphore` is a thread safe async tool to protect a limited number of resources by only allowing a set number of consumers to acquire the resources at any given time. The `coro::semaphore` also has a maximum number of resources denoted by its constructor. This means if a resource is produced or released when the semaphore is at its maximum resource availability then the release operation will wait for space to become available. This is useful for a ring buffer type situation where resources are produced and then consumed, but it has no effect on a semaphore's usage if there is a known quantity of resources to start with which are acquired and then released back.

 ```C++
@@ -481,10 +484,10 @@ $ ./examples/coro_semaphore
 1, 23, 25, 24, 22, 27, 28, 29, 21, 20, 19, 18, 17, 14, 31, 30, 33, 32, 41, 40, 37, 39, 38, 36, 35, 34, 43, 46, 47, 48, 45, 42, 44, 26, 16, 15, 13, 52, 54, 55, 53, 49, 51, 57, 58, 50, 62, 63, 61, 60, 59, 56, 12, 11, 8, 10, 9, 7, 6, 5, 4, 3, 642, , 66, 67, 6568, , 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100,
 ```

-### coro::ring_buffer
-The `coro::ring_buffer` is thread safe async multi-producer multi-consumer statically sized ring buffer. Producers will that try to produce a value when the ring buffer is full will suspend until space is available. Consumers that try to consume a value when the ring buffer is empty will suspend until space is available. All waiters on the ring buffer for producing or consuming are resumed in a LIFO manner when their respective operation becomes available.
+### ring_buffer
+The `coro::ring_buffer` is a thread safe async multi-producer multi-consumer statically sized ring buffer. Producers that try to produce a value when the ring buffer is full will suspend until space is available. Consumers that try to consume a value when the ring buffer is empty will suspend until an element is available. All waiters on the ring buffer for producing or consuming are resumed in a LIFO manner when their respective operation becomes available.

-The `coro::ring_buffer` also works with `coro::stop_signal` in that if the ring buffers `stop_
+The `coro::ring_buffer` also works with `coro::stop_signal` in that if the ring buffer's `stop_signal_notify_waiters()` function is called then any producers or consumers that are suspended and waiting will be awoken by throwing a `coro::stop_signal`. This can be useful for long running daemons that should always suspend when data cannot be produced or consumed, but need to break out of the suspend upon shutdown.

 ```C++
 #include <coro/coro.hpp>
@@ -520,7 +523,7 @@ int main()
             auto scoped_lock = co_await m.lock();
             std::cerr << "\nproducer is sending stop signal";
         }
-        rb.stop_signal_waiters();
+        rb.stop_signal_notify_waiters();
         co_return;
     };
@@ -574,7 +577,7 @@ consumer 2 shutting down, stop signal received
 consumer 3 shutting down, stop signal received
 ```

-### coro::thread_pool
+### thread_pool
 `coro::thread_pool` is a statically sized pool of worker threads to execute scheduled coroutines from a FIFO queue. To schedule a coroutine on a thread pool the pool's `schedule()` function should be `co_awaited` to transfer execution from the current thread to a thread pool worker thread. It's important to note that scheduling will first place the coroutine into the FIFO queue, where it will be picked up by the first available thread in the pool, e.g. there could be a delay if there is a lot of work queued up.

 ```C++
@@ -682,7 +685,7 @@ thread pool worker 3 is shutting down.
 thread pool worker 0 is shutting down.
 ```

-### coro::io_scheduler
+### io_scheduler
 `coro::io_scheduler` is an i/o event scheduler that uses a statically sized pool (`coro::thread_pool`) to process the events that are ready. The `coro::io_scheduler` can use a dedicated spawned thread for processing events that are ready, or it can be manually driven via its `process_events()` function for integration into existing event loops. If using the dedicated thread to process i/o events, the dedicated thread does not execute any of the tasks itself; it simply schedules them to be executed on the next available worker thread in its embedded `coro::thread_pool`. Inline execution of tasks on the i/o dedicated thread is not supported since it can introduce poor latency when an expensive task is executing.

 The example provided here shows an i/o scheduler that spins up a basic `coro::net::tcp_server` and a `coro::net::tcp_client` that will connect to each other and then send a request and a response.

@@ -847,7 +850,7 @@ io_scheduler::thread_pool worker 1 stopping
 io_scheduler::process event thread stop
 ```

-### coro::task_container
+### task_container
 `coro::task_container` is a special container type that will maintain the lifetime of tasks that do not have a known lifetime. This is extremely useful for tasks that hold open connections to clients and possibly process multiple requests from that client before shutting down. The task doesn't know how long it will be alive, but at some point in the future it will complete and need to have its resources cleaned up. The `coro::task_container` does this by wrapping the user's task into another coroutine task that will mark itself for deletion upon completing within the parent task container. The task container should then run garbage collection periodically, or by default when a new task is added, to prune completed tasks from the container.

 All tasks that are stored within a `coro::task_container` must have a `void` return type since their result cannot be accessed due to the task's lifetime being indeterminate.

diff --git a/examples/coro_ring_buffer.cpp b/examples/coro_ring_buffer.cpp
index 0fb898a..f4f06bd 100644
--- a/examples/coro_ring_buffer.cpp
+++ b/examples/coro_ring_buffer.cpp
@@ -31,7 +31,7 @@ int main()
             auto scoped_lock = co_await m.lock();
             std::cerr << "\nproducer is sending stop signal";
         }
-        rb.stop_signal_waiters();
+        rb.stop_signal_notify_waiters();
         co_return;
     };

diff --git a/inc/coro/ring_buffer.hpp b/inc/coro/ring_buffer.hpp
index f4b8ad1..ab3d8de 100644
--- a/inc/coro/ring_buffer.hpp
+++ b/inc/coro/ring_buffer.hpp
@@ -34,7 +34,7 @@ public:
     ~ring_buffer()
     {
         // Wake up anyone still using the ring buffer.
-        stop_signal_waiters();
+        stop_signal_notify_waiters();
     }

     ring_buffer(const ring_buffer&) = delete;
@@ -183,7 +183,7 @@ public:
      * will throw a coro::stop_signal. Further produce()/consume() calls will always throw
      * a coro::stop_signal after this is called for this ring buffer.
      */
-    auto stop_signal_waiters() -> void
+    auto stop_signal_notify_waiters() -> void
    {
        // Only wake up waiters once.
        if (m_stopped.load(std::memory_order::acquire))

diff --git a/test/test_ring_buffer.cpp b/test/test_ring_buffer.cpp
index 7447e69..48d3ecd 100644
--- a/test/test_ring_buffer.cpp
+++ b/test/test_ring_buffer.cpp
@@ -70,7 +70,7 @@ TEST_CASE("ring_buffer many elements many producers many consumers", "[ring_buff
             co_await tp.yield();
         }

-        rb.stop_signal_waiters(); // signal to all consumers (or even producers) we are done/shutting down.
+        rb.stop_signal_notify_waiters(); // signal to all consumers (or even producers) we are done/shutting down.
         co_return;
     };
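
As a usage sketch for the renamed API: the example below shows the shutdown pattern this patch documents, a producer that finishes its work and then calls `stop_signal_notify_waiters()` so suspended consumers are woken with a thrown `coro::stop_signal`. This is a minimal sketch pieced together from the hunks above only; the `coro::ring_buffer<element, capacity>` template shape and the exact `produce()`/`consume()` signatures are assumptions and should be checked against `inc/coro/ring_buffer.hpp`.

```C++
#include <coro/coro.hpp>
#include <cstdint>
#include <iostream>

int main()
{
    // Assumed shape: element type + static capacity (the README calls the
    // ring buffer "statically sized"); verify against the header.
    coro::ring_buffer<uint64_t, 16> rb{};
    // Assumed options shape; check coro::thread_pool's constructor.
    coro::thread_pool tp{coro::thread_pool::options{.thread_count = 2}};

    auto make_producer = [&]() -> coro::task<void> {
        co_await tp.schedule();
        for (uint64_t i = 1; i <= 100; ++i)
        {
            co_await rb.produce(i); // Suspends while the ring buffer is full.
        }
        // Wake every suspended producer/consumer; per the header comment in
        // the hunk above, they observe this as a thrown coro::stop_signal,
        // and any further produce()/consume() calls will also throw.
        rb.stop_signal_notify_waiters();
        co_return;
    };

    auto make_consumer = [&]() -> coro::task<void> {
        co_await tp.schedule();
        try
        {
            while (true)
            {
                auto value = co_await rb.consume(); // Suspends while empty.
                std::cout << value << ", ";
            }
        }
        catch (const coro::stop_signal&)
        {
            // The long-running consume loop only exits via the stop signal,
            // mirroring the coro_ring_buffer example's shutdown output.
            std::cout << "\nconsumer shutting down, stop signal received\n";
        }
        co_return;
    };

    // Drive both tasks to completion from the non-coroutine main().
    coro::sync_wait(coro::when_all(make_producer(), make_consumer()));

    return 0;
}
```

Note that because the stop signal is delivered as soon as the producer finishes, a consumer may shut down before draining every buffered element; that matches the daemon-shutdown behavior described in the ring_buffer section rather than a graceful drain.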