From 8ecd35af404acb9f335695fed8ae01e006167659 Mon Sep 17 00:00:00 2001 From: Josh Baldwin Date: Sun, 7 Mar 2021 11:08:31 -0700 Subject: [PATCH] Backport bugs found from issue-72 (lockless thread_pool queue) (#75) --- inc/coro/semaphore.hpp | 9 ++++--- inc/coro/thread_pool.hpp | 17 ++++-------- src/semaphore.cpp | 18 +++++++++---- src/shared_mutex.cpp | 8 ++++++ test/bench.cpp | 35 ++++++++++++++++++++++++ test/test_semaphore.cpp | 58 +++++++++++++++++++++++----------------- 6 files changed, 100 insertions(+), 45 deletions(-) diff --git a/inc/coro/semaphore.hpp b/inc/coro/semaphore.hpp index 061dc1d..5455591 100644 --- a/inc/coro/semaphore.hpp +++ b/inc/coro/semaphore.hpp @@ -1,5 +1,7 @@ #pragma once +#include "coro/stop_signal.hpp" + #include #include #include @@ -26,7 +28,7 @@ public: auto await_ready() const noexcept -> bool; auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool; - auto await_resume() const noexcept -> bool; + auto await_resume() const -> void; private: friend semaphore; @@ -41,6 +43,7 @@ public: /** * Acquires a resource from the semaphore, if the semaphore has no resources available then * this will wait until a resource becomes available. + * @throws coro::stop_signal If the semaphore has been requested to stop. */ [[nodiscard]] auto acquire() -> acquire_operation { return acquire_operation{*this}; } @@ -64,7 +67,7 @@ public: * Stops the semaphore and will notify all release/acquire waiters to wake up in a failed state. * Once this is set it cannot be un-done and all future oprations on the semaphore will fail. 
*/ - auto stop_notify_all() noexcept -> void; + auto stop_signal_notify_waiters() noexcept -> void; private: friend class release_operation; @@ -76,7 +79,7 @@ private: std::mutex m_waiter_mutex{}; acquire_operation* m_acquire_waiters{nullptr}; - bool m_notify_all_set{false}; + std::atomic m_notify_all_set{false}; }; } // namespace coro diff --git a/inc/coro/thread_pool.hpp b/inc/coro/thread_pool.hpp index 55d10e7..8ee9b81 100644 --- a/inc/coro/thread_pool.hpp +++ b/inc/coro/thread_pool.hpp @@ -177,12 +177,6 @@ public: private: /// The configuration options. options m_opts; - -protected: - /// Has the thread pool been requested to shut down? - std::atomic m_shutdown_requested{false}; - -private: /// The background executor threads. std::vector m_threads; @@ -192,12 +186,6 @@ private: std::condition_variable_any m_wait_cv; /// FIFO queue of tasks waiting to be executed. std::deque> m_queue; - -protected: - /// The number of tasks in the queue + currently executing. - std::atomic m_size{0}; - -private: /** * Each background thread runs from this function. * @param stop_token Token which signals when shutdown() has been called. @@ -211,6 +199,11 @@ private: auto schedule_impl(std::coroutine_handle<> handle) noexcept -> void; protected: + /// The number of tasks in the queue + currently executing. + std::atomic m_size{0}; + /// Has the thread pool been requested to shut down? + std::atomic m_shutdown_requested{false}; + /// Required to resume all waiters of the event onto a thread_pool. 
friend event; friend shared_mutex; diff --git a/src/semaphore.cpp b/src/semaphore.cpp index 7b3bee6..6275859 100644 --- a/src/semaphore.cpp +++ b/src/semaphore.cpp @@ -15,7 +15,7 @@ semaphore::semaphore(std::ptrdiff_t least_max_value, std::ptrdiff_t starting_val semaphore::~semaphore() { - stop_notify_all(); + stop_signal_notify_waiters(); } semaphore::acquire_operation::acquire_operation(semaphore& s) : m_semaphore(s) @@ -24,13 +24,17 @@ semaphore::acquire_operation::acquire_operation(semaphore& s) : m_semaphore(s) auto semaphore::acquire_operation::await_ready() const noexcept -> bool { + if (m_semaphore.m_notify_all_set.load(std::memory_order::relaxed)) + { + return true; + } return m_semaphore.try_acquire(); } auto semaphore::acquire_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool { std::unique_lock lk{m_semaphore.m_waiter_mutex}; - if (m_semaphore.m_notify_all_set) + if (m_semaphore.m_notify_all_set.load(std::memory_order::relaxed)) { return false; } @@ -58,9 +62,12 @@ auto semaphore::acquire_operation::await_suspend(std::coroutine_handle<> awaitin return true; } -auto semaphore::acquire_operation::await_resume() const noexcept -> bool +auto semaphore::acquire_operation::await_resume() const -> void { - return !m_semaphore.m_notify_all_set; + if (m_semaphore.m_notify_all_set.load(std::memory_order::relaxed)) + { + throw coro::stop_signal{}; + } } auto semaphore::release() -> void @@ -100,8 +107,9 @@ auto semaphore::try_acquire() -> bool return true; } -auto semaphore::stop_notify_all() noexcept -> void +auto semaphore::stop_signal_notify_waiters() noexcept -> void { + m_notify_all_set.exchange(true, std::memory_order::release); while (true) { std::unique_lock lk{m_waiter_mutex}; diff --git a/src/shared_mutex.cpp b/src/shared_mutex.cpp index 6e15116..453d7cd 100644 --- a/src/shared_mutex.cpp +++ b/src/shared_mutex.cpp @@ -184,6 +184,10 @@ auto shared_mutex::wake_waiters(std::unique_lock& lk) -> void lock_operation* 
to_resume = m_head_waiter; m_head_waiter = m_head_waiter->m_next; --m_exclusive_waiters; + if (m_head_waiter == nullptr) + { + m_tail_waiter = nullptr; + } // Since this is an exclusive lock waiting we can resume it directly. lk.unlock(); @@ -198,6 +202,10 @@ auto shared_mutex::wake_waiters(std::unique_lock& lk) -> void { lock_operation* to_resume = m_head_waiter; m_head_waiter = m_head_waiter->m_next; + if (m_head_waiter == nullptr) + { + m_tail_waiter = nullptr; + } ++m_shared_users; m_thread_pool.resume(to_resume->m_awaiting_coroutine); diff --git a/test/bench.cpp b/test/bench.cpp index b4dcd58..d396a81 100644 --- a/test/bench.cpp +++ b/test/bench.cpp @@ -182,6 +182,41 @@ TEST_CASE("benchmark thread_pool{2} counter task", "[benchmark]") REQUIRE(tp.empty()); } +TEST_CASE("benchmark thread_pool{N} counter task", "[benchmark]") +{ + constexpr std::size_t iterations = default_iterations; + + coro::thread_pool tp{}; + std::atomic counter{0}; + + auto make_task = [](coro::thread_pool& tp, std::atomic& c) -> coro::task { + co_await tp.schedule(); + c.fetch_add(1, std::memory_order::relaxed); + co_return; + }; + + std::vector> tasks; + tasks.reserve(iterations); + + auto start = sc::now(); + + for (std::size_t i = 0; i < iterations; ++i) + { + tasks.emplace_back(make_task(tp, counter)); + tasks.back().resume(); + } + + // This will fail in valgrind since it runs in a single 'thread', and thus is shut down prior + // to any coroutine actually getting properly scheduled onto the background thread pool. + // Inject a sleep here so it forces a thread context switch within valgrind. 
+ std::this_thread::sleep_for(std::chrono::milliseconds{10}); + tp.shutdown(); + + print_stats("benchmark thread_pool{N} counter task", iterations, start, sc::now()); + REQUIRE(counter == iterations); + REQUIRE(tp.empty()); +} + TEST_CASE("benchmark counter task scheduler{1} yield", "[benchmark]") { constexpr std::size_t iterations = default_iterations; diff --git a/test/test_semaphore.cpp b/test/test_semaphore.cpp index 19a76ba..5e1b124 100644 --- a/test/test_semaphore.cpp +++ b/test/test_semaphore.cpp @@ -111,25 +111,30 @@ TEST_CASE("semaphore ringbuffer", "[semaphore]") auto make_consumer_task = [&](uint64_t id) -> coro::task { co_await tp.schedule(); - while (value.load(std::memory_order::acquire) < iterations) + try { - std::cerr << "id = " << id << " waiting to acquire the semaphore\n"; - co_await s.acquire(); - std::cerr << "id = " << id << " semaphore acquired, consuming value\n"; + while (true) + { + std::cerr << "id = " << id << " waiting to acquire the semaphore\n"; + co_await s.acquire(); + std::cerr << "id = " << id << " semaphore acquired, consuming value\n"; - value.fetch_add(1, std::memory_order::release); - // In the ringbfuffer acquire is 'consuming', we never release back into the buffer + value.fetch_add(1, std::memory_order::release); + // In the ringbuffer acquire is 'consuming', we never release back into the buffer + } + } + catch (const coro::stop_signal&) + { + std::cerr << "id = " << id << " exiting\n"; } - std::cerr << "id = " << id << " exiting\n"; - s.stop_notify_all(); co_return; }; auto make_producer_task = [&]() -> coro::task { co_await tp.schedule(); - while (value.load(std::memory_order::acquire) < iterations) + for (size_t i = 2; i < iterations; ++i) { std::cerr << "producer: doing work\n"; // Do some work... 
@@ -141,7 +146,7 @@ TEST_CASE("semaphore ringbuffer", "[semaphore]") } std::cerr << "producer exiting\n"; - s.stop_notify_all(); + s.stop_signal_notify_waiters(); co_return; }; @@ -157,32 +162,30 @@ TEST_CASE("semaphore ringbuffer many producers and consumers", "[semaphore]") { const std::size_t consumers = 16; const std::size_t producers = 1; - const std::size_t iterations = 1'000'000; + const std::size_t iterations = 100'000; std::atomic value{0}; coro::semaphore s{50, 0}; - coro::thread_pool tp{}; // let er rip + coro::io_scheduler tp{}; // let er rip auto make_consumer_task = [&](uint64_t id) -> coro::task { co_await tp.schedule(); - while (value.load(std::memory_order::acquire) < iterations) + try { - auto success = co_await s.acquire(); - if (!success) + while (true) { - break; + co_await s.acquire(); + co_await tp.schedule(); + value.fetch_add(1, std::memory_order::relaxed); } - - co_await tp.schedule(); - value.fetch_add(1, std::memory_order::relaxed); } - - std::cerr << "consumer " << id << " exiting\n"; - - s.stop_notify_all(); + catch (const coro::stop_signal&) + { + std::cerr << "consumer " << id << " exiting\n"; + } co_return; }; @@ -190,14 +193,19 @@ TEST_CASE("semaphore ringbuffer many producers and consumers", "[semaphore]") auto make_producer_task = [&](uint64_t id) -> coro::task { co_await tp.schedule(); - while (value.load(std::memory_order::acquire) < iterations) + for (size_t i = 0; i < iterations; ++i) { s.release(); } + while (value.load(std::memory_order::relaxed) < iterations) + { + co_await tp.yield_for(std::chrono::milliseconds{1}); + } + std::cerr << "producer " << id << " exiting\n"; - s.stop_notify_all(); + s.stop_signal_notify_waiters(); co_return; };