mirror of
https://gitlab.com/niansa/libcrosscoro.git
synced 2025-03-06 20:53:32 +01:00
Scheduler now correctly co_await's the user tasks from cleanup task (#14)
Previously it set the continuation manually, which sort of works but is not canonical.
This commit is contained in:
parent
2fb6624c48
commit
76b41a6ca0
16 changed files with 277 additions and 405 deletions
|
@ -7,7 +7,6 @@
|
||||||
|
|
||||||
namespace coro
|
namespace coro
|
||||||
{
|
{
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This concept declares a type that is required to meet the c++20 coroutine operator co_await()
|
* This concept declares a type that is required to meet the c++20 coroutine operator co_await()
|
||||||
* retun type. It requires the following three member functions:
|
* retun type. It requires the following three member functions:
|
||||||
|
@ -19,11 +18,13 @@ namespace coro
|
||||||
template<typename type>
|
template<typename type>
|
||||||
concept awaiter = requires(type t, std::coroutine_handle<> c)
|
concept awaiter = requires(type t, std::coroutine_handle<> c)
|
||||||
{
|
{
|
||||||
{ t.await_ready() } -> std::same_as<bool>;
|
{
|
||||||
std::same_as<decltype(t.await_suspend(c)), void> ||
|
t.await_ready()
|
||||||
std::same_as<decltype(t.await_suspend(c)), bool> ||
|
}
|
||||||
|
->std::same_as<bool>;
|
||||||
|
std::same_as<decltype(t.await_suspend(c)), void> || std::same_as<decltype(t.await_suspend(c)), bool> ||
|
||||||
std::same_as<decltype(t.await_suspend(c)), std::coroutine_handle<>>;
|
std::same_as<decltype(t.await_suspend(c)), std::coroutine_handle<>>;
|
||||||
{ t.await_resume() };
|
{t.await_resume()};
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -33,7 +34,10 @@ template<typename type>
|
||||||
concept awaitable = requires(type t)
|
concept awaitable = requires(type t)
|
||||||
{
|
{
|
||||||
// operator co_await()
|
// operator co_await()
|
||||||
{ t.operator co_await() } -> awaiter;
|
{
|
||||||
|
t.operator co_await()
|
||||||
|
}
|
||||||
|
->awaiter;
|
||||||
};
|
};
|
||||||
|
|
||||||
template<awaitable awaitable, typename = void>
|
template<awaitable awaitable, typename = void>
|
||||||
|
@ -50,7 +54,7 @@ static auto get_awaiter(awaitable&& value)
|
||||||
template<awaitable awaitable>
|
template<awaitable awaitable>
|
||||||
struct awaitable_traits<awaitable>
|
struct awaitable_traits<awaitable>
|
||||||
{
|
{
|
||||||
using awaiter_type = decltype(get_awaiter(std::declval<awaitable>()));
|
using awaiter_type = decltype(get_awaiter(std::declval<awaitable>()));
|
||||||
using awaiter_return_type = decltype(std::declval<awaiter_type>().await_resume());
|
using awaiter_return_type = decltype(std::declval<awaiter_type>().await_resume());
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,8 @@
|
||||||
|
|
||||||
namespace coro::detail
|
namespace coro::detail
|
||||||
{
|
{
|
||||||
|
struct void_value
|
||||||
|
{
|
||||||
|
};
|
||||||
|
|
||||||
struct void_value{};
|
} // namespace coro::detail
|
||||||
|
|
||||||
} // coro::detail
|
|
||||||
|
|
|
@ -6,20 +6,33 @@
|
||||||
|
|
||||||
namespace coro
|
namespace coro
|
||||||
{
|
{
|
||||||
|
|
||||||
template<typename type, typename return_type>
|
template<typename type, typename return_type>
|
||||||
concept promise_type = requires(type t)
|
concept promise_type = requires(type t)
|
||||||
{
|
{
|
||||||
{ t.get_return_object() } -> std::convertible_to<std::coroutine_handle<>>;
|
{
|
||||||
{ t.initial_suspend() } -> awaiter;
|
t.get_return_object()
|
||||||
{ t.final_suspend() } -> awaiter;
|
}
|
||||||
{ t.yield_value() } -> awaitable;
|
->std::convertible_to<std::coroutine_handle<>>;
|
||||||
} &&
|
{
|
||||||
requires(type t, return_type return_value)
|
t.initial_suspend()
|
||||||
|
}
|
||||||
|
->awaiter;
|
||||||
|
{
|
||||||
|
t.final_suspend()
|
||||||
|
}
|
||||||
|
->awaiter;
|
||||||
|
{
|
||||||
|
t.yield_value()
|
||||||
|
}
|
||||||
|
->awaitable;
|
||||||
|
}
|
||||||
|
&&requires(type t, return_type return_value)
|
||||||
{
|
{
|
||||||
std::same_as<decltype(t.return_void()), void> ||
|
std::same_as<decltype(t.return_void()), void> || std::same_as<decltype(t.return_value(return_value)), void> ||
|
||||||
std::same_as<decltype(t.return_value(return_value)), void> ||
|
requires
|
||||||
requires { t.yield_value(return_value); };
|
{
|
||||||
|
t.yield_value(return_value);
|
||||||
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace coro
|
} // namespace coro
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "coro/task.hpp"
|
|
||||||
#include "coro/shutdown.hpp"
|
#include "coro/shutdown.hpp"
|
||||||
|
#include "coro/task.hpp"
|
||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <coroutine>
|
#include <coroutine>
|
||||||
|
@ -182,14 +182,6 @@ private:
|
||||||
template<typename return_type>
|
template<typename return_type>
|
||||||
friend class resume_token;
|
friend class resume_token;
|
||||||
|
|
||||||
struct task_data
|
|
||||||
{
|
|
||||||
/// The user's task, lifetime is maintained by the scheduler.
|
|
||||||
coro::task<void> m_user_task;
|
|
||||||
/// The post processing cleanup tasks to remove a completed task from the scheduler.
|
|
||||||
coro::task<void> m_cleanup_task;
|
|
||||||
};
|
|
||||||
|
|
||||||
class task_manager
|
class task_manager
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
@ -210,8 +202,9 @@ private:
|
||||||
* as deleted upon the coroutines completion.
|
* as deleted upon the coroutines completion.
|
||||||
* @param user_task The scheduled user's task to store since it has suspended after its
|
* @param user_task The scheduled user's task to store since it has suspended after its
|
||||||
* first execution.
|
* first execution.
|
||||||
|
* @return The task just stored wrapped in the self cleanup task.
|
||||||
*/
|
*/
|
||||||
auto store(coro::task<void> user_task) -> void
|
auto store(coro::task<void> user_task) -> task<void>&
|
||||||
{
|
{
|
||||||
// Only grow if completely full and attempting to add more.
|
// Only grow if completely full and attempting to add more.
|
||||||
if (m_free_pos == m_task_indexes.end())
|
if (m_free_pos == m_task_indexes.end())
|
||||||
|
@ -219,17 +212,14 @@ private:
|
||||||
m_free_pos = grow();
|
m_free_pos = grow();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store the user task with its cleanup task to maintain their lifetimes until completed.
|
// Store the task inside a cleanup task for self deletion.
|
||||||
auto index = *m_free_pos;
|
auto index = *m_free_pos;
|
||||||
auto& task_data = m_tasks[index];
|
m_tasks[index] = make_cleanup_task(std::move(user_task), m_free_pos);
|
||||||
task_data.m_user_task = std::move(user_task);
|
|
||||||
task_data.m_cleanup_task = cleanup_func(m_free_pos);
|
|
||||||
|
|
||||||
// Attach the cleanup task to be the continuation after the users task.
|
|
||||||
task_data.m_user_task.promise().continuation(task_data.m_cleanup_task.handle());
|
|
||||||
|
|
||||||
// Mark the current used slot as used.
|
// Mark the current used slot as used.
|
||||||
std::advance(m_free_pos, 1);
|
std::advance(m_free_pos, 1);
|
||||||
|
|
||||||
|
return m_tasks[index];
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -294,20 +284,34 @@ private:
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Each task the user schedules has this task chained as a continuation to execute after
|
* Encapsulate the users tasks in a cleanup task which marks itself for deletion upon
|
||||||
* the user's task completes. This function takes the task position in the indexes list
|
* completion. Simply co_await the users task until its completed and then mark the given
|
||||||
* and upon execution marks that slot for deletion. It cannot self delete otherwise it
|
* position within the task manager as being deletable. The scheduler's next iteration
|
||||||
* would corrupt/double free its own coroutine stack frame.
|
* in its event loop will then free that position up to be re-used.
|
||||||
|
*
|
||||||
|
* This function will also unconditionally catch all unhandled exceptions by the user's
|
||||||
|
* task to prevent the scheduler from throwing exceptions.
|
||||||
|
* @param user_task The user's task.
|
||||||
|
* @param pos The position where the task data will be stored in the task manager.
|
||||||
|
* @return The user's task wrapped in a self cleanup task.
|
||||||
*/
|
*/
|
||||||
auto cleanup_func(task_position pos) -> coro::task<void>
|
auto make_cleanup_task(task<void> user_task, task_position pos) -> task<void>
|
||||||
{
|
{
|
||||||
// Mark this task for deletion, it cannot delete itself.
|
try
|
||||||
|
{
|
||||||
|
co_await user_task;
|
||||||
|
}
|
||||||
|
catch (const std::runtime_error& e)
|
||||||
|
{
|
||||||
|
std::cerr << "scheduler user_task had an unhandled exception e.what()= " << e.what() << "\n";
|
||||||
|
}
|
||||||
|
|
||||||
m_tasks_to_delete.push_back(pos);
|
m_tasks_to_delete.push_back(pos);
|
||||||
co_return;
|
co_return;
|
||||||
};
|
}
|
||||||
|
|
||||||
/// Maintains the lifetime of the tasks until they are completed.
|
/// Maintains the lifetime of the tasks until they are completed.
|
||||||
std::vector<task_data> m_tasks{};
|
std::vector<task<void>> m_tasks{};
|
||||||
/// The full set of indexes into `m_tasks`.
|
/// The full set of indexes into `m_tasks`.
|
||||||
std::list<std::size_t> m_task_indexes{};
|
std::list<std::size_t> m_task_indexes{};
|
||||||
/// The set of tasks that have completed and need to be deleted.
|
/// The set of tasks that have completed and need to be deleted.
|
||||||
|
@ -732,44 +736,19 @@ private:
|
||||||
static constexpr std::size_t m_max_events = 8;
|
static constexpr std::size_t m_max_events = 8;
|
||||||
std::array<struct epoll_event, m_max_events> m_events{};
|
std::array<struct epoll_event, m_max_events> m_events{};
|
||||||
|
|
||||||
auto task_start(coro::task<void>& task) -> void
|
|
||||||
{
|
|
||||||
if (!task.is_ready()) // sanity check, the user could have manually resumed.
|
|
||||||
{
|
|
||||||
// Attempt to process the task synchronously before suspending.
|
|
||||||
task.resume();
|
|
||||||
|
|
||||||
if (!task.is_ready())
|
|
||||||
{
|
|
||||||
m_task_manager.store(std::move(task));
|
|
||||||
// This task is now suspended waiting for an event.
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// This task completed synchronously.
|
|
||||||
m_size.fetch_sub(1, std::memory_order::relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
m_size.fetch_sub(1, std::memory_order::relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
inline auto process_task_variant(task_variant& tv) -> void
|
inline auto process_task_variant(task_variant& tv) -> void
|
||||||
{
|
{
|
||||||
if (std::holds_alternative<coro::task<void>>(tv))
|
if (std::holds_alternative<coro::task<void>>(tv))
|
||||||
{
|
{
|
||||||
auto& task = std::get<coro::task<void>>(tv);
|
auto& task = std::get<coro::task<void>>(tv);
|
||||||
task_start(task);
|
// Store the users task and immediately start executing it.
|
||||||
|
m_task_manager.store(std::move(task)).resume();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
auto handle = std::get<std::coroutine_handle<>>(tv);
|
auto handle = std::get<std::coroutine_handle<>>(tv);
|
||||||
if (!handle.done())
|
// The cleanup wrapper task will catch all thrown exceptions unconditionally.
|
||||||
{
|
handle.resume();
|
||||||
handle.resume();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,6 @@
|
||||||
|
|
||||||
namespace coro
|
namespace coro
|
||||||
{
|
{
|
||||||
|
|
||||||
enum class shutdown_t
|
enum class shutdown_t
|
||||||
{
|
{
|
||||||
/// Synchronously wait for all tasks to complete when calling shutdown.
|
/// Synchronously wait for all tasks to complete when calling shutdown.
|
||||||
|
|
|
@ -2,32 +2,31 @@
|
||||||
|
|
||||||
#include "coro/awaitable.hpp"
|
#include "coro/awaitable.hpp"
|
||||||
|
|
||||||
#include <mutex>
|
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
namespace coro
|
namespace coro
|
||||||
{
|
{
|
||||||
|
|
||||||
namespace detail
|
namespace detail
|
||||||
{
|
{
|
||||||
|
|
||||||
class sync_wait_event
|
class sync_wait_event
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
sync_wait_event(bool initially_set = false);
|
sync_wait_event(bool initially_set = false);
|
||||||
sync_wait_event(const sync_wait_event&) = delete;
|
sync_wait_event(const sync_wait_event&) = delete;
|
||||||
sync_wait_event(sync_wait_event&&) = delete;
|
sync_wait_event(sync_wait_event&&) = delete;
|
||||||
auto operator=(const sync_wait_event&) -> sync_wait_event& = delete;
|
auto operator=(const sync_wait_event&) -> sync_wait_event& = delete;
|
||||||
auto operator=(sync_wait_event&&) -> sync_wait_event& = delete;
|
auto operator=(sync_wait_event &&) -> sync_wait_event& = delete;
|
||||||
~sync_wait_event() = default;
|
~sync_wait_event() = default;
|
||||||
|
|
||||||
auto set() noexcept -> void;
|
auto set() noexcept -> void;
|
||||||
auto reset() noexcept -> void;
|
auto reset() noexcept -> void;
|
||||||
auto wait() noexcept -> void;
|
auto wait() noexcept -> void;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::mutex m_mutex;
|
std::mutex m_mutex;
|
||||||
std::condition_variable m_cv;
|
std::condition_variable m_cv;
|
||||||
bool m_set{false};
|
bool m_set{false};
|
||||||
};
|
};
|
||||||
|
|
||||||
class sync_wait_task_promise_base
|
class sync_wait_task_promise_base
|
||||||
|
@ -36,17 +35,12 @@ public:
|
||||||
sync_wait_task_promise_base() noexcept = default;
|
sync_wait_task_promise_base() noexcept = default;
|
||||||
virtual ~sync_wait_task_promise_base() = default;
|
virtual ~sync_wait_task_promise_base() = default;
|
||||||
|
|
||||||
auto initial_suspend() noexcept -> std::suspend_always
|
auto initial_suspend() noexcept -> std::suspend_always { return {}; }
|
||||||
{
|
|
||||||
return {};
|
auto unhandled_exception() -> void { m_exception = std::current_exception(); }
|
||||||
}
|
|
||||||
|
|
||||||
auto unhandled_exception() -> void
|
|
||||||
{
|
|
||||||
m_exception = std::current_exception();
|
|
||||||
}
|
|
||||||
protected:
|
protected:
|
||||||
sync_wait_event* m_event{nullptr};
|
sync_wait_event* m_event{nullptr};
|
||||||
std::exception_ptr m_exception;
|
std::exception_ptr m_exception;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -56,7 +50,7 @@ class sync_wait_task_promise : public sync_wait_task_promise_base
|
||||||
public:
|
public:
|
||||||
using coroutine_type = std::coroutine_handle<sync_wait_task_promise<return_type>>;
|
using coroutine_type = std::coroutine_handle<sync_wait_task_promise<return_type>>;
|
||||||
|
|
||||||
sync_wait_task_promise() noexcept = default;
|
sync_wait_task_promise() noexcept = default;
|
||||||
~sync_wait_task_promise() override = default;
|
~sync_wait_task_promise() override = default;
|
||||||
|
|
||||||
auto start(sync_wait_event& event)
|
auto start(sync_wait_event& event)
|
||||||
|
@ -65,10 +59,7 @@ public:
|
||||||
coroutine_type::from_promise(*this).resume();
|
coroutine_type::from_promise(*this).resume();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto get_return_object() noexcept
|
auto get_return_object() noexcept { return coroutine_type::from_promise(*this); }
|
||||||
{
|
|
||||||
return coroutine_type::from_promise(*this);
|
|
||||||
}
|
|
||||||
|
|
||||||
auto yield_value(return_type&& value) noexcept
|
auto yield_value(return_type&& value) noexcept
|
||||||
{
|
{
|
||||||
|
@ -81,11 +72,8 @@ public:
|
||||||
struct completion_notifier
|
struct completion_notifier
|
||||||
{
|
{
|
||||||
auto await_ready() const noexcept { return false; }
|
auto await_ready() const noexcept { return false; }
|
||||||
auto await_suspend(coroutine_type coroutine) const noexcept
|
auto await_suspend(coroutine_type coroutine) const noexcept { coroutine.promise().m_event->set(); }
|
||||||
{
|
auto await_resume() noexcept {};
|
||||||
coroutine.promise().m_event->set();
|
|
||||||
}
|
|
||||||
auto await_resume() noexcept { };
|
|
||||||
};
|
};
|
||||||
|
|
||||||
return completion_notifier{};
|
return completion_notifier{};
|
||||||
|
@ -93,7 +81,7 @@ public:
|
||||||
|
|
||||||
auto return_value() -> return_type&&
|
auto return_value() -> return_type&&
|
||||||
{
|
{
|
||||||
if(m_exception)
|
if (m_exception)
|
||||||
{
|
{
|
||||||
std::rethrow_exception(m_exception);
|
std::rethrow_exception(m_exception);
|
||||||
}
|
}
|
||||||
|
@ -105,13 +93,13 @@ private:
|
||||||
std::remove_reference_t<return_type>* m_return_value;
|
std::remove_reference_t<return_type>* m_return_value;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
template<>
|
template<>
|
||||||
class sync_wait_task_promise<void> : public sync_wait_task_promise_base
|
class sync_wait_task_promise<void> : public sync_wait_task_promise_base
|
||||||
{
|
{
|
||||||
using coroutine_type = std::coroutine_handle<sync_wait_task_promise<void>>;
|
using coroutine_type = std::coroutine_handle<sync_wait_task_promise<void>>;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
sync_wait_task_promise() noexcept = default;
|
sync_wait_task_promise() noexcept = default;
|
||||||
~sync_wait_task_promise() override = default;
|
~sync_wait_task_promise() override = default;
|
||||||
|
|
||||||
auto start(sync_wait_event& event)
|
auto start(sync_wait_event& event)
|
||||||
|
@ -120,31 +108,25 @@ public:
|
||||||
coroutine_type::from_promise(*this).resume();
|
coroutine_type::from_promise(*this).resume();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto get_return_object() noexcept
|
auto get_return_object() noexcept { return coroutine_type::from_promise(*this); }
|
||||||
{
|
|
||||||
return coroutine_type::from_promise(*this);
|
|
||||||
}
|
|
||||||
|
|
||||||
auto final_suspend() noexcept
|
auto final_suspend() noexcept
|
||||||
{
|
{
|
||||||
struct completion_notifier
|
struct completion_notifier
|
||||||
{
|
{
|
||||||
auto await_ready() const noexcept { return false; }
|
auto await_ready() const noexcept { return false; }
|
||||||
auto await_suspend(coroutine_type coroutine) const noexcept
|
auto await_suspend(coroutine_type coroutine) const noexcept { coroutine.promise().m_event->set(); }
|
||||||
{
|
auto await_resume() noexcept {};
|
||||||
coroutine.promise().m_event->set();
|
|
||||||
}
|
|
||||||
auto await_resume() noexcept { };
|
|
||||||
};
|
};
|
||||||
|
|
||||||
return completion_notifier{};
|
return completion_notifier{};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto return_void() noexcept -> void { }
|
auto return_void() noexcept -> void {}
|
||||||
|
|
||||||
auto return_value()
|
auto return_value()
|
||||||
{
|
{
|
||||||
if(m_exception)
|
if (m_exception)
|
||||||
{
|
{
|
||||||
std::rethrow_exception(m_exception);
|
std::rethrow_exception(m_exception);
|
||||||
}
|
}
|
||||||
|
@ -155,25 +137,17 @@ template<typename return_type>
|
||||||
class sync_wait_task
|
class sync_wait_task
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
using promise_type = sync_wait_task_promise<return_type>;
|
using promise_type = sync_wait_task_promise<return_type>;
|
||||||
using coroutine_type = std::coroutine_handle<promise_type>;
|
using coroutine_type = std::coroutine_handle<promise_type>;
|
||||||
|
|
||||||
sync_wait_task(coroutine_type coroutine) noexcept
|
sync_wait_task(coroutine_type coroutine) noexcept : m_coroutine(coroutine) {}
|
||||||
: m_coroutine(coroutine)
|
|
||||||
{
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
sync_wait_task(const sync_wait_task&) = delete;
|
sync_wait_task(const sync_wait_task&) = delete;
|
||||||
sync_wait_task(sync_wait_task&& other) noexcept
|
sync_wait_task(sync_wait_task&& other) noexcept : m_coroutine(std::exchange(other.m_coroutine, coroutine_type{})) {}
|
||||||
: m_coroutine(std::exchange(other.m_coroutine, coroutine_type{}))
|
|
||||||
{
|
|
||||||
|
|
||||||
}
|
|
||||||
auto operator=(const sync_wait_task&) -> sync_wait_task& = delete;
|
auto operator=(const sync_wait_task&) -> sync_wait_task& = delete;
|
||||||
auto operator=(sync_wait_task&& other) -> sync_wait_task&
|
auto operator =(sync_wait_task&& other) -> sync_wait_task&
|
||||||
{
|
{
|
||||||
if(std::addressof(other) != this)
|
if (std::addressof(other) != this)
|
||||||
{
|
{
|
||||||
m_coroutine = std::exchange(other.m_coroutine, coroutine_type{});
|
m_coroutine = std::exchange(other.m_coroutine, coroutine_type{});
|
||||||
}
|
}
|
||||||
|
@ -183,16 +157,13 @@ public:
|
||||||
|
|
||||||
~sync_wait_task()
|
~sync_wait_task()
|
||||||
{
|
{
|
||||||
if(m_coroutine)
|
if (m_coroutine)
|
||||||
{
|
{
|
||||||
m_coroutine.destroy();
|
m_coroutine.destroy();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto start(sync_wait_event& event) noexcept
|
auto start(sync_wait_event& event) noexcept { m_coroutine.promise().start(event); }
|
||||||
{
|
|
||||||
m_coroutine.promise().start(event);
|
|
||||||
}
|
|
||||||
|
|
||||||
auto return_value() -> decltype(auto)
|
auto return_value() -> decltype(auto)
|
||||||
{
|
{
|
||||||
|
@ -211,7 +182,6 @@ private:
|
||||||
coroutine_type m_coroutine;
|
coroutine_type m_coroutine;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
template<awaitable awaitable, typename return_type = awaitable_traits<awaitable>::awaiter_return_type>
|
template<awaitable awaitable, typename return_type = awaitable_traits<awaitable>::awaiter_return_type>
|
||||||
static auto make_sync_wait_task(awaitable&& a) -> sync_wait_task<return_type>
|
static auto make_sync_wait_task(awaitable&& a) -> sync_wait_task<return_type>
|
||||||
{
|
{
|
||||||
|
@ -232,7 +202,7 @@ template<awaitable awaitable>
|
||||||
auto sync_wait(awaitable&& a) -> decltype(auto)
|
auto sync_wait(awaitable&& a) -> decltype(auto)
|
||||||
{
|
{
|
||||||
detail::sync_wait_event e{};
|
detail::sync_wait_event e{};
|
||||||
auto task = detail::make_sync_wait_task(std::forward<awaitable>(a));
|
auto task = detail::make_sync_wait_task(std::forward<awaitable>(a));
|
||||||
task.start(e);
|
task.start(e);
|
||||||
e.wait();
|
e.wait();
|
||||||
|
|
||||||
|
|
|
@ -143,7 +143,7 @@ public:
|
||||||
|
|
||||||
explicit task(coroutine_handle handle) : m_coroutine(handle) {}
|
explicit task(coroutine_handle handle) : m_coroutine(handle) {}
|
||||||
task(const task&) = delete;
|
task(const task&) = delete;
|
||||||
task(task&& other) noexcept : m_coroutine(std::exchange(other.m_coroutine, nullptr)) { }
|
task(task&& other) noexcept : m_coroutine(std::exchange(other.m_coroutine, nullptr)) {}
|
||||||
|
|
||||||
~task()
|
~task()
|
||||||
{
|
{
|
||||||
|
|
|
@ -4,18 +4,17 @@
|
||||||
#include "coro/task.hpp"
|
#include "coro/task.hpp"
|
||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <vector>
|
|
||||||
#include <thread>
|
|
||||||
#include <mutex>
|
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include <coroutine>
|
#include <coroutine>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
#include <optional>
|
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
#include <mutex>
|
||||||
|
#include <optional>
|
||||||
|
#include <thread>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
namespace coro
|
namespace coro
|
||||||
{
|
{
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a thread pool that executes arbitrary coroutine tasks in a FIFO scheduler policy.
|
* Creates a thread pool that executes arbitrary coroutine tasks in a FIFO scheduler policy.
|
||||||
* The thread pool by default will create an execution thread per available core on the system.
|
* The thread pool by default will create an execution thread per available core on the system.
|
||||||
|
@ -39,6 +38,7 @@ public:
|
||||||
* @param tp The thread pool that created this operation.
|
* @param tp The thread pool that created this operation.
|
||||||
*/
|
*/
|
||||||
explicit operation(thread_pool& tp) noexcept;
|
explicit operation(thread_pool& tp) noexcept;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
/**
|
/**
|
||||||
* Operations always pause so the executing thread and be switched.
|
* Operations always pause so the executing thread and be switched.
|
||||||
|
@ -54,7 +54,8 @@ public:
|
||||||
/**
|
/**
|
||||||
* no-op as this is the function called first by the thread pool's executing thread.
|
* no-op as this is the function called first by the thread pool's executing thread.
|
||||||
*/
|
*/
|
||||||
auto await_resume() noexcept -> void { }
|
auto await_resume() noexcept -> void {}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/// The thread pool that this operation will execute on.
|
/// The thread pool that this operation will execute on.
|
||||||
thread_pool& m_thread_pool;
|
thread_pool& m_thread_pool;
|
||||||
|
@ -76,16 +77,12 @@ public:
|
||||||
/**
|
/**
|
||||||
* @param opts Thread pool configuration options.
|
* @param opts Thread pool configuration options.
|
||||||
*/
|
*/
|
||||||
explicit thread_pool(options opts = options{
|
explicit thread_pool(options opts = options{std::thread::hardware_concurrency(), nullptr, nullptr});
|
||||||
std::thread::hardware_concurrency(),
|
|
||||||
nullptr,
|
|
||||||
nullptr
|
|
||||||
});
|
|
||||||
|
|
||||||
thread_pool(const thread_pool&) = delete;
|
thread_pool(const thread_pool&) = delete;
|
||||||
thread_pool(thread_pool&&) = delete;
|
thread_pool(thread_pool&&) = delete;
|
||||||
auto operator=(const thread_pool&) -> thread_pool& = delete;
|
auto operator=(const thread_pool&) -> thread_pool& = delete;
|
||||||
auto operator=(thread_pool&&) -> thread_pool& = delete;
|
auto operator=(thread_pool &&) -> thread_pool& = delete;
|
||||||
|
|
||||||
~thread_pool();
|
~thread_pool();
|
||||||
|
|
||||||
|
@ -98,8 +95,7 @@ public:
|
||||||
* pool thread. This will return nullopt if the schedule fails, currently the only
|
* pool thread. This will return nullopt if the schedule fails, currently the only
|
||||||
* way for this to fail is if `shudown()` has been called.
|
* way for this to fail is if `shudown()` has been called.
|
||||||
*/
|
*/
|
||||||
[[nodiscard]]
|
[[nodiscard]] auto schedule() noexcept -> std::optional<operation>;
|
||||||
auto schedule() noexcept -> std::optional<operation>;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @throw std::runtime_error If the thread pool is `shutdown()` scheduling new tasks is not permitted.
|
* @throw std::runtime_error If the thread pool is `shutdown()` scheduling new tasks is not permitted.
|
||||||
|
@ -108,11 +104,11 @@ public:
|
||||||
* @return A task that wraps the given functor to be executed on the thread pool.
|
* @return A task that wraps the given functor to be executed on the thread pool.
|
||||||
*/
|
*/
|
||||||
template<typename functor, typename... arguments>
|
template<typename functor, typename... arguments>
|
||||||
[[nodiscard]]
|
[[nodiscard]] auto schedule(functor&& f, arguments... args) noexcept
|
||||||
auto schedule(functor&& f, arguments... args) noexcept -> task<decltype(f(std::forward<arguments>(args)...))>
|
-> task<decltype(f(std::forward<arguments>(args)...))>
|
||||||
{
|
{
|
||||||
auto scheduled = schedule();
|
auto scheduled = schedule();
|
||||||
if(!scheduled.has_value())
|
if (!scheduled.has_value())
|
||||||
{
|
{
|
||||||
throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
|
throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
|
||||||
}
|
}
|
||||||
|
@ -162,6 +158,7 @@ public:
|
||||||
* @return True if the task queue is currently empty.
|
* @return True if the task queue is currently empty.
|
||||||
*/
|
*/
|
||||||
auto queue_empty() const noexcept -> bool { return queue_size() == 0; }
|
auto queue_empty() const noexcept -> bool { return queue_size() == 0; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/// The configuration options.
|
/// The configuration options.
|
||||||
options m_opts;
|
options m_opts;
|
||||||
|
|
|
@ -9,42 +9,33 @@
|
||||||
|
|
||||||
namespace coro
|
namespace coro
|
||||||
{
|
{
|
||||||
|
|
||||||
namespace detail
|
namespace detail
|
||||||
{
|
{
|
||||||
|
|
||||||
class when_all_latch
|
class when_all_latch
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
when_all_latch(std::size_t count) noexcept
|
when_all_latch(std::size_t count) noexcept : m_count(count + 1) {}
|
||||||
: m_count(count + 1)
|
|
||||||
{ }
|
|
||||||
|
|
||||||
when_all_latch(const when_all_latch&) = delete;
|
when_all_latch(const when_all_latch&) = delete;
|
||||||
when_all_latch(when_all_latch&& other)
|
when_all_latch(when_all_latch&& other)
|
||||||
: m_count(other.m_count.load(std::memory_order::acquire)),
|
: m_count(other.m_count.load(std::memory_order::acquire)),
|
||||||
m_awaiting_coroutine(std::exchange(other.m_awaiting_coroutine, nullptr))
|
m_awaiting_coroutine(std::exchange(other.m_awaiting_coroutine, nullptr))
|
||||||
{ }
|
{
|
||||||
|
}
|
||||||
|
|
||||||
auto operator=(const when_all_latch&) -> when_all_latch& = delete;
|
auto operator=(const when_all_latch&) -> when_all_latch& = delete;
|
||||||
auto operator=(when_all_latch&& other) -> when_all_latch&
|
auto operator =(when_all_latch&& other) -> when_all_latch&
|
||||||
{
|
{
|
||||||
if(std::addressof(other) != this)
|
if (std::addressof(other) != this)
|
||||||
{
|
{
|
||||||
m_count.store(
|
m_count.store(other.m_count.load(std::memory_order::acquire), std::memory_order::relaxed);
|
||||||
other.m_count.load(std::memory_order::acquire),
|
|
||||||
std::memory_order::relaxed
|
|
||||||
);
|
|
||||||
m_awaiting_coroutine = std::exchange(other.m_awaiting_coroutine, nullptr);
|
m_awaiting_coroutine = std::exchange(other.m_awaiting_coroutine, nullptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto is_ready() const noexcept -> bool
|
auto is_ready() const noexcept -> bool { return m_awaiting_coroutine != nullptr && m_awaiting_coroutine.done(); }
|
||||||
{
|
|
||||||
return m_awaiting_coroutine != nullptr && m_awaiting_coroutine.done();
|
|
||||||
}
|
|
||||||
|
|
||||||
auto try_await(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
|
auto try_await(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
|
||||||
{
|
{
|
||||||
|
@ -54,7 +45,7 @@ public:
|
||||||
|
|
||||||
auto notify_awaitable_completed() noexcept -> void
|
auto notify_awaitable_completed() noexcept -> void
|
||||||
{
|
{
|
||||||
if(m_count.fetch_sub(1, std::memory_order::acq_rel) == 1)
|
if (m_count.fetch_sub(1, std::memory_order::acq_rel) == 1)
|
||||||
{
|
{
|
||||||
m_awaiting_coroutine.resume();
|
m_awaiting_coroutine.resume();
|
||||||
}
|
}
|
||||||
|
@ -82,108 +73,92 @@ public:
|
||||||
explicit constexpr when_all_ready_awaitable(std::tuple<>) noexcept {}
|
explicit constexpr when_all_ready_awaitable(std::tuple<>) noexcept {}
|
||||||
|
|
||||||
constexpr auto await_ready() const noexcept -> bool { return true; }
|
constexpr auto await_ready() const noexcept -> bool { return true; }
|
||||||
auto await_suspend(std::coroutine_handle<>) noexcept -> void { }
|
auto await_suspend(std::coroutine_handle<>) noexcept -> void {}
|
||||||
auto await_resume() const noexcept -> std::tuple<> { return {}; }
|
auto await_resume() const noexcept -> std::tuple<> { return {}; }
|
||||||
};
|
};
|
||||||
|
|
||||||
template<typename... task_types>
|
template<typename... task_types>
|
||||||
class when_all_ready_awaitable<std::tuple<task_types...>>
|
class when_all_ready_awaitable<std::tuple<task_types...>>
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
explicit when_all_ready_awaitable(task_types&&... tasks)
|
explicit when_all_ready_awaitable(task_types&&... tasks) noexcept(
|
||||||
noexcept(std::conjunction_v<std::is_nothrow_move_constructible_v<task_types>...>)
|
std::conjunction_v<std::is_nothrow_move_constructible_v<task_types>...>)
|
||||||
: m_latch(sizeof...(task_types)),
|
: m_latch(sizeof...(task_types)),
|
||||||
m_tasks(std::move(tasks)...)
|
m_tasks(std::move(tasks)...)
|
||||||
{}
|
{
|
||||||
|
}
|
||||||
|
|
||||||
explicit when_all_ready_awaitable(std::tuple<task_types...>&& tasks)
|
explicit when_all_ready_awaitable(std::tuple<task_types...>&& tasks) noexcept(
|
||||||
noexcept(std::is_nothrow_move_constructible_v<std::tuple<task_types...>>)
|
std::is_nothrow_move_constructible_v<std::tuple<task_types...>>)
|
||||||
: m_latch(sizeof...(task_types)),
|
: m_latch(sizeof...(task_types)),
|
||||||
m_tasks(std::move(tasks))
|
m_tasks(std::move(tasks))
|
||||||
{ }
|
{
|
||||||
|
}
|
||||||
|
|
||||||
when_all_ready_awaitable(const when_all_ready_awaitable&) = delete;
|
when_all_ready_awaitable(const when_all_ready_awaitable&) = delete;
|
||||||
when_all_ready_awaitable(when_all_ready_awaitable&& other)
|
when_all_ready_awaitable(when_all_ready_awaitable&& other)
|
||||||
: m_latch(std::move(other.m_latch)),
|
: m_latch(std::move(other.m_latch)),
|
||||||
m_tasks(std::move(other.m_tasks))
|
m_tasks(std::move(other.m_tasks))
|
||||||
{ }
|
{
|
||||||
|
}
|
||||||
|
|
||||||
auto operator=(const when_all_ready_awaitable&) -> when_all_ready_awaitable& = delete;
|
auto operator=(const when_all_ready_awaitable&) -> when_all_ready_awaitable& = delete;
|
||||||
auto operator=(when_all_ready_awaitable&&) -> when_all_ready_awaitable& = delete;
|
auto operator=(when_all_ready_awaitable &&) -> when_all_ready_awaitable& = delete;
|
||||||
|
|
||||||
auto operator co_await() & noexcept
|
auto operator co_await() & noexcept
|
||||||
{
|
{
|
||||||
struct awaiter
|
struct awaiter
|
||||||
{
|
{
|
||||||
explicit awaiter(when_all_ready_awaitable& awaitable) noexcept
|
explicit awaiter(when_all_ready_awaitable& awaitable) noexcept : m_awaitable(awaitable) {}
|
||||||
: m_awaitable(awaitable)
|
|
||||||
{ }
|
|
||||||
|
|
||||||
auto await_ready() const noexcept -> bool
|
auto await_ready() const noexcept -> bool { return m_awaitable.is_ready(); }
|
||||||
{
|
|
||||||
return m_awaitable.is_ready();
|
|
||||||
}
|
|
||||||
|
|
||||||
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
|
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
|
||||||
{
|
{
|
||||||
return m_awaitable.try_await(awaiting_coroutine);
|
return m_awaitable.try_await(awaiting_coroutine);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto await_resume() noexcept -> std::tuple<task_types...>&
|
auto await_resume() noexcept -> std::tuple<task_types...>& { return m_awaitable.m_tasks; }
|
||||||
{
|
|
||||||
return m_awaitable.m_tasks;
|
|
||||||
}
|
|
||||||
private:
|
private:
|
||||||
when_all_ready_awaitable& m_awaitable;
|
when_all_ready_awaitable& m_awaitable;
|
||||||
};
|
};
|
||||||
|
|
||||||
return awaiter{ *this };
|
return awaiter{*this};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto operator co_await() && noexcept
|
auto operator co_await() && noexcept
|
||||||
{
|
{
|
||||||
struct awaiter
|
struct awaiter
|
||||||
{
|
{
|
||||||
explicit awaiter(when_all_ready_awaitable& awaitable) noexcept
|
explicit awaiter(when_all_ready_awaitable& awaitable) noexcept : m_awaitable(awaitable) {}
|
||||||
: m_awaitable(awaitable)
|
|
||||||
{ }
|
|
||||||
|
|
||||||
auto await_ready() const noexcept -> bool
|
auto await_ready() const noexcept -> bool { return m_awaitable.is_ready(); }
|
||||||
{
|
|
||||||
return m_awaitable.is_ready();
|
|
||||||
}
|
|
||||||
|
|
||||||
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
|
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
|
||||||
{
|
{
|
||||||
return m_awaitable.try_await(awaiting_coroutine);
|
return m_awaitable.try_await(awaiting_coroutine);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto await_resume() noexcept -> std::tuple<task_types...>&&
|
auto await_resume() noexcept -> std::tuple<task_types...>&& { return std::move(m_awaitable.m_tasks); }
|
||||||
{
|
|
||||||
return std::move(m_awaitable.m_tasks);
|
|
||||||
}
|
|
||||||
private:
|
private:
|
||||||
when_all_ready_awaitable& m_awaitable;
|
when_all_ready_awaitable& m_awaitable;
|
||||||
};
|
};
|
||||||
|
|
||||||
return awaiter{ *this };
|
return awaiter{*this};
|
||||||
}
|
}
|
||||||
private:
|
|
||||||
|
|
||||||
auto is_ready() const noexcept -> bool
|
private:
|
||||||
{
|
auto is_ready() const noexcept -> bool { return m_latch.is_ready(); }
|
||||||
return m_latch.is_ready();
|
|
||||||
}
|
|
||||||
|
|
||||||
auto try_await(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
|
auto try_await(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
|
||||||
{
|
{
|
||||||
std::apply(
|
std::apply([this](auto&&... tasks) { ((tasks.start(m_latch)), ...); }, m_tasks);
|
||||||
[this](auto&&... tasks) { ((tasks.start(m_latch)), ...); },
|
|
||||||
m_tasks);
|
|
||||||
return m_latch.try_await(awaiting_coroutine);
|
return m_latch.try_await(awaiting_coroutine);
|
||||||
}
|
}
|
||||||
|
|
||||||
when_all_latch m_latch;
|
when_all_latch m_latch;
|
||||||
std::tuple<task_types...> m_tasks;
|
std::tuple<task_types...> m_tasks;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -194,14 +169,16 @@ public:
|
||||||
explicit when_all_ready_awaitable(task_container_type&& tasks) noexcept
|
explicit when_all_ready_awaitable(task_container_type&& tasks) noexcept
|
||||||
: m_latch(std::size(tasks)),
|
: m_latch(std::size(tasks)),
|
||||||
m_tasks(std::forward<task_container_type>(tasks))
|
m_tasks(std::forward<task_container_type>(tasks))
|
||||||
{}
|
{
|
||||||
|
}
|
||||||
|
|
||||||
when_all_ready_awaitable(const when_all_ready_awaitable&) = delete;
|
when_all_ready_awaitable(const when_all_ready_awaitable&) = delete;
|
||||||
when_all_ready_awaitable(when_all_ready_awaitable&& other)
|
when_all_ready_awaitable(when_all_ready_awaitable&& other) noexcept(
|
||||||
noexcept(std::is_nothrow_move_constructible_v<task_container_type>)
|
std::is_nothrow_move_constructible_v<task_container_type>)
|
||||||
: m_latch(std::move(other.m_latch)),
|
: m_latch(std::move(other.m_latch)),
|
||||||
m_tasks(std::move(m_tasks))
|
m_tasks(std::move(m_tasks))
|
||||||
{}
|
{
|
||||||
|
}
|
||||||
|
|
||||||
auto operator=(const when_all_ready_awaitable&) -> when_all_ready_awaitable& = delete;
|
auto operator=(const when_all_ready_awaitable&) -> when_all_ready_awaitable& = delete;
|
||||||
auto operator=(when_all_ready_awaitable&) -> when_all_ready_awaitable& = delete;
|
auto operator=(when_all_ready_awaitable&) -> when_all_ready_awaitable& = delete;
|
||||||
|
@ -210,24 +187,17 @@ public:
|
||||||
{
|
{
|
||||||
struct awaiter
|
struct awaiter
|
||||||
{
|
{
|
||||||
awaiter(when_all_ready_awaitable& awaitable)
|
awaiter(when_all_ready_awaitable& awaitable) : m_awaitable(awaitable) {}
|
||||||
: m_awaitable(awaitable)
|
|
||||||
{}
|
|
||||||
|
|
||||||
auto await_ready() const noexcept -> bool
|
auto await_ready() const noexcept -> bool { return m_awaitable.is_ready(); }
|
||||||
{
|
|
||||||
return m_awaitable.is_ready();
|
|
||||||
}
|
|
||||||
|
|
||||||
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
|
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
|
||||||
{
|
{
|
||||||
return m_awaitable.try_await(awaiting_coroutine);
|
return m_awaitable.try_await(awaiting_coroutine);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto await_resume() noexcept -> task_container_type&
|
auto await_resume() noexcept -> task_container_type& { return m_awaitable.m_tasks; }
|
||||||
{
|
|
||||||
return m_awaitable.m_tasks;
|
|
||||||
}
|
|
||||||
private:
|
private:
|
||||||
when_all_ready_awaitable& m_awaitable;
|
when_all_ready_awaitable& m_awaitable;
|
||||||
};
|
};
|
||||||
|
@ -239,39 +209,30 @@ public:
|
||||||
{
|
{
|
||||||
struct awaiter
|
struct awaiter
|
||||||
{
|
{
|
||||||
awaiter(when_all_ready_awaitable& awaitable)
|
awaiter(when_all_ready_awaitable& awaitable) : m_awaitable(awaitable) {}
|
||||||
: m_awaitable(awaitable)
|
|
||||||
{}
|
|
||||||
|
|
||||||
auto await_ready() const noexcept -> bool
|
auto await_ready() const noexcept -> bool { return m_awaitable.is_ready(); }
|
||||||
{
|
|
||||||
return m_awaitable.is_ready();
|
|
||||||
}
|
|
||||||
|
|
||||||
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
|
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
|
||||||
{
|
{
|
||||||
return m_awaitable.try_await(awaiting_coroutine);
|
return m_awaitable.try_await(awaiting_coroutine);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto await_resume() noexcept -> task_container_type&&
|
auto await_resume() noexcept -> task_container_type&& { return std::move(m_awaitable.m_tasks); }
|
||||||
{
|
|
||||||
return std::move(m_awaitable.m_tasks);
|
|
||||||
}
|
|
||||||
private:
|
private:
|
||||||
when_all_ready_awaitable& m_awaitable;
|
when_all_ready_awaitable& m_awaitable;
|
||||||
};
|
};
|
||||||
|
|
||||||
return awaiter{*this};
|
return awaiter{*this};
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
auto is_ready() const noexcept -> bool
|
auto is_ready() const noexcept -> bool { return m_latch.is_ready(); }
|
||||||
{
|
|
||||||
return m_latch.is_ready();
|
|
||||||
}
|
|
||||||
|
|
||||||
auto try_await(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
|
auto try_await(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
|
||||||
{
|
{
|
||||||
for(auto& task : m_tasks)
|
for (auto& task : m_tasks)
|
||||||
{
|
{
|
||||||
task.start(m_latch);
|
task.start(m_latch);
|
||||||
}
|
}
|
||||||
|
@ -279,7 +240,7 @@ private:
|
||||||
return m_latch.try_await(awaiting_coroutine);
|
return m_latch.try_await(awaiting_coroutine);
|
||||||
}
|
}
|
||||||
|
|
||||||
when_all_latch m_latch;
|
when_all_latch m_latch;
|
||||||
task_container_type m_tasks;
|
task_container_type m_tasks;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -289,18 +250,11 @@ class when_all_task_promise
|
||||||
public:
|
public:
|
||||||
using coroutine_handle_type = std::coroutine_handle<when_all_task_promise<return_type>>;
|
using coroutine_handle_type = std::coroutine_handle<when_all_task_promise<return_type>>;
|
||||||
|
|
||||||
when_all_task_promise() noexcept
|
when_all_task_promise() noexcept {}
|
||||||
{}
|
|
||||||
|
|
||||||
auto get_return_object() noexcept
|
auto get_return_object() noexcept { return coroutine_handle_type::from_promise(*this); }
|
||||||
{
|
|
||||||
return coroutine_handle_type::from_promise(*this);
|
|
||||||
}
|
|
||||||
|
|
||||||
auto initial_suspend() noexcept -> std::suspend_always
|
auto initial_suspend() noexcept -> std::suspend_always { return {}; }
|
||||||
{
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
auto final_suspend() noexcept
|
auto final_suspend() noexcept
|
||||||
{
|
{
|
||||||
|
@ -311,16 +265,13 @@ public:
|
||||||
{
|
{
|
||||||
coroutine.promise().m_latch->notify_awaitable_completed();
|
coroutine.promise().m_latch->notify_awaitable_completed();
|
||||||
}
|
}
|
||||||
auto await_resume() const noexcept { }
|
auto await_resume() const noexcept {}
|
||||||
};
|
};
|
||||||
|
|
||||||
return completion_notifier{};
|
return completion_notifier{};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto unhandled_exception() noexcept
|
auto unhandled_exception() noexcept { m_exception_ptr = std::current_exception(); }
|
||||||
{
|
|
||||||
m_exception_ptr = std::current_exception();
|
|
||||||
}
|
|
||||||
|
|
||||||
auto yield_value(return_type&& value) noexcept
|
auto yield_value(return_type&& value) noexcept
|
||||||
{
|
{
|
||||||
|
@ -336,7 +287,7 @@ public:
|
||||||
|
|
||||||
auto return_value() & -> return_type&
|
auto return_value() & -> return_type&
|
||||||
{
|
{
|
||||||
if(m_exception_ptr)
|
if (m_exception_ptr)
|
||||||
{
|
{
|
||||||
std::rethrow_exception(m_exception_ptr);
|
std::rethrow_exception(m_exception_ptr);
|
||||||
}
|
}
|
||||||
|
@ -345,7 +296,7 @@ public:
|
||||||
|
|
||||||
auto return_value() && -> return_type&&
|
auto return_value() && -> return_type&&
|
||||||
{
|
{
|
||||||
if(m_exception_ptr)
|
if (m_exception_ptr)
|
||||||
{
|
{
|
||||||
std::rethrow_exception(m_exception_ptr);
|
std::rethrow_exception(m_exception_ptr);
|
||||||
}
|
}
|
||||||
|
@ -353,8 +304,8 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
when_all_latch* m_latch{nullptr};
|
when_all_latch* m_latch{nullptr};
|
||||||
std::exception_ptr m_exception_ptr;
|
std::exception_ptr m_exception_ptr;
|
||||||
std::add_pointer_t<return_type> m_return_value;
|
std::add_pointer_t<return_type> m_return_value;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -364,18 +315,11 @@ class when_all_task_promise<void>
|
||||||
public:
|
public:
|
||||||
using coroutine_handle_type = std::coroutine_handle<when_all_task_promise<void>>;
|
using coroutine_handle_type = std::coroutine_handle<when_all_task_promise<void>>;
|
||||||
|
|
||||||
when_all_task_promise() noexcept
|
when_all_task_promise() noexcept {}
|
||||||
{}
|
|
||||||
|
|
||||||
auto get_return_object() noexcept
|
auto get_return_object() noexcept { return coroutine_handle_type::from_promise(*this); }
|
||||||
{
|
|
||||||
return coroutine_handle_type::from_promise(*this);
|
|
||||||
}
|
|
||||||
|
|
||||||
auto initial_suspend() noexcept -> std::suspend_always
|
auto initial_suspend() noexcept -> std::suspend_always { return {}; }
|
||||||
{
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
auto final_suspend() noexcept
|
auto final_suspend() noexcept
|
||||||
{
|
{
|
||||||
|
@ -386,19 +330,15 @@ public:
|
||||||
{
|
{
|
||||||
coroutine.promise().m_latch->notify_awaitable_completed();
|
coroutine.promise().m_latch->notify_awaitable_completed();
|
||||||
}
|
}
|
||||||
auto await_resume() const noexcept -> void { }
|
auto await_resume() const noexcept -> void {}
|
||||||
};
|
};
|
||||||
|
|
||||||
return completion_notifier{};
|
return completion_notifier{};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto unhandled_exception() noexcept -> void
|
auto unhandled_exception() noexcept -> void { m_exception_ptr = std::current_exception(); }
|
||||||
{
|
|
||||||
m_exception_ptr = std::current_exception();
|
|
||||||
}
|
|
||||||
|
|
||||||
auto return_void() noexcept -> void
|
auto return_void() noexcept -> void {}
|
||||||
{}
|
|
||||||
|
|
||||||
auto start(when_all_latch& latch) -> void
|
auto start(when_all_latch& latch) -> void
|
||||||
{
|
{
|
||||||
|
@ -408,13 +348,14 @@ public:
|
||||||
|
|
||||||
auto return_value() -> void
|
auto return_value() -> void
|
||||||
{
|
{
|
||||||
if(m_exception_ptr)
|
if (m_exception_ptr)
|
||||||
{
|
{
|
||||||
std::rethrow_exception(m_exception_ptr);
|
std::rethrow_exception(m_exception_ptr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
when_all_latch* m_latch{nullptr};
|
when_all_latch* m_latch{nullptr};
|
||||||
std::exception_ptr m_exception_ptr;
|
std::exception_ptr m_exception_ptr;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -426,24 +367,23 @@ public:
|
||||||
template<typename task_container_type>
|
template<typename task_container_type>
|
||||||
friend class when_all_ready_awaitable;
|
friend class when_all_ready_awaitable;
|
||||||
|
|
||||||
using promise_type = when_all_task_promise<return_type>;
|
using promise_type = when_all_task_promise<return_type>;
|
||||||
using coroutine_handle_type = typename promise_type::coroutine_handle_type;
|
using coroutine_handle_type = typename promise_type::coroutine_handle_type;
|
||||||
|
|
||||||
when_all_task(coroutine_handle_type coroutine) noexcept
|
when_all_task(coroutine_handle_type coroutine) noexcept : m_coroutine(coroutine) {}
|
||||||
: m_coroutine(coroutine)
|
|
||||||
{}
|
|
||||||
|
|
||||||
when_all_task(const when_all_task&) = delete;
|
when_all_task(const when_all_task&) = delete;
|
||||||
when_all_task(when_all_task&& other) noexcept
|
when_all_task(when_all_task&& other) noexcept
|
||||||
: m_coroutine(std::exchange(other.m_coroutine, coroutine_handle_type{}))
|
: m_coroutine(std::exchange(other.m_coroutine, coroutine_handle_type{}))
|
||||||
{}
|
{
|
||||||
|
}
|
||||||
|
|
||||||
auto operator=(const when_all_task&) -> when_all_task& = delete;
|
auto operator=(const when_all_task&) -> when_all_task& = delete;
|
||||||
auto operator=(when_all_task&&) -> when_all_task& = delete;
|
auto operator=(when_all_task &&) -> when_all_task& = delete;
|
||||||
|
|
||||||
~when_all_task()
|
~when_all_task()
|
||||||
{
|
{
|
||||||
if(m_coroutine != nullptr)
|
if (m_coroutine != nullptr)
|
||||||
{
|
{
|
||||||
m_coroutine.destroy();
|
m_coroutine.destroy();
|
||||||
}
|
}
|
||||||
|
@ -462,7 +402,7 @@ public:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto return_value() const & -> decltype(auto)
|
auto return_value() const& -> decltype(auto)
|
||||||
{
|
{
|
||||||
if constexpr (std::is_void_v<return_type>)
|
if constexpr (std::is_void_v<return_type>)
|
||||||
{
|
{
|
||||||
|
@ -489,10 +429,7 @@ public:
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
auto start(when_all_latch& latch) noexcept -> void
|
auto start(when_all_latch& latch) noexcept -> void { m_coroutine.promise().start(latch); }
|
||||||
{
|
|
||||||
m_coroutine.promise().start(latch);
|
|
||||||
}
|
|
||||||
|
|
||||||
coroutine_handle_type m_coroutine;
|
coroutine_handle_type m_coroutine;
|
||||||
};
|
};
|
||||||
|
@ -516,23 +453,19 @@ static auto make_when_all_task(awaitable a) -> when_all_task<return_type>
|
||||||
template<awaitable... awaitables_type>
|
template<awaitable... awaitables_type>
|
||||||
[[nodiscard]] auto when_all_awaitable(awaitables_type&&... awaitables)
|
[[nodiscard]] auto when_all_awaitable(awaitables_type&&... awaitables)
|
||||||
{
|
{
|
||||||
return
|
return detail::when_all_ready_awaitable<
|
||||||
detail::when_all_ready_awaitable<
|
std::tuple<detail::when_all_task<typename awaitable_traits<awaitables_type>::awaiter_return_type>...>>(
|
||||||
std::tuple<
|
std::make_tuple(detail::make_when_all_task(std::forward<awaitables_type>(awaitables))...));
|
||||||
detail::when_all_task<
|
|
||||||
typename awaitable_traits<awaitables_type>::awaiter_return_type
|
|
||||||
>...
|
|
||||||
>
|
|
||||||
>(std::make_tuple(detail::make_when_all_task(std::forward<awaitables_type>(awaitables))...));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template<awaitable awaitable, typename return_type = awaitable_traits<awaitable>::awaiter_return_type>
|
template<awaitable awaitable, typename return_type = awaitable_traits<awaitable>::awaiter_return_type>
|
||||||
[[nodiscard]] auto when_all_awaitable(std::vector<awaitable>& awaitables) -> detail::when_all_ready_awaitable<std::vector<detail::when_all_task<return_type>>>
|
[[nodiscard]] auto when_all_awaitable(std::vector<awaitable>& awaitables)
|
||||||
|
-> detail::when_all_ready_awaitable<std::vector<detail::when_all_task<return_type>>>
|
||||||
{
|
{
|
||||||
std::vector<detail::when_all_task<return_type>> tasks;
|
std::vector<detail::when_all_task<return_type>> tasks;
|
||||||
tasks.reserve(std::size(awaitables));
|
tasks.reserve(std::size(awaitables));
|
||||||
|
|
||||||
for(auto& a : awaitables)
|
for (auto& a : awaitables)
|
||||||
{
|
{
|
||||||
tasks.emplace_back(detail::make_when_all_task(std::move(a)));
|
tasks.emplace_back(detail::make_when_all_task(std::move(a)));
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,11 +2,8 @@
|
||||||
|
|
||||||
namespace coro::detail
|
namespace coro::detail
|
||||||
{
|
{
|
||||||
|
sync_wait_event::sync_wait_event(bool initially_set) : m_set(initially_set)
|
||||||
sync_wait_event::sync_wait_event(bool initially_set)
|
|
||||||
: m_set(initially_set)
|
|
||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
auto sync_wait_event::set() noexcept -> void
|
auto sync_wait_event::set() noexcept -> void
|
||||||
|
|
|
@ -2,11 +2,8 @@
|
||||||
|
|
||||||
namespace coro
|
namespace coro
|
||||||
{
|
{
|
||||||
|
thread_pool::operation::operation(thread_pool& tp) noexcept : m_thread_pool(tp)
|
||||||
thread_pool::operation::operation(thread_pool& tp) noexcept
|
|
||||||
: m_thread_pool(tp)
|
|
||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
auto thread_pool::operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
|
auto thread_pool::operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
|
||||||
|
@ -19,12 +16,11 @@ auto thread_pool::operation::await_suspend(std::coroutine_handle<> awaiting_coro
|
||||||
// something else while this coroutine gets picked up by the thread pool.
|
// something else while this coroutine gets picked up by the thread pool.
|
||||||
}
|
}
|
||||||
|
|
||||||
thread_pool::thread_pool(options opts)
|
thread_pool::thread_pool(options opts) : m_opts(std::move(opts))
|
||||||
: m_opts(std::move(opts))
|
|
||||||
{
|
{
|
||||||
m_threads.reserve(m_opts.thread_count);
|
m_threads.reserve(m_opts.thread_count);
|
||||||
|
|
||||||
for(uint32_t i = 0; i < m_opts.thread_count; ++i)
|
for (uint32_t i = 0; i < m_opts.thread_count; ++i)
|
||||||
{
|
{
|
||||||
m_threads.emplace_back([this, i](std::stop_token st) { executor(std::move(st), i); });
|
m_threads.emplace_back([this, i](std::stop_token st) { executor(std::move(st), i); });
|
||||||
}
|
}
|
||||||
|
@ -37,7 +33,7 @@ thread_pool::~thread_pool()
|
||||||
|
|
||||||
auto thread_pool::schedule() noexcept -> std::optional<operation>
|
auto thread_pool::schedule() noexcept -> std::optional<operation>
|
||||||
{
|
{
|
||||||
if(!m_shutdown_requested.load(std::memory_order::relaxed))
|
if (!m_shutdown_requested.load(std::memory_order::relaxed))
|
||||||
{
|
{
|
||||||
m_size.fetch_add(1, std::memory_order_relaxed);
|
m_size.fetch_add(1, std::memory_order_relaxed);
|
||||||
return {operation{*this}};
|
return {operation{*this}};
|
||||||
|
@ -50,16 +46,16 @@ auto thread_pool::shutdown(shutdown_t wait_for_tasks) noexcept -> void
|
||||||
{
|
{
|
||||||
if (!m_shutdown_requested.exchange(true, std::memory_order::release))
|
if (!m_shutdown_requested.exchange(true, std::memory_order::release))
|
||||||
{
|
{
|
||||||
for(auto& thread : m_threads)
|
for (auto& thread : m_threads)
|
||||||
{
|
{
|
||||||
thread.request_stop();
|
thread.request_stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
if(wait_for_tasks == shutdown_t::sync)
|
if (wait_for_tasks == shutdown_t::sync)
|
||||||
{
|
{
|
||||||
for(auto& thread : m_threads)
|
for (auto& thread : m_threads)
|
||||||
{
|
{
|
||||||
if(thread.joinable())
|
if (thread.joinable())
|
||||||
{
|
{
|
||||||
thread.join();
|
thread.join();
|
||||||
}
|
}
|
||||||
|
@ -70,12 +66,12 @@ auto thread_pool::shutdown(shutdown_t wait_for_tasks) noexcept -> void
|
||||||
|
|
||||||
auto thread_pool::executor(std::stop_token stop_token, std::size_t idx) -> void
|
auto thread_pool::executor(std::stop_token stop_token, std::size_t idx) -> void
|
||||||
{
|
{
|
||||||
if(m_opts.on_thread_start_functor != nullptr)
|
if (m_opts.on_thread_start_functor != nullptr)
|
||||||
{
|
{
|
||||||
m_opts.on_thread_start_functor(idx);
|
m_opts.on_thread_start_functor(idx);
|
||||||
}
|
}
|
||||||
|
|
||||||
while(true)
|
while (true)
|
||||||
{
|
{
|
||||||
// Wait until the queue has operations to execute or shutdown has been requested.
|
// Wait until the queue has operations to execute or shutdown has been requested.
|
||||||
{
|
{
|
||||||
|
@ -84,12 +80,12 @@ auto thread_pool::executor(std::stop_token stop_token, std::size_t idx) -> void
|
||||||
}
|
}
|
||||||
|
|
||||||
// Continue to pull operations from the global queue until its empty.
|
// Continue to pull operations from the global queue until its empty.
|
||||||
while(true)
|
while (true)
|
||||||
{
|
{
|
||||||
operation* op{nullptr};
|
operation* op{nullptr};
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lk{m_queue_mutex};
|
std::lock_guard<std::mutex> lk{m_queue_mutex};
|
||||||
if(!m_queue.empty())
|
if (!m_queue.empty())
|
||||||
{
|
{
|
||||||
op = m_queue.front();
|
op = m_queue.front();
|
||||||
m_queue.pop_front();
|
m_queue.pop_front();
|
||||||
|
@ -100,7 +96,7 @@ auto thread_pool::executor(std::stop_token stop_token, std::size_t idx) -> void
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(op != nullptr && op->m_awaiting_coroutine != nullptr)
|
if (op != nullptr && op->m_awaiting_coroutine != nullptr)
|
||||||
{
|
{
|
||||||
op->m_awaiting_coroutine.resume();
|
op->m_awaiting_coroutine.resume();
|
||||||
m_size.fetch_sub(1, std::memory_order_relaxed);
|
m_size.fetch_sub(1, std::memory_order_relaxed);
|
||||||
|
@ -111,13 +107,13 @@ auto thread_pool::executor(std::stop_token stop_token, std::size_t idx) -> void
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(stop_token.stop_requested())
|
if (stop_token.stop_requested())
|
||||||
{
|
{
|
||||||
break; // while(true);
|
break; // while(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(m_opts.on_thread_stop_functor != nullptr)
|
if (m_opts.on_thread_stop_functor != nullptr)
|
||||||
{
|
{
|
||||||
m_opts.on_thread_stop_functor(idx);
|
m_opts.on_thread_stop_functor(idx);
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,10 +50,8 @@ TEST_CASE("benchmark counter func direct call")
|
||||||
TEST_CASE("benchmark counter func coro::sync_wait(awaitable)")
|
TEST_CASE("benchmark counter func coro::sync_wait(awaitable)")
|
||||||
{
|
{
|
||||||
constexpr std::size_t iterations = default_iterations;
|
constexpr std::size_t iterations = default_iterations;
|
||||||
uint64_t counter{0};
|
uint64_t counter{0};
|
||||||
auto func = []() -> coro::task<uint64_t> {
|
auto func = []() -> coro::task<uint64_t> { co_return 1; };
|
||||||
co_return 1;
|
|
||||||
};
|
|
||||||
|
|
||||||
auto start = sc::now();
|
auto start = sc::now();
|
||||||
|
|
||||||
|
@ -69,10 +67,8 @@ TEST_CASE("benchmark counter func coro::sync_wait(awaitable)")
|
||||||
TEST_CASE("benchmark counter func coro::sync_wait(coro::when_all_awaitable(awaitable)) x10")
|
TEST_CASE("benchmark counter func coro::sync_wait(coro::when_all_awaitable(awaitable)) x10")
|
||||||
{
|
{
|
||||||
constexpr std::size_t iterations = default_iterations;
|
constexpr std::size_t iterations = default_iterations;
|
||||||
uint64_t counter{0};
|
uint64_t counter{0};
|
||||||
auto f = []() -> coro::task<uint64_t> {
|
auto f = []() -> coro::task<uint64_t> { co_return 1; };
|
||||||
co_return 1;
|
|
||||||
};
|
|
||||||
|
|
||||||
auto start = sc::now();
|
auto start = sc::now();
|
||||||
|
|
||||||
|
@ -80,13 +76,11 @@ TEST_CASE("benchmark counter func coro::sync_wait(coro::when_all_awaitable(await
|
||||||
{
|
{
|
||||||
auto tasks = coro::sync_wait(coro::when_all_awaitable(f(), f(), f(), f(), f(), f(), f(), f(), f(), f()));
|
auto tasks = coro::sync_wait(coro::when_all_awaitable(f(), f(), f(), f(), f(), f(), f(), f(), f(), f()));
|
||||||
|
|
||||||
std::apply([&counter](auto&&... t) {
|
std::apply([&counter](auto&&... t) { ((counter += t.return_value()), ...); }, tasks);
|
||||||
((counter += t.return_value()), ...);
|
|
||||||
},
|
|
||||||
tasks);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
print_stats("benchmark counter func coro::sync_wait(coro::when_all_awaitable(awaitable))", iterations, start, sc::now());
|
print_stats(
|
||||||
|
"benchmark counter func coro::sync_wait(coro::when_all_awaitable(awaitable))", iterations, start, sc::now());
|
||||||
REQUIRE(counter == iterations);
|
REQUIRE(counter == iterations);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,11 +88,10 @@ TEST_CASE("benchmark thread_pool{1} counter task")
|
||||||
{
|
{
|
||||||
constexpr std::size_t iterations = default_iterations;
|
constexpr std::size_t iterations = default_iterations;
|
||||||
|
|
||||||
coro::thread_pool tp{coro::thread_pool::options{1}};
|
coro::thread_pool tp{coro::thread_pool::options{1}};
|
||||||
std::atomic<uint64_t> counter{0};
|
std::atomic<uint64_t> counter{0};
|
||||||
|
|
||||||
auto make_task = [](coro::thread_pool& tp, std::atomic<uint64_t>& c) -> coro::task<void>
|
auto make_task = [](coro::thread_pool& tp, std::atomic<uint64_t>& c) -> coro::task<void> {
|
||||||
{
|
|
||||||
co_await tp.schedule().value();
|
co_await tp.schedule().value();
|
||||||
c.fetch_add(1, std::memory_order::relaxed);
|
c.fetch_add(1, std::memory_order::relaxed);
|
||||||
co_return;
|
co_return;
|
||||||
|
@ -126,11 +119,10 @@ TEST_CASE("benchmark thread_pool{2} counter task")
|
||||||
{
|
{
|
||||||
constexpr std::size_t iterations = default_iterations;
|
constexpr std::size_t iterations = default_iterations;
|
||||||
|
|
||||||
coro::thread_pool tp{coro::thread_pool::options{2}};
|
coro::thread_pool tp{coro::thread_pool::options{2}};
|
||||||
std::atomic<uint64_t> counter{0};
|
std::atomic<uint64_t> counter{0};
|
||||||
|
|
||||||
auto make_task = [](coro::thread_pool& tp, std::atomic<uint64_t>& c) -> coro::task<void>
|
auto make_task = [](coro::thread_pool& tp, std::atomic<uint64_t>& c) -> coro::task<void> {
|
||||||
{
|
|
||||||
co_await tp.schedule().value();
|
co_await tp.schedule().value();
|
||||||
c.fetch_add(1, std::memory_order::relaxed);
|
c.fetch_add(1, std::memory_order::relaxed);
|
||||||
co_return;
|
co_return;
|
||||||
|
|
|
@ -516,7 +516,7 @@ TEST_CASE("scheduler task throws")
|
||||||
|
|
||||||
auto func = []() -> coro::task<void> {
|
auto func = []() -> coro::task<void> {
|
||||||
// Is it possible to actually notify the user when running a task in a scheduler?
|
// Is it possible to actually notify the user when running a task in a scheduler?
|
||||||
// Seems like the user will need to manually catch.
|
// Seems like the user will need to manually catch within the task themselves.
|
||||||
throw std::runtime_error{"I always throw."};
|
throw std::runtime_error{"I always throw."};
|
||||||
co_return;
|
co_return;
|
||||||
};
|
};
|
||||||
|
@ -525,4 +525,24 @@ TEST_CASE("scheduler task throws")
|
||||||
|
|
||||||
s.shutdown();
|
s.shutdown();
|
||||||
REQUIRE(s.empty());
|
REQUIRE(s.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_CASE("scheduler task throws after resume")
|
||||||
|
{
|
||||||
|
coro::scheduler s{};
|
||||||
|
auto token = s.generate_resume_token<void>();
|
||||||
|
|
||||||
|
auto func = [&]() -> coro::task<void> {
|
||||||
|
co_await token;
|
||||||
|
throw std::runtime_error{"I always throw."};
|
||||||
|
co_return;
|
||||||
|
};
|
||||||
|
|
||||||
|
s.schedule(func());
|
||||||
|
|
||||||
|
std::this_thread::sleep_for(50ms);
|
||||||
|
token.resume();
|
||||||
|
|
||||||
|
s.shutdown();
|
||||||
|
REQUIRE(s.empty());
|
||||||
|
}
|
||||||
|
|
|
@ -4,9 +4,7 @@
|
||||||
|
|
||||||
TEST_CASE("sync_wait simple integer return")
|
TEST_CASE("sync_wait simple integer return")
|
||||||
{
|
{
|
||||||
auto func = []() -> coro::task<int> {
|
auto func = []() -> coro::task<int> { co_return 11; };
|
||||||
co_return 11;
|
|
||||||
};
|
|
||||||
|
|
||||||
auto result = coro::sync_wait(func());
|
auto result = coro::sync_wait(func());
|
||||||
REQUIRE(result == 11);
|
REQUIRE(result == 11);
|
||||||
|
@ -51,8 +49,7 @@ TEST_CASE("sync_wait task co_await single")
|
||||||
|
|
||||||
TEST_CASE("sync_wait task that throws")
|
TEST_CASE("sync_wait task that throws")
|
||||||
{
|
{
|
||||||
auto f = []() -> coro::task<uint64_t>
|
auto f = []() -> coro::task<uint64_t> {
|
||||||
{
|
|
||||||
throw std::runtime_error("I always throw!");
|
throw std::runtime_error("I always throw!");
|
||||||
co_return 1;
|
co_return 1;
|
||||||
};
|
};
|
||||||
|
|
|
@ -8,8 +8,7 @@ TEST_CASE("thread_pool one worker one task")
|
||||||
{
|
{
|
||||||
coro::thread_pool tp{coro::thread_pool::options{1}};
|
coro::thread_pool tp{coro::thread_pool::options{1}};
|
||||||
|
|
||||||
auto func = [&tp]() -> coro::task<uint64_t>
|
auto func = [&tp]() -> coro::task<uint64_t> {
|
||||||
{
|
|
||||||
co_await tp.schedule().value(); // Schedule this coroutine on the scheduler.
|
co_await tp.schedule().value(); // Schedule this coroutine on the scheduler.
|
||||||
co_return 42;
|
co_return 42;
|
||||||
};
|
};
|
||||||
|
@ -22,8 +21,7 @@ TEST_CASE("thread_pool one worker many tasks tuple")
|
||||||
{
|
{
|
||||||
coro::thread_pool tp{coro::thread_pool::options{1}};
|
coro::thread_pool tp{coro::thread_pool::options{1}};
|
||||||
|
|
||||||
auto f = [&tp]() -> coro::task<uint64_t>
|
auto f = [&tp]() -> coro::task<uint64_t> {
|
||||||
{
|
|
||||||
co_await tp.schedule().value(); // Schedule this coroutine on the scheduler.
|
co_await tp.schedule().value(); // Schedule this coroutine on the scheduler.
|
||||||
co_return 50;
|
co_return 50;
|
||||||
};
|
};
|
||||||
|
@ -32,10 +30,7 @@ TEST_CASE("thread_pool one worker many tasks tuple")
|
||||||
REQUIRE(std::tuple_size<decltype(tasks)>() == 5);
|
REQUIRE(std::tuple_size<decltype(tasks)>() == 5);
|
||||||
|
|
||||||
uint64_t counter{0};
|
uint64_t counter{0};
|
||||||
std::apply([&counter](auto&&... t) -> void {
|
std::apply([&counter](auto&&... t) -> void { ((counter += t.return_value()), ...); }, tasks);
|
||||||
((counter += t.return_value()), ...);
|
|
||||||
},
|
|
||||||
tasks);
|
|
||||||
|
|
||||||
REQUIRE(counter == 250);
|
REQUIRE(counter == 250);
|
||||||
}
|
}
|
||||||
|
@ -44,8 +39,7 @@ TEST_CASE("thread_pool one worker many tasks vector")
|
||||||
{
|
{
|
||||||
coro::thread_pool tp{coro::thread_pool::options{1}};
|
coro::thread_pool tp{coro::thread_pool::options{1}};
|
||||||
|
|
||||||
auto f = [&tp]() -> coro::task<uint64_t>
|
auto f = [&tp]() -> coro::task<uint64_t> {
|
||||||
{
|
|
||||||
co_await tp.schedule().value(); // Schedule this coroutine on the scheduler.
|
co_await tp.schedule().value(); // Schedule this coroutine on the scheduler.
|
||||||
co_return 50;
|
co_return 50;
|
||||||
};
|
};
|
||||||
|
@ -60,7 +54,7 @@ TEST_CASE("thread_pool one worker many tasks vector")
|
||||||
REQUIRE(output_tasks.size() == 3);
|
REQUIRE(output_tasks.size() == 3);
|
||||||
|
|
||||||
uint64_t counter{0};
|
uint64_t counter{0};
|
||||||
for(const auto& task : output_tasks)
|
for (const auto& task : output_tasks)
|
||||||
{
|
{
|
||||||
counter += task.return_value();
|
counter += task.return_value();
|
||||||
}
|
}
|
||||||
|
@ -71,17 +65,16 @@ TEST_CASE("thread_pool one worker many tasks vector")
|
||||||
TEST_CASE("thread_pool N workers 100k tasks")
|
TEST_CASE("thread_pool N workers 100k tasks")
|
||||||
{
|
{
|
||||||
constexpr const std::size_t iterations = 100'000;
|
constexpr const std::size_t iterations = 100'000;
|
||||||
coro::thread_pool tp{};
|
coro::thread_pool tp{};
|
||||||
|
|
||||||
auto make_task = [](coro::thread_pool& tp) -> coro::task<uint64_t>
|
auto make_task = [](coro::thread_pool& tp) -> coro::task<uint64_t> {
|
||||||
{
|
|
||||||
co_await tp.schedule().value();
|
co_await tp.schedule().value();
|
||||||
co_return 1;
|
co_return 1;
|
||||||
};
|
};
|
||||||
|
|
||||||
std::vector<coro::task<uint64_t>> input_tasks{};
|
std::vector<coro::task<uint64_t>> input_tasks{};
|
||||||
input_tasks.reserve(iterations);
|
input_tasks.reserve(iterations);
|
||||||
for(std::size_t i = 0; i < iterations; ++i)
|
for (std::size_t i = 0; i < iterations; ++i)
|
||||||
{
|
{
|
||||||
input_tasks.emplace_back(make_task(tp));
|
input_tasks.emplace_back(make_task(tp));
|
||||||
}
|
}
|
||||||
|
@ -90,7 +83,7 @@ TEST_CASE("thread_pool N workers 100k tasks")
|
||||||
REQUIRE(output_tasks.size() == iterations);
|
REQUIRE(output_tasks.size() == iterations);
|
||||||
|
|
||||||
uint64_t counter{0};
|
uint64_t counter{0};
|
||||||
for(const auto& task : output_tasks)
|
for (const auto& task : output_tasks)
|
||||||
{
|
{
|
||||||
counter += task.return_value();
|
counter += task.return_value();
|
||||||
}
|
}
|
||||||
|
@ -102,12 +95,10 @@ TEST_CASE("thread_pool 1 worker task spawns another task")
|
||||||
{
|
{
|
||||||
coro::thread_pool tp{coro::thread_pool::options{1}};
|
coro::thread_pool tp{coro::thread_pool::options{1}};
|
||||||
|
|
||||||
auto f1 = [](coro::thread_pool& tp) -> coro::task<uint64_t>
|
auto f1 = [](coro::thread_pool& tp) -> coro::task<uint64_t> {
|
||||||
{
|
|
||||||
co_await tp.schedule().value();
|
co_await tp.schedule().value();
|
||||||
|
|
||||||
auto f2 = [](coro::thread_pool& tp) -> coro::task<uint64_t>
|
auto f2 = [](coro::thread_pool& tp) -> coro::task<uint64_t> {
|
||||||
{
|
|
||||||
co_await tp.schedule().value();
|
co_await tp.schedule().value();
|
||||||
co_return 5;
|
co_return 5;
|
||||||
};
|
};
|
||||||
|
@ -122,10 +113,9 @@ TEST_CASE("thread_pool shutdown")
|
||||||
{
|
{
|
||||||
coro::thread_pool tp{coro::thread_pool::options{1}};
|
coro::thread_pool tp{coro::thread_pool::options{1}};
|
||||||
|
|
||||||
auto f = [](coro::thread_pool& tp) -> coro::task<bool>
|
auto f = [](coro::thread_pool& tp) -> coro::task<bool> {
|
||||||
{
|
|
||||||
auto scheduled = tp.schedule();
|
auto scheduled = tp.schedule();
|
||||||
if(!scheduled.has_value())
|
if (!scheduled.has_value())
|
||||||
{
|
{
|
||||||
co_return true;
|
co_return true;
|
||||||
}
|
}
|
||||||
|
@ -158,7 +148,7 @@ TEST_CASE("thread_pool schedule functor return_type = void")
|
||||||
coro::thread_pool tp{coro::thread_pool::options{1}};
|
coro::thread_pool tp{coro::thread_pool::options{1}};
|
||||||
|
|
||||||
std::atomic<uint64_t> counter{0};
|
std::atomic<uint64_t> counter{0};
|
||||||
auto f = [](std::atomic<uint64_t>& c) -> void { c++; };
|
auto f = [](std::atomic<uint64_t>& c) -> void { c++; };
|
||||||
|
|
||||||
coro::sync_wait(tp.schedule(f, std::ref(counter)));
|
coro::sync_wait(tp.schedule(f, std::ref(counter)));
|
||||||
REQUIRE(counter == 1);
|
REQUIRE(counter == 1);
|
||||||
|
|
|
@ -4,47 +4,33 @@
|
||||||
|
|
||||||
TEST_CASE("when_all_awaitable single task with tuple container")
|
TEST_CASE("when_all_awaitable single task with tuple container")
|
||||||
{
|
{
|
||||||
auto make_task = [](uint64_t amount) -> coro::task<uint64_t> {
|
auto make_task = [](uint64_t amount) -> coro::task<uint64_t> { co_return amount; };
|
||||||
co_return amount;
|
|
||||||
};
|
|
||||||
|
|
||||||
auto output_tasks = coro::sync_wait(coro::when_all_awaitable(make_task(100)));
|
auto output_tasks = coro::sync_wait(coro::when_all_awaitable(make_task(100)));
|
||||||
REQUIRE(std::tuple_size<decltype(output_tasks)>() == 1);
|
REQUIRE(std::tuple_size<decltype(output_tasks)>() == 1);
|
||||||
|
|
||||||
uint64_t counter{0};
|
uint64_t counter{0};
|
||||||
std::apply(
|
std::apply([&counter](auto&&... tasks) -> void { ((counter += tasks.return_value()), ...); }, output_tasks);
|
||||||
[&counter](auto&&... tasks) -> void {
|
|
||||||
((counter += tasks.return_value()), ...);
|
|
||||||
},
|
|
||||||
output_tasks);
|
|
||||||
|
|
||||||
REQUIRE(counter == 100);
|
REQUIRE(counter == 100);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_CASE("when_all_awaitable multiple tasks with tuple container")
|
TEST_CASE("when_all_awaitable multiple tasks with tuple container")
|
||||||
{
|
{
|
||||||
auto make_task = [](uint64_t amount) -> coro::task<uint64_t> {
|
auto make_task = [](uint64_t amount) -> coro::task<uint64_t> { co_return amount; };
|
||||||
co_return amount;
|
|
||||||
};
|
|
||||||
|
|
||||||
auto output_tasks = coro::sync_wait(coro::when_all_awaitable(make_task(100), make_task(50), make_task(20)));
|
auto output_tasks = coro::sync_wait(coro::when_all_awaitable(make_task(100), make_task(50), make_task(20)));
|
||||||
REQUIRE(std::tuple_size<decltype(output_tasks)>() == 3);
|
REQUIRE(std::tuple_size<decltype(output_tasks)>() == 3);
|
||||||
|
|
||||||
uint64_t counter{0};
|
uint64_t counter{0};
|
||||||
std::apply(
|
std::apply([&counter](auto&&... tasks) -> void { ((counter += tasks.return_value()), ...); }, output_tasks);
|
||||||
[&counter](auto&&... tasks) -> void {
|
|
||||||
((counter += tasks.return_value()), ...);
|
|
||||||
},
|
|
||||||
output_tasks);
|
|
||||||
|
|
||||||
REQUIRE(counter == 170);
|
REQUIRE(counter == 170);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_CASE("when_all_awaitable single task with vector container")
|
TEST_CASE("when_all_awaitable single task with vector container")
|
||||||
{
|
{
|
||||||
auto make_task = [](uint64_t amount) -> coro::task<uint64_t> {
|
auto make_task = [](uint64_t amount) -> coro::task<uint64_t> { co_return amount; };
|
||||||
co_return amount;
|
|
||||||
};
|
|
||||||
|
|
||||||
std::vector<coro::task<uint64_t>> input_tasks;
|
std::vector<coro::task<uint64_t>> input_tasks;
|
||||||
input_tasks.emplace_back(make_task(100));
|
input_tasks.emplace_back(make_task(100));
|
||||||
|
@ -53,7 +39,7 @@ TEST_CASE("when_all_awaitable single task with vector container")
|
||||||
REQUIRE(output_tasks.size() == 1);
|
REQUIRE(output_tasks.size() == 1);
|
||||||
|
|
||||||
uint64_t counter{0};
|
uint64_t counter{0};
|
||||||
for(const auto& task : output_tasks)
|
for (const auto& task : output_tasks)
|
||||||
{
|
{
|
||||||
counter += task.return_value();
|
counter += task.return_value();
|
||||||
}
|
}
|
||||||
|
@ -63,9 +49,7 @@ TEST_CASE("when_all_awaitable single task with vector container")
|
||||||
|
|
||||||
TEST_CASE("when_all_ready multple task withs vector container")
|
TEST_CASE("when_all_ready multple task withs vector container")
|
||||||
{
|
{
|
||||||
auto make_task = [](uint64_t amount) -> coro::task<uint64_t> {
|
auto make_task = [](uint64_t amount) -> coro::task<uint64_t> { co_return amount; };
|
||||||
co_return amount;
|
|
||||||
};
|
|
||||||
|
|
||||||
std::vector<coro::task<uint64_t>> input_tasks;
|
std::vector<coro::task<uint64_t>> input_tasks;
|
||||||
input_tasks.emplace_back(make_task(100));
|
input_tasks.emplace_back(make_task(100));
|
||||||
|
@ -77,7 +61,7 @@ TEST_CASE("when_all_ready multple task withs vector container")
|
||||||
REQUIRE(output_tasks.size() == 4);
|
REQUIRE(output_tasks.size() == 4);
|
||||||
|
|
||||||
uint64_t counter{0};
|
uint64_t counter{0};
|
||||||
for(const auto& task : output_tasks)
|
for (const auto& task : output_tasks)
|
||||||
{
|
{
|
||||||
counter += task.return_value();
|
counter += task.return_value();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue