mirror of
https://gitlab.com/niansa/libcrosscoro.git
synced 2025-03-06 20:53:32 +01:00
Backport bugs found from issue-72 (lockless thread_pool queue) (#75)
This commit is contained in:
parent
19d626c1fb
commit
8ecd35af40
6 changed files with 100 additions and 45 deletions
|
@ -1,5 +1,7 @@
|
|||
#pragma once
|
||||
|
||||
#include "coro/stop_signal.hpp"
|
||||
|
||||
#include <atomic>
|
||||
#include <coroutine>
|
||||
#include <mutex>
|
||||
|
@ -26,7 +28,7 @@ public:
|
|||
|
||||
auto await_ready() const noexcept -> bool;
|
||||
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool;
|
||||
auto await_resume() const noexcept -> bool;
|
||||
auto await_resume() const -> void;
|
||||
|
||||
private:
|
||||
friend semaphore;
|
||||
|
@ -41,6 +43,7 @@ public:
|
|||
/**
|
||||
* Acquires a resource from the semaphore, if the semaphore has no resources available then
|
||||
* this will wait until a resource becomes available.
|
||||
* @throws coro::stop_signal If the semaphore has been requested to stop.
|
||||
*/
|
||||
[[nodiscard]] auto acquire() -> acquire_operation { return acquire_operation{*this}; }
|
||||
|
||||
|
@ -64,7 +67,7 @@ public:
|
|||
* Stops the semaphore and will notify all release/acquire waiters to wake up in a failed state.
|
||||
* Once this is set it cannot be un-done and all future oprations on the semaphore will fail.
|
||||
*/
|
||||
auto stop_notify_all() noexcept -> void;
|
||||
auto stop_signal_notify_waiters() noexcept -> void;
|
||||
|
||||
private:
|
||||
friend class release_operation;
|
||||
|
@ -76,7 +79,7 @@ private:
|
|||
std::mutex m_waiter_mutex{};
|
||||
acquire_operation* m_acquire_waiters{nullptr};
|
||||
|
||||
bool m_notify_all_set{false};
|
||||
std::atomic<bool> m_notify_all_set{false};
|
||||
};
|
||||
|
||||
} // namespace coro
|
||||
|
|
|
@ -177,12 +177,6 @@ public:
|
|||
private:
|
||||
/// The configuration options.
|
||||
options m_opts;
|
||||
|
||||
protected:
|
||||
/// Has the thread pool been requested to shut down?
|
||||
std::atomic<bool> m_shutdown_requested{false};
|
||||
|
||||
private:
|
||||
/// The background executor threads.
|
||||
std::vector<std::jthread> m_threads;
|
||||
|
||||
|
@ -192,12 +186,6 @@ private:
|
|||
std::condition_variable_any m_wait_cv;
|
||||
/// FIFO queue of tasks waiting to be executed.
|
||||
std::deque<std::coroutine_handle<>> m_queue;
|
||||
|
||||
protected:
|
||||
/// The number of tasks in the queue + currently executing.
|
||||
std::atomic<std::size_t> m_size{0};
|
||||
|
||||
private:
|
||||
/**
|
||||
* Each background thread runs from this function.
|
||||
* @param stop_token Token which signals when shutdown() has been called.
|
||||
|
@ -211,6 +199,11 @@ private:
|
|||
auto schedule_impl(std::coroutine_handle<> handle) noexcept -> void;
|
||||
|
||||
protected:
|
||||
/// The number of tasks in the queue + currently executing.
|
||||
std::atomic<std::size_t> m_size{0};
|
||||
/// Has the thread pool been requested to shut down?
|
||||
std::atomic<bool> m_shutdown_requested{false};
|
||||
|
||||
/// Required to resume all waiters of the event onto a thread_pool.
|
||||
friend event;
|
||||
friend shared_mutex;
|
||||
|
|
|
@ -15,7 +15,7 @@ semaphore::semaphore(std::ptrdiff_t least_max_value, std::ptrdiff_t starting_val
|
|||
|
||||
semaphore::~semaphore()
|
||||
{
|
||||
stop_notify_all();
|
||||
stop_signal_notify_waiters();
|
||||
}
|
||||
|
||||
semaphore::acquire_operation::acquire_operation(semaphore& s) : m_semaphore(s)
|
||||
|
@ -24,13 +24,17 @@ semaphore::acquire_operation::acquire_operation(semaphore& s) : m_semaphore(s)
|
|||
|
||||
auto semaphore::acquire_operation::await_ready() const noexcept -> bool
|
||||
{
|
||||
if (m_semaphore.m_notify_all_set.load(std::memory_order::relaxed))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
return m_semaphore.try_acquire();
|
||||
}
|
||||
|
||||
auto semaphore::acquire_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
|
||||
{
|
||||
std::unique_lock lk{m_semaphore.m_waiter_mutex};
|
||||
if (m_semaphore.m_notify_all_set)
|
||||
if (m_semaphore.m_notify_all_set.load(std::memory_order::relaxed))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
@ -58,9 +62,12 @@ auto semaphore::acquire_operation::await_suspend(std::coroutine_handle<> awaitin
|
|||
return true;
|
||||
}
|
||||
|
||||
auto semaphore::acquire_operation::await_resume() const noexcept -> bool
|
||||
auto semaphore::acquire_operation::await_resume() const -> void
|
||||
{
|
||||
return !m_semaphore.m_notify_all_set;
|
||||
if (m_semaphore.m_notify_all_set.load(std::memory_order::relaxed))
|
||||
{
|
||||
throw coro::stop_signal{};
|
||||
}
|
||||
}
|
||||
|
||||
auto semaphore::release() -> void
|
||||
|
@ -100,8 +107,9 @@ auto semaphore::try_acquire() -> bool
|
|||
return true;
|
||||
}
|
||||
|
||||
auto semaphore::stop_notify_all() noexcept -> void
|
||||
auto semaphore::stop_signal_notify_waiters() noexcept -> void
|
||||
{
|
||||
m_notify_all_set.exchange(true, std::memory_order::release);
|
||||
while (true)
|
||||
{
|
||||
std::unique_lock lk{m_waiter_mutex};
|
||||
|
|
|
@ -184,6 +184,10 @@ auto shared_mutex::wake_waiters(std::unique_lock<std::mutex>& lk) -> void
|
|||
lock_operation* to_resume = m_head_waiter;
|
||||
m_head_waiter = m_head_waiter->m_next;
|
||||
--m_exclusive_waiters;
|
||||
if (m_head_waiter == nullptr)
|
||||
{
|
||||
m_tail_waiter = nullptr;
|
||||
}
|
||||
|
||||
// Since this is an exclusive lock waiting we can resume it directly.
|
||||
lk.unlock();
|
||||
|
@ -198,6 +202,10 @@ auto shared_mutex::wake_waiters(std::unique_lock<std::mutex>& lk) -> void
|
|||
{
|
||||
lock_operation* to_resume = m_head_waiter;
|
||||
m_head_waiter = m_head_waiter->m_next;
|
||||
if (m_head_waiter == nullptr)
|
||||
{
|
||||
m_tail_waiter = nullptr;
|
||||
}
|
||||
++m_shared_users;
|
||||
|
||||
m_thread_pool.resume(to_resume->m_awaiting_coroutine);
|
||||
|
|
|
@ -182,6 +182,41 @@ TEST_CASE("benchmark thread_pool{2} counter task", "[benchmark]")
|
|||
REQUIRE(tp.empty());
|
||||
}
|
||||
|
||||
TEST_CASE("benchmark thread_pool{N} counter task", "[benchmark]")
|
||||
{
|
||||
constexpr std::size_t iterations = default_iterations;
|
||||
|
||||
coro::thread_pool tp{};
|
||||
std::atomic<uint64_t> counter{0};
|
||||
|
||||
auto make_task = [](coro::thread_pool& tp, std::atomic<uint64_t>& c) -> coro::task<void> {
|
||||
co_await tp.schedule();
|
||||
c.fetch_add(1, std::memory_order::relaxed);
|
||||
co_return;
|
||||
};
|
||||
|
||||
std::vector<coro::task<void>> tasks;
|
||||
tasks.reserve(iterations);
|
||||
|
||||
auto start = sc::now();
|
||||
|
||||
for (std::size_t i = 0; i < iterations; ++i)
|
||||
{
|
||||
tasks.emplace_back(make_task(tp, counter));
|
||||
tasks.back().resume();
|
||||
}
|
||||
|
||||
// This will fail in valgrind since it runs in a single 'thread', and thus is shutsdown prior
|
||||
// to any coroutine actually getting properly scheduled onto the background thread pool.
|
||||
// Inject a sleep here so it forces a thread context switch within valgrind.
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds{10});
|
||||
tp.shutdown();
|
||||
|
||||
print_stats("benchmark thread_pool{N} counter task", iterations, start, sc::now());
|
||||
REQUIRE(counter == iterations);
|
||||
REQUIRE(tp.empty());
|
||||
}
|
||||
|
||||
TEST_CASE("benchmark counter task scheduler{1} yield", "[benchmark]")
|
||||
{
|
||||
constexpr std::size_t iterations = default_iterations;
|
||||
|
|
|
@ -111,7 +111,9 @@ TEST_CASE("semaphore ringbuffer", "[semaphore]")
|
|||
auto make_consumer_task = [&](uint64_t id) -> coro::task<void> {
|
||||
co_await tp.schedule();
|
||||
|
||||
while (value.load(std::memory_order::acquire) < iterations)
|
||||
try
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
std::cerr << "id = " << id << " waiting to acquire the semaphore\n";
|
||||
co_await s.acquire();
|
||||
|
@ -120,16 +122,19 @@ TEST_CASE("semaphore ringbuffer", "[semaphore]")
|
|||
value.fetch_add(1, std::memory_order::release);
|
||||
// In the ringbfuffer acquire is 'consuming', we never release back into the buffer
|
||||
}
|
||||
|
||||
}
|
||||
catch (const coro::stop_signal&)
|
||||
{
|
||||
std::cerr << "id = " << id << " exiting\n";
|
||||
s.stop_notify_all();
|
||||
}
|
||||
|
||||
co_return;
|
||||
};
|
||||
|
||||
auto make_producer_task = [&]() -> coro::task<void> {
|
||||
co_await tp.schedule();
|
||||
|
||||
while (value.load(std::memory_order::acquire) < iterations)
|
||||
for (size_t i = 2; i < iterations; ++i)
|
||||
{
|
||||
std::cerr << "producer: doing work\n";
|
||||
// Do some work...
|
||||
|
@ -141,7 +146,7 @@ TEST_CASE("semaphore ringbuffer", "[semaphore]")
|
|||
}
|
||||
|
||||
std::cerr << "producer exiting\n";
|
||||
s.stop_notify_all();
|
||||
s.stop_signal_notify_waiters();
|
||||
co_return;
|
||||
};
|
||||
|
||||
|
@ -157,32 +162,30 @@ TEST_CASE("semaphore ringbuffer many producers and consumers", "[semaphore]")
|
|||
{
|
||||
const std::size_t consumers = 16;
|
||||
const std::size_t producers = 1;
|
||||
const std::size_t iterations = 1'000'000;
|
||||
const std::size_t iterations = 100'000;
|
||||
|
||||
std::atomic<uint64_t> value{0};
|
||||
|
||||
coro::semaphore s{50, 0};
|
||||
|
||||
coro::thread_pool tp{}; // let er rip
|
||||
coro::io_scheduler tp{}; // let er rip
|
||||
|
||||
auto make_consumer_task = [&](uint64_t id) -> coro::task<void> {
|
||||
co_await tp.schedule();
|
||||
|
||||
while (value.load(std::memory_order::acquire) < iterations)
|
||||
try
|
||||
{
|
||||
auto success = co_await s.acquire();
|
||||
if (!success)
|
||||
while (true)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
co_await s.acquire();
|
||||
co_await tp.schedule();
|
||||
value.fetch_add(1, std::memory_order::relaxed);
|
||||
}
|
||||
|
||||
}
|
||||
catch (const coro::stop_signal&)
|
||||
{
|
||||
std::cerr << "consumer " << id << " exiting\n";
|
||||
|
||||
s.stop_notify_all();
|
||||
}
|
||||
|
||||
co_return;
|
||||
};
|
||||
|
@ -190,14 +193,19 @@ TEST_CASE("semaphore ringbuffer many producers and consumers", "[semaphore]")
|
|||
auto make_producer_task = [&](uint64_t id) -> coro::task<void> {
|
||||
co_await tp.schedule();
|
||||
|
||||
while (value.load(std::memory_order::acquire) < iterations)
|
||||
for (size_t i = 0; i < iterations; ++i)
|
||||
{
|
||||
s.release();
|
||||
}
|
||||
|
||||
while (value.load(std::memory_order::relaxed) < iterations)
|
||||
{
|
||||
co_await tp.yield_for(std::chrono::milliseconds{1});
|
||||
}
|
||||
|
||||
std::cerr << "producer " << id << " exiting\n";
|
||||
|
||||
s.stop_notify_all();
|
||||
s.stop_signal_notify_waiters();
|
||||
|
||||
co_return;
|
||||
};
|
||||
|
|
Loading…
Add table
Reference in a new issue