1
0
Fork 0
mirror of https://gitlab.com/niansa/libcrosscoro.git synced 2025-03-06 20:53:32 +01:00

Add coro::thread_pool (#10)

Closes #9
This commit is contained in:
Josh Baldwin 2020-10-26 11:51:24 -06:00 committed by GitHub
parent c548433dd9
commit 33df116b40
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 151 additions and 56 deletions

View file

@ -15,8 +15,8 @@ namespace coro
* await_resume() -> decltype(auto)
* Where the return type on await_resume is the requested return of the awaitable.
*/
template<typename T>
concept awaiter_type = requires(T t, std::coroutine_handle<> c)
template<typename type>
concept awaiter = requires(type t, std::coroutine_handle<> c)
{
{ t.await_ready() } -> std::same_as<bool>;
std::same_as<decltype(t.await_suspend(c)), void> ||
@ -28,30 +28,29 @@ concept awaiter_type = requires(T t, std::coroutine_handle<> c)
/**
* This concept declares a type that can be operator co_await()'ed and returns an awaiter_type.
*/
template<typename T>
concept awaitable_type = requires(T t)
template<typename type>
concept awaitable = requires(type t)
{
// operator co_await()
{ t.operator co_await() } -> awaiter_type;
{ t.operator co_await() } -> awaiter;
};
template<awaitable_type awaitable, typename = void>
template<awaitable awaitable, typename = void>
struct awaitable_traits
{
};
template<typename T>
static auto get_awaiter(T&& value)
template<awaitable awaitable>
static auto get_awaiter(awaitable&& value)
{
return static_cast<T&&>(value).operator co_await();
return static_cast<awaitable&&>(value).operator co_await();
}
template<awaitable_type awaitable>
template<awaitable awaitable>
struct awaitable_traits<awaitable>
{
using awaiter_t = decltype(get_awaiter(std::declval<awaitable>()));
using awaiter_return_t = decltype(std::declval<awaiter_t>().await_resume());
// using awaiter_return_decay_t = std::decay_t<decltype(std::declval<awaiter_t>().await_resume())>;
using awaiter_type = decltype(get_awaiter(std::declval<awaitable>()));
using awaiter_return_type = decltype(std::declval<awaiter_type>().await_resume());
};
} // namespace coro

View file

@ -7,15 +7,15 @@
namespace coro
{
template<typename T, typename return_type>
concept promise_type = requires(T t)
template<typename type, typename return_type>
concept promise_type = requires(type t)
{
{ t.get_return_object() } -> std::convertible_to<std::coroutine_handle<>>;
{ t.initial_suspend() } -> awaiter_type;
{ t.final_suspend() } -> awaiter_type;
{ t.yield_value() } -> awaitable_type;
{ t.initial_suspend() } -> awaiter;
{ t.final_suspend() } -> awaiter;
{ t.yield_value() } -> awaitable;
} &&
requires(T t, return_type return_value)
requires(type t, return_type return_value)
{
std::same_as<decltype(t.return_void()), void> ||
std::same_as<decltype(t.return_value(return_value)), void> ||

View file

@ -205,7 +205,7 @@ private:
};
template<awaitable_type awaitable, typename return_type = awaitable_traits<awaitable>::awaiter_return_t>
template<awaitable awaitable, typename return_type = awaitable_traits<awaitable>::awaiter_return_type>
static auto make_sync_wait_task(awaitable&& a) -> sync_wait_task<return_type>
{
if constexpr (std::is_void_v<return_type>)
@ -221,7 +221,7 @@ static auto make_sync_wait_task(awaitable&& a) -> sync_wait_task<return_type>
} // namespace detail
template<awaitable_type awaitable>
template<awaitable awaitable>
auto sync_wait(awaitable&& a) -> decltype(auto)
{
detail::sync_wait_event e{};

View file

@ -140,9 +140,9 @@ public:
task() noexcept : m_coroutine(nullptr) {}
task(coroutine_handle handle) : m_coroutine(handle) {}
explicit task(coroutine_handle handle) : m_coroutine(handle) {}
task(const task&) = delete;
task(task&& other) noexcept : m_coroutine(other.m_coroutine) { other.m_coroutine = nullptr; }
task(task&& other) noexcept : m_coroutine(std::exchange(other.m_coroutine, nullptr)) { }
~task()
{
@ -153,7 +153,8 @@ public:
}
auto operator=(const task&) -> task& = delete;
auto operator =(task&& other) noexcept -> task&
auto operator=(task&& other) noexcept -> task&
{
if (std::addressof(other) != this)
{
@ -162,8 +163,7 @@ public:
m_coroutine.destroy();
}
m_coroutine = other.m_coroutine;
other.m_coroutine = nullptr;
m_coroutine = std::exchange(other.m_coroutine, nullptr);
}
return *this;

View file

@ -27,9 +27,9 @@ public:
public:
explicit operation(thread_pool& tp) noexcept;
auto await_ready() noexcept -> bool { std::cerr << "thread_pool::operation::await_ready()\n"; return false; }
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool;
auto await_resume() noexcept -> void { std::cerr << "thread_pool::operation::await_resume()\n";/* no-op */ }
auto await_ready() noexcept -> bool { return false; }
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void;
auto await_resume() noexcept -> void { /* no-op */ }
private:
thread_pool& m_thread_pool;
std::coroutine_handle<> m_awaiting_coroutine{nullptr};

View file

@ -497,7 +497,7 @@ private:
coroutine_handle_type m_coroutine;
};
template<awaitable_type awaitable, typename return_type = awaitable_traits<awaitable&&>::awaiter_return_t>
template<awaitable awaitable, typename return_type = awaitable_traits<awaitable&&>::awaiter_return_type>
static auto make_when_all_task(awaitable a) -> when_all_task<return_type>
{
if constexpr (std::is_void_v<return_type>)
@ -513,21 +513,21 @@ static auto make_when_all_task(awaitable a) -> when_all_task<return_type>
} // namespace detail
template<awaitable_type... awaitables_type>
template<awaitable... awaitables_type>
[[nodiscard]] auto when_all_awaitable(awaitables_type&&... awaitables)
{
return
detail::when_all_ready_awaitable<
std::tuple<
detail::when_all_task<
typename awaitable_traits<awaitables_type>::awaiter_return_t
typename awaitable_traits<awaitables_type>::awaiter_return_type
>...
>
>(std::make_tuple(detail::make_when_all_task(std::forward<awaitables_type>(awaitables))...));
}
template<awaitable_type awaitable, typename return_type = awaitable_traits<awaitable>::awaiter_return_t>
[[nodiscard]] auto when_all_awaitable(std::vector<awaitable>&& awaitables) -> detail::when_all_ready_awaitable<std::vector<detail::when_all_task<return_type>>>
template<awaitable awaitable, typename return_type = awaitable_traits<awaitable>::awaiter_return_type>
[[nodiscard]] auto when_all_awaitable(std::vector<awaitable>& awaitables) -> detail::when_all_ready_awaitable<std::vector<detail::when_all_task<return_type>>>
{
std::vector<detail::when_all_task<return_type>> tasks;
tasks.reserve(std::size(awaitables));

View file

@ -9,12 +9,14 @@ thread_pool::operation::operation(thread_pool& tp) noexcept
}
auto thread_pool::operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
auto thread_pool::operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
std::cerr << "thread_pool::operation::await_suspend()\n";
m_awaiting_coroutine = awaiting_coroutine;
m_thread_pool.schedule_impl(this);
return false;
// void return on await_suspend suspends the _this_ coroutine, which is now scheduled on the
// thread pool and returns control to the caller. They could be sync_wait'ing or go do
// something else while this coroutine gets picked up by the thread pool.
}
thread_pool::thread_pool(uint32_t thread_count)
@ -37,7 +39,6 @@ thread_pool::~thread_pool()
auto thread_pool::schedule() noexcept -> std::optional<operation>
{
std::cerr << "thread_pool::schedule()\n";
if(!m_shutdown_requested.load(std::memory_order::relaxed))
{
m_size.fetch_add(1, std::memory_order::relaxed);
@ -77,7 +78,6 @@ auto thread_pool::run(uint32_t worker_idx) -> void
std::lock_guard<std::mutex> lk{m_queue_mutex};
if(!m_queue.empty())
{
std::cerr << "thread_pool::run m_queue.pop_front()\n";
op = m_queue.front();
m_queue.pop_front();
}
@ -112,7 +112,6 @@ auto thread_pool::join() -> void
auto thread_pool::schedule_impl(operation* op) -> void
{
std::cerr << "thread_pool::schedule_impl()\n";
{
std::lock_guard<std::mutex> lk{m_queue_mutex};
m_queue.emplace_back(op);

View file

@ -4,19 +4,116 @@
#include <iostream>
// TEST_CASE("thread_pool one worker, one task")
// {
// coro::thread_pool tp{1};
TEST_CASE("thread_pool one worker, one task")
{
coro::thread_pool tp{1};
// auto func = [&tp]() -> coro::task<uint64_t>
// {
// std::cerr << "func()\n";
// co_await tp.schedule().value(); // Schedule this coroutine on the scheduler.
// std::cerr << "func co_return 42\n";
// co_return 42;
// };
auto func = [&tp]() -> coro::task<uint64_t>
{
co_await tp.schedule().value(); // Schedule this coroutine on the scheduler.
co_return 42;
};
// std::cerr << "coro::sync_wait(func()) start\n";
// coro::sync_wait(func());
// std::cerr << "coro::sync_wait(func()) end\n";
// }
auto result = coro::sync_wait(func());
REQUIRE(result == 42);
}
TEST_CASE("thread_pool one worker, many tasks tuple")
{
coro::thread_pool tp{1};
auto f = [&tp]() -> coro::task<uint64_t>
{
co_await tp.schedule().value(); // Schedule this coroutine on the scheduler.
co_return 50;
};
auto tasks = coro::sync_wait(coro::when_all_awaitable(f(), f(), f(), f(), f()));
REQUIRE(std::tuple_size<decltype(tasks)>() == 5);
uint64_t counter{0};
std::apply([&counter](auto&&... t) -> void {
((counter += t.return_value()), ...);
},
tasks);
REQUIRE(counter == 250);
}
TEST_CASE("thread_pool one worker, many tasks vector")
{
coro::thread_pool tp{1};
auto f = [&tp]() -> coro::task<uint64_t>
{
co_await tp.schedule().value(); // Schedule this coroutine on the scheduler.
co_return 50;
};
std::vector<coro::task<uint64_t>> input_tasks;
input_tasks.emplace_back(f());
input_tasks.emplace_back(f());
input_tasks.emplace_back(f());
auto output_tasks = coro::sync_wait(coro::when_all_awaitable(input_tasks));
REQUIRE(output_tasks.size() == 3);
uint64_t counter{0};
for(const auto& task : output_tasks)
{
counter += task.return_value();
}
REQUIRE(counter == 150);
}
TEST_CASE("thread_pool N workers, 1 million tasks")
{
constexpr const std::size_t iterations = 100'000;
coro::thread_pool tp{};
auto make_task = [](coro::thread_pool& tp) -> coro::task<uint64_t>
{
co_await tp.schedule().value();
co_return 1;
};
std::vector<coro::task<uint64_t>> input_tasks{};
input_tasks.reserve(iterations);
for(std::size_t i = 0; i < iterations; ++i)
{
input_tasks.emplace_back(make_task(tp));
}
auto output_tasks = coro::sync_wait(coro::when_all_awaitable(input_tasks));
REQUIRE(output_tasks.size() == iterations);
uint64_t counter{0};
for(const auto& task : output_tasks)
{
counter += task.return_value();
}
REQUIRE(counter == iterations);
}
TEST_CASE("thread pool 1 worker, task spawns another task")
{
coro::thread_pool tp{};
auto f1 = [](coro::thread_pool& tp) -> coro::task<uint64_t>
{
co_await tp.schedule().value();
auto f2 = [](coro::thread_pool& tp) -> coro::task<uint64_t>
{
co_await tp.schedule().value();
co_return 5;
};
co_return 1 + co_await f2(tp);
};
REQUIRE(coro::sync_wait(f1(tp)) == 6);
}

View file

@ -49,7 +49,7 @@ TEST_CASE("when_all_awaitable single task with vector container")
std::vector<coro::task<uint64_t>> input_tasks;
input_tasks.emplace_back(make_task(100));
auto output_tasks = coro::sync_wait(coro::when_all_awaitable(std::move(input_tasks)));
auto output_tasks = coro::sync_wait(coro::when_all_awaitable(input_tasks));
REQUIRE(output_tasks.size() == 1);
uint64_t counter{0};
@ -73,7 +73,7 @@ TEST_CASE("when_all_ready multple task withs vector container")
input_tasks.emplace_back(make_task(550));
input_tasks.emplace_back(make_task(1000));
auto output_tasks = coro::sync_wait(coro::when_all_awaitable(std::move(input_tasks)));
auto output_tasks = coro::sync_wait(coro::when_all_awaitable(input_tasks));
REQUIRE(output_tasks.size() == 4);
uint64_t counter{0};