mirror of https://gitlab.com/niansa/libcrosscoro.git
synced 2025-03-06 20:53:32 +01:00

thread_pool allow functors to be executed standalone (#12)

* thread_pool allow functors to be executed standalone. Closes #11
* Fix wakeup issue for executors (whoops, deleted a !)

This commit is contained in:
parent 33df116b40
commit 2fb6624c48

8 changed files with 331 additions and 56 deletions
@@ -3,6 +3,7 @@
 #include <concepts>
 #include <coroutine>
 #include <type_traits>
+#include <utility>
 
 namespace coro
 {
@@ -43,7 +44,7 @@ struct awaitable_traits
 template<awaitable awaitable>
 static auto get_awaiter(awaitable&& value)
 {
-    return static_cast<awaitable&&>(value).operator co_await();
+    return std::forward<awaitable>(value).operator co_await();
 }
 
 template<awaitable awaitable>
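Note on the hunk above: for a forwarding reference, std::forward<awaitable>(value) produces exactly the same value category as static_cast<awaitable&&>(value), so the change is a pure readability cleanup, and it is what the new <utility> include is for. A minimal self-contained sketch of the pattern (my_awaitable is a hypothetical type, not from this repository):

    #include <coroutine>
    #include <utility>

    // Hypothetical awaitable, for illustration only.
    struct my_awaitable
    {
        struct awaiter
        {
            auto await_ready() noexcept -> bool { return true; }
            auto await_suspend(std::coroutine_handle<>) noexcept -> void {}
            auto await_resume() noexcept -> void {}
        };
        auto operator co_await() -> awaiter { return awaiter{}; }
    };

    template<typename awaitable_type>
    auto get_awaiter_sketch(awaitable_type&& value)
    {
        // std::forward<T>(value) collapses to the same reference category as
        // static_cast<T&&>(value): an lvalue argument stays an lvalue, an
        // rvalue argument stays an rvalue.
        return std::forward<awaitable_type>(value).operator co_await();
    }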
@@ -194,10 +194,17 @@ public:
         m_coroutine.promise().start(event);
     }
 
     // todo specialize for type void
-    auto return_value() -> return_type
+    auto return_value() -> decltype(auto)
     {
-        return m_coroutine.promise().return_value();
+        if constexpr (std::is_same_v<void, return_type>)
+        {
+            m_coroutine.promise().return_value();
+            return;
+        }
+        else
+        {
+            return m_coroutine.promise().return_value();
+        }
     }
 
 private:
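The decltype(auto) plus if constexpr split above is the standard trick for letting one accessor body serve both void and non-void return types: a plain return statement cannot carry a void expression's value, so the void instantiation returns nothing while the non-void instantiation returns the stored result. A standalone sketch of the pattern under assumed names (result_holder is not from this repository):

    #include <type_traits>

    template<typename return_type>
    struct result_holder
    {
        // Store a placeholder int when return_type is void so the member is well-formed.
        std::conditional_t<std::is_void_v<return_type>, int, return_type> m_value{};

        auto value() -> decltype(auto)
        {
            if constexpr (std::is_void_v<return_type>)
            {
                return; // this instantiation deduces void
            }
            else
            {
                return (m_value); // parenthesized, so decltype(auto) deduces a reference
            }
        }
    };

    // result_holder<int>{42}.value() yields 42; result_holder<void>{}.value() is a void expression.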
@@ -2,6 +2,7 @@
 
 #include <coroutine>
+#include <exception>
 #include <utility>
 
 namespace coro
 {
@@ -1,6 +1,7 @@
 #pragma once
 
 #include "coro/shutdown.hpp"
+#include "coro/task.hpp"
 
 #include <atomic>
 #include <vector>
@@ -10,32 +11,76 @@
 #include <coroutine>
 #include <deque>
 #include <optional>
 
-#include <iostream>
+#include <functional>
 
 namespace coro
 {
 
+class thread_pool;
+
+/**
+ * Creates a thread pool that executes arbitrary coroutine tasks in a FIFO scheduler policy.
+ * The thread pool by default will create an execution thread per available core on the system.
+ *
+ * When shutting down, either by the thread pool destructing or by manually calling shutdown(),
+ * the thread pool will stop accepting new tasks but will complete all tasks that were scheduled
+ * prior to the shutdown request.
+ */
 class thread_pool
 {
 public:
+    /**
+     * An operation is an awaitable type with a coroutine to resume the task scheduled on one of
+     * the executor threads.
+     */
     class operation
     {
         friend class thread_pool;
-    public:
+        /**
+         * Only the thread_pool can create operations when a task is being scheduled.
+         * @param tp The thread pool that created this operation.
+         */
         explicit operation(thread_pool& tp) noexcept;
 
     public:
+        /**
+         * Operations always pause so the executing thread can be switched.
+         */
         auto await_ready() noexcept -> bool { return false; }
 
+        /**
+         * Suspending always returns to the caller (using void return of await_suspend()) and
+         * stores the coroutine internally for the executing thread to resume from.
+         */
         auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void;
-        auto await_resume() noexcept -> void { /* no-op */ }
 
+        /**
+         * no-op as this is the function called first by the thread pool's executing thread.
+         */
+        auto await_resume() noexcept -> void { }
 
     private:
+        /// The thread pool that this operation will execute on.
         thread_pool& m_thread_pool;
+        /// The coroutine awaiting execution.
         std::coroutine_handle<> m_awaiting_coroutine{nullptr};
     };
 
-    explicit thread_pool(uint32_t thread_count = std::thread::hardware_concurrency());
+    struct options
+    {
+        /// The number of executor threads for this thread pool. Uses the hardware concurrency
+        /// value by default.
+        uint32_t thread_count = std::thread::hardware_concurrency();
+        /// Functor to call on each executor thread upon starting execution.
+        std::function<void(std::size_t)> on_thread_start_functor = nullptr;
+        /// Functor to call on each executor thread upon stopping execution.
+        std::function<void(std::size_t)> on_thread_stop_functor = nullptr;
+    };
+
+    /**
+     * @param opts Thread pool configuration options.
+     */
+    explicit thread_pool(options opts = options{
+        std::thread::hardware_concurrency(),
+        nullptr,
+        nullptr
+    });
 
     thread_pool(const thread_pool&) = delete;
     thread_pool(thread_pool&&) = delete;
@@ -44,30 +89,113 @@ public:
 
     ~thread_pool();
 
-    auto thread_count() const -> uint32_t { return m_threads.size(); }
+    auto thread_count() const noexcept -> uint32_t { return m_threads.size(); }
 
+    /**
+     * Schedules the currently executing coroutine to be run on this thread pool. This must be
+     * called from within the coroutine's function body to schedule the coroutine on the thread pool.
+     * @return The operation to switch from the calling scheduling thread to the executor thread
+     *         pool thread. This will return nullopt if the schedule fails; currently the only
+     *         way for this to fail is if `shutdown()` has been called.
+     */
+    [[nodiscard]]
     auto schedule() noexcept -> std::optional<operation>;
 
-    auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) -> void;
-
-    auto size() const -> std::size_t { return m_size.load(std::memory_order::relaxed); }
-    auto empty() const -> bool { return size() == 0; }
+    /**
+     * @throw std::runtime_error If the thread pool has been `shutdown()`, scheduling new tasks is not permitted.
+     * @param f The function to execute on the thread pool.
+     * @param args The arguments to call the functor with.
+     * @return A task that wraps the given functor to be executed on the thread pool.
+     */
+    template<typename functor, typename... arguments>
+    [[nodiscard]]
+    auto schedule(functor&& f, arguments... args) noexcept -> task<decltype(f(std::forward<arguments>(args)...))>
+    {
+        auto scheduled = schedule();
+        if(!scheduled.has_value())
+        {
+            throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
+        }
+
+        co_await scheduled.value();
+
+        if constexpr (std::is_same_v<void, decltype(f(std::forward<arguments>(args)...))>)
+        {
+            f(std::forward<arguments>(args)...);
+            co_return;
+        }
+        else
+        {
+            co_return f(std::forward<arguments>(args)...);
+        }
+    }
+
+    /**
+     * Shuts down the thread pool. This will finish any tasks scheduled prior to calling this
+     * function but will prevent the thread pool from scheduling any new tasks.
+     * @param wait_for_tasks Should this function block until all remaining scheduled tasks have
+     *        completed? Pass in sync to wait, or async to not block.
+     */
+    auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) noexcept -> void;
+
+    /**
+     * @return The number of tasks waiting in the task queue + the executing tasks.
+     */
+    auto size() const noexcept -> std::size_t { return m_size.load(std::memory_order::relaxed); }
+
+    /**
+     * @return True if the task queue is empty and zero tasks are currently executing.
+     */
+    auto empty() const noexcept -> bool { return size() == 0; }
+
+    /**
+     * @return The number of tasks waiting in the task queue to be executed.
+     */
+    auto queue_size() const noexcept -> std::size_t
+    {
+        // Might not be totally perfect but good enough, avoids acquiring the lock for now.
+        std::atomic_thread_fence(std::memory_order::acquire);
+        return m_queue.size();
+    }
+
+    /**
+     * @return True if the task queue is currently empty.
+     */
+    auto queue_empty() const noexcept -> bool { return queue_size() == 0; }
 
 private:
+    /// The configuration options.
+    options m_opts;
+
+    /// Has the thread pool been requested to shut down?
     std::atomic<bool> m_shutdown_requested{false};
 
-    std::vector<std::thread> m_threads;
+    /// The background executor threads.
+    std::vector<std::jthread> m_threads;
 
-    std::mutex m_queue_cv_mutex;
-    std::condition_variable m_queue_cv;
+    /// Mutex for executor threads to sleep on the condition variable.
+    std::mutex m_wait_mutex;
+    /// Condition variable for each executor thread to wait on when no tasks are available.
+    std::condition_variable_any m_wait_cv;
 
+    /// Mutex to guard the queue of FIFO tasks.
     std::mutex m_queue_mutex;
+    /// FIFO queue of tasks waiting to be executed.
     std::deque<operation*> m_queue;
 
+    /// The number of tasks in the queue + currently executing.
     std::atomic<std::size_t> m_size{0};
 
-    auto run(uint32_t worker_idx) -> void;
-    auto join() -> void;
-    auto schedule_impl(operation* op) -> void;
+    /**
+     * Each background thread runs from this function.
+     * @param stop_token Token which signals when shutdown() has been called.
+     * @param idx The executor's idx for internal data structure accesses.
+     */
+    auto executor(std::stop_token stop_token, std::size_t idx) -> void;
+
+    /**
+     * @param op Schedules the given operation to be executed upon the first available thread.
+     */
+    auto schedule_impl(operation* op) noexcept -> void;
 };
 
 } // namespace coro
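Taken together, the new header lets callers configure the pool and run plain functors on it. A hedged usage sketch (the include paths and coro::sync_wait are assumed from the tests elsewhere in this commit, not guaranteed by this hunk alone):

    #include "coro/sync_wait.hpp"
    #include "coro/thread_pool.hpp"

    #include <iostream>

    int main()
    {
        coro::thread_pool tp{coro::thread_pool::options{
            2,                                                                 // thread_count
            [](std::size_t idx) { std::cout << "worker " << idx << " up\n"; }, // on_thread_start_functor
            nullptr                                                            // on_thread_stop_functor
        }};

        // The new overload wraps a plain functor in a coro::task, co_awaits a
        // slot on the pool, runs the functor there, and co_returns its result.
        auto result = coro::sync_wait(tp.schedule([](int x) { return x * 2; }, 21));
        // result == 42

        tp.shutdown(); // completes already-scheduled work, then stops the executors
        return 0;
    }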
@@ -19,55 +19,68 @@ auto thread_pool::operation::await_suspend(std::coroutine_handle<> awaiting_coro
     // something else while this coroutine gets picked up by the thread pool.
 }
 
-thread_pool::thread_pool(uint32_t thread_count)
+thread_pool::thread_pool(options opts)
+    : m_opts(std::move(opts))
 {
-    m_threads.reserve(thread_count);
-    for(uint32_t i = 0; i < thread_count; ++i)
+    m_threads.reserve(m_opts.thread_count);
+
+    for(uint32_t i = 0; i < m_opts.thread_count; ++i)
     {
-        m_threads.emplace_back([this, i] { run(i); });
+        m_threads.emplace_back([this, i](std::stop_token st) { executor(std::move(st), i); });
     }
 }
 
 thread_pool::~thread_pool()
 {
     shutdown();
-    join();
+
+    // If shutdown was called manually by the user with shutdown_t::async then the background
+    // worker threads need to be joined upon the thread pool destruction.
 }
 
 auto thread_pool::schedule() noexcept -> std::optional<operation>
 {
     if(!m_shutdown_requested.load(std::memory_order::relaxed))
     {
-        m_size.fetch_add(1, std::memory_order::relaxed);
+        m_size.fetch_add(1, std::memory_order_relaxed);
         return {operation{*this}};
     }
 
     return std::nullopt;
 }
 
-auto thread_pool::shutdown(shutdown_t wait_for_tasks) -> void
+auto thread_pool::shutdown(shutdown_t wait_for_tasks) noexcept -> void
 {
     if (!m_shutdown_requested.exchange(true, std::memory_order::release))
     {
-        m_queue_cv.notify_all();
+        for(auto& thread : m_threads)
+        {
+            thread.request_stop();
+        }
+
         if(wait_for_tasks == shutdown_t::sync)
         {
-            join();
+            for(auto& thread : m_threads)
+            {
+                if(thread.joinable())
+                {
+                    thread.join();
+                }
+            }
         }
     }
 }
 
-auto thread_pool::run(uint32_t worker_idx) -> void
+auto thread_pool::executor(std::stop_token stop_token, std::size_t idx) -> void
 {
+    if(m_opts.on_thread_start_functor != nullptr)
+    {
+        m_opts.on_thread_start_functor(idx);
+    }
+
     while(true)
     {
         // Wait until the queue has operations to execute or shutdown has been requested.
         {
-            std::unique_lock<std::mutex> lk{m_queue_cv_mutex};
-            m_queue_cv.wait(lk, [this] { return !m_queue.empty() || m_shutdown_requested.load(std::memory_order::relaxed); });
+            std::unique_lock<std::mutex> lk{m_wait_mutex};
+            m_wait_cv.wait(lk, stop_token, [this] { return !m_queue.empty(); });
         }
 
         // Continue to pull operations from the global queue until it's empty.
@@ -90,34 +103,34 @@ auto thread_pool::run(uint32_t worker_idx) -> void
             if(op != nullptr && op->m_awaiting_coroutine != nullptr)
             {
                 op->m_awaiting_coroutine.resume();
-                m_size.fetch_sub(1, std::memory_order::relaxed);
+                m_size.fetch_sub(1, std::memory_order_relaxed);
             }
             else
            {
                 break;
             }
         }
 
-        if(m_shutdown_requested.load(std::memory_order::relaxed))
+        if(stop_token.stop_requested())
         {
             break; // while(true);
         }
     }
-}
 
-auto thread_pool::join() -> void
-{
-    for(auto& thread : m_threads)
+    if(m_opts.on_thread_stop_functor != nullptr)
     {
-        thread.join();
+        m_opts.on_thread_stop_functor(idx);
     }
-    m_threads.clear();
 }
 
-auto thread_pool::schedule_impl(operation* op) -> void
+auto thread_pool::schedule_impl(operation* op) noexcept -> void
 {
     {
         std::lock_guard<std::mutex> lk{m_queue_mutex};
         m_queue.emplace_back(op);
     }
 
-    m_queue_cv.notify_one();
+    m_wait_cv.notify_one();
 }
 
 } // namespace coro
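The executor rewrite relies on two C++20 facilities: std::jthread, whose destructor requests stop and joins, and the std::condition_variable_any::wait(lock, stop_token, predicate) overload, which wakes either when the predicate holds or when stop is requested. That is what lets shutdown() wake idle workers without keeping m_shutdown_requested in the wait predicate, and the restored ! in that predicate is the one the commit message apologizes for. A self-contained sketch of the mechanism, independent of the library:

    #include <condition_variable>
    #include <deque>
    #include <mutex>
    #include <stop_token>
    #include <thread>

    int main()
    {
        std::mutex mutex;
        std::condition_variable_any cv;
        std::deque<int> queue;

        std::jthread worker{[&](std::stop_token st) {
            while(true)
            {
                {
                    std::unique_lock<std::mutex> lk{mutex};
                    // Wakes when the queue becomes non-empty (note the '!') or
                    // when a stop is requested on this jthread's stop_token.
                    cv.wait(lk, st, [&] { return !queue.empty(); });
                    queue.clear(); // stand-in for draining and resuming operations
                }

                if(st.stop_requested())
                {
                    break;
                }
            }
        }};

        {
            std::lock_guard<std::mutex> lk{mutex};
            queue.push_back(1);
        }
        cv.notify_one();

        // jthread's destructor calls request_stop() and join(), waking the
        // worker out of cv.wait() even though the queue is empty.
        return 0;
    }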
@@ -90,6 +90,70 @@ TEST_CASE("benchmark counter func coro::sync_wait(coro::when_all_awaitable(await
     REQUIRE(counter == iterations);
 }
 
+TEST_CASE("benchmark thread_pool{1} counter task")
+{
+    constexpr std::size_t iterations = default_iterations;
+
+    coro::thread_pool tp{coro::thread_pool::options{1}};
+    std::atomic<uint64_t> counter{0};
+
+    auto make_task = [](coro::thread_pool& tp, std::atomic<uint64_t>& c) -> coro::task<void>
+    {
+        co_await tp.schedule().value();
+        c.fetch_add(1, std::memory_order::relaxed);
+        co_return;
+    };
+
+    std::vector<coro::task<void>> tasks;
+    tasks.reserve(iterations);
+
+    auto start = sc::now();
+
+    for (std::size_t i = 0; i < iterations; ++i)
+    {
+        tasks.emplace_back(make_task(tp, counter));
+        tasks.back().resume();
+    }
+
+    tp.shutdown();
+
+    print_stats("benchmark thread_pool{1} counter task", iterations, start, sc::now());
+    REQUIRE(counter == iterations);
+    REQUIRE(tp.empty());
+}
+
+TEST_CASE("benchmark thread_pool{2} counter task")
+{
+    constexpr std::size_t iterations = default_iterations;
+
+    coro::thread_pool tp{coro::thread_pool::options{2}};
+    std::atomic<uint64_t> counter{0};
+
+    auto make_task = [](coro::thread_pool& tp, std::atomic<uint64_t>& c) -> coro::task<void>
+    {
+        co_await tp.schedule().value();
+        c.fetch_add(1, std::memory_order::relaxed);
+        co_return;
+    };
+
+    std::vector<coro::task<void>> tasks;
+    tasks.reserve(iterations);
+
+    auto start = sc::now();
+
+    for (std::size_t i = 0; i < iterations; ++i)
+    {
+        tasks.emplace_back(make_task(tp, counter));
+        tasks.back().resume();
+    }
+
+    tp.shutdown();
+
+    print_stats("benchmark thread_pool{n} counter task", iterations, start, sc::now());
+    REQUIRE(counter == iterations);
+    REQUIRE(tp.empty());
+}
+
 TEST_CASE("benchmark counter task scheduler")
 {
     constexpr std::size_t iterations = default_iterations;
@@ -48,3 +48,14 @@ TEST_CASE("sync_wait task co_await single")
     auto output = coro::sync_wait(await_answer());
     REQUIRE(output == 1337);
 }
+
+TEST_CASE("sync_wait task that throws")
+{
+    auto f = []() -> coro::task<uint64_t>
+    {
+        throw std::runtime_error("I always throw!");
+        co_return 1;
+    };
+
+    REQUIRE_THROWS(coro::sync_wait(f()));
+}
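For the new test to pass, the coroutine promise has to capture the thrown exception and coro::sync_wait has to rethrow it on the waiting thread. That plumbing is not shown in this diff, but the conventional shape of it looks like this (a sketch, not the library's actual promise):

    #include <cstdint>
    #include <exception>

    // Typical promise fragment: unhandled_exception() stores the in-flight
    // exception, and the result accessor rethrows it for the waiting caller.
    struct promise_fragment
    {
        std::exception_ptr m_exception{nullptr};
        uint64_t m_value{0};

        auto unhandled_exception() noexcept -> void { m_exception = std::current_exception(); }

        auto result() -> uint64_t
        {
            if(m_exception != nullptr)
            {
                std::rethrow_exception(m_exception);
            }
            return m_value;
        }
    };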
@@ -4,9 +4,9 @@
 
 #include <iostream>
 
-TEST_CASE("thread_pool one worker, one task")
+TEST_CASE("thread_pool one worker one task")
 {
-    coro::thread_pool tp{1};
+    coro::thread_pool tp{coro::thread_pool::options{1}};
 
     auto func = [&tp]() -> coro::task<uint64_t>
     {
@@ -18,9 +18,9 @@ TEST_CASE("thread_pool one worker, one task")
     REQUIRE(result == 42);
 }
 
-TEST_CASE("thread_pool one worker, many tasks tuple")
+TEST_CASE("thread_pool one worker many tasks tuple")
 {
-    coro::thread_pool tp{1};
+    coro::thread_pool tp{coro::thread_pool::options{1}};
 
     auto f = [&tp]() -> coro::task<uint64_t>
     {
@@ -40,9 +40,9 @@ TEST_CASE("thread_pool one worker, many tasks tuple")
     REQUIRE(counter == 250);
 }
 
-TEST_CASE("thread_pool one worker, many tasks vector")
+TEST_CASE("thread_pool one worker many tasks vector")
 {
-    coro::thread_pool tp{1};
+    coro::thread_pool tp{coro::thread_pool::options{1}};
 
     auto f = [&tp]() -> coro::task<uint64_t>
     {
@@ -68,7 +68,7 @@ TEST_CASE("thread_pool one worker, many tasks vector")
     REQUIRE(counter == 150);
 }
 
-TEST_CASE("thread_pool N workers, 1 million tasks")
+TEST_CASE("thread_pool N workers 100k tasks")
 {
     constexpr const std::size_t iterations = 100'000;
     coro::thread_pool tp{};
@@ -98,9 +98,9 @@ TEST_CASE("thread_pool N workers, 1 million tasks")
     REQUIRE(counter == iterations);
 }
 
-TEST_CASE("thread pool 1 worker, task spawns another task")
+TEST_CASE("thread_pool 1 worker task spawns another task")
 {
-    coro::thread_pool tp{};
+    coro::thread_pool tp{coro::thread_pool::options{1}};
 
     auto f1 = [](coro::thread_pool& tp) -> coro::task<uint64_t>
     {
@@ -117,3 +117,53 @@ TEST_CASE("thread pool 1 worker, task spawns another task")
 
     REQUIRE(coro::sync_wait(f1(tp)) == 6);
 }
+
+TEST_CASE("thread_pool shutdown")
+{
+    coro::thread_pool tp{coro::thread_pool::options{1}};
+
+    auto f = [](coro::thread_pool& tp) -> coro::task<bool>
+    {
+        auto scheduled = tp.schedule();
+        if(!scheduled.has_value())
+        {
+            co_return true;
+        }
+
+        co_await scheduled.value();
+        co_return false;
+    };
+
+    tp.shutdown(coro::shutdown_t::async);
+
+    REQUIRE(coro::sync_wait(f(tp)) == true);
+}
+
+TEST_CASE("thread_pool schedule functor")
+{
+    coro::thread_pool tp{coro::thread_pool::options{1}};
+
+    auto f = []() -> uint64_t { return 1; };
+
+    auto result = coro::sync_wait(tp.schedule(f));
+    REQUIRE(result == 1);
+
+    tp.shutdown();
+
+    REQUIRE_THROWS(coro::sync_wait(tp.schedule(f)));
+}
+
+TEST_CASE("thread_pool schedule functor return_type = void")
+{
+    coro::thread_pool tp{coro::thread_pool::options{1}};
+
+    std::atomic<uint64_t> counter{0};
+    auto f = [](std::atomic<uint64_t>& c) -> void { c++; };
+
+    coro::sync_wait(tp.schedule(f, std::ref(counter)));
+    REQUIRE(counter == 1);
+
+    tp.shutdown();
+
+    REQUIRE_THROWS(coro::sync_wait(tp.schedule(f, std::ref(counter))));
+}