1
0
Fork 0
mirror of https://gitlab.com/niansa/libcrosscoro.git synced 2025-03-06 20:53:32 +01:00

rename engine to scheduler

rename schedule_task to resume_token
This commit is contained in:
jbaldwin 2020-09-26 23:35:33 -06:00
parent 6d5c3be6c3
commit 0093173c55
6 changed files with 244 additions and 228 deletions

View file

@ -10,7 +10,7 @@ message("${PROJECT_NAME} CORO_CODE_COVERAGE = ${CORO_CODE_COVERAGE}")
set(LIBCORO_SOURCE_FILES set(LIBCORO_SOURCE_FILES
src/coro/async_manual_reset_event.hpp src/coro/async_manual_reset_event.hpp
src/coro/coro.hpp src/coro/coro.hpp
src/coro/engine.hpp src/coro/engine.cpp src/coro/scheduler.hpp
src/coro/sync_wait.hpp src/coro/sync_wait.hpp
src/coro/task.hpp src/coro/task.hpp
) )
@ -21,11 +21,14 @@ target_compile_features(${PROJECT_NAME} PUBLIC cxx_std_20)
target_include_directories(${PROJECT_NAME} PUBLIC src) target_include_directories(${PROJECT_NAME} PUBLIC src)
target_link_libraries(${PROJECT_NAME} PUBLIC zmq pthread) target_link_libraries(${PROJECT_NAME} PUBLIC zmq pthread)
if(${CMAKE_CXX_COMPILER_ID} MATCHES "GNU") if(${CMAKE_CXX_COMPILER_ID} MATCHES "GNU")
if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS "10.2.0")
message(FATAL_ERROR "gcc version ${CMAKE_CXX_COMPILER_VERSION} is unsupported, please upgrade to at least 10.2.0")
endif()
target_compile_options(${PROJECT_NAME} PUBLIC -fcoroutines) target_compile_options(${PROJECT_NAME} PUBLIC -fcoroutines)
elseif(${CMAKE_CXX_COMPILER_ID} MATCHES "Clang") elseif(${CMAKE_CXX_COMPILER_ID} MATCHES "Clang")
target_compile_options(${PROJECT_NAME} PUBLIC -fcoroutines -fcoroutines-ts) message(FATAL_ERROR "Clang is currently not supported.")
endif() endif()

View file

@ -1,6 +1,6 @@
#pragma once #pragma once
#include "coro/async_manual_reset_event.hpp" #include "coro/async_manual_reset_event.hpp"
#include "coro/engine.hpp" #include "coro/scheduler.hpp"
#include "coro/sync_wait.hpp" #include "coro/sync_wait.hpp"
#include "coro/task.hpp" #include "coro/task.hpp"

View file

@ -1,8 +0,0 @@
#include "coro/engine.hpp"
namespace coro
{
std::atomic<uint32_t> engine::m_engine_id_counter{0};
} // namespace coro

View file

@ -11,7 +11,6 @@
#include <span> #include <span>
#include <type_traits> #include <type_traits>
#include <list> #include <list>
#include <variant>
#include <sys/epoll.h> #include <sys/epoll.h>
#include <sys/eventfd.h> #include <sys/eventfd.h>
@ -31,25 +30,25 @@
namespace coro namespace coro
{ {
class engine; class scheduler;
namespace detail namespace detail
{ {
class engine_event_base class resume_token_base
{ {
public: public:
engine_event_base(engine* eng) noexcept resume_token_base(scheduler* eng) noexcept
: m_engine(eng), : m_scheduler(eng),
m_state(nullptr) m_state(nullptr)
{ {
} }
virtual ~engine_event_base() = default; virtual ~resume_token_base() = default;
engine_event_base(const engine_event_base&) = delete; resume_token_base(const resume_token_base&) = delete;
engine_event_base(engine_event_base&&) = delete; resume_token_base(resume_token_base&&) = delete;
auto operator=(const engine_event_base&) -> engine_event_base& = delete; auto operator=(const resume_token_base&) -> resume_token_base& = delete;
auto operator=(engine_event_base&&) -> engine_event_base& = delete; auto operator=(resume_token_base&&) -> resume_token_base& = delete;
bool is_set() const noexcept bool is_set() const noexcept
{ {
@ -58,25 +57,25 @@ public:
struct awaiter struct awaiter
{ {
awaiter(const engine_event_base& event) noexcept awaiter(const resume_token_base& token) noexcept
: m_event(event) : m_token(token)
{ {
} }
auto await_ready() const noexcept -> bool auto await_ready() const noexcept -> bool
{ {
return m_event.is_set(); return m_token.is_set();
} }
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
{ {
const void* const set_state = &m_event; const void* const set_state = &m_token;
m_awaiting_coroutine = awaiting_coroutine; m_awaiting_coroutine = awaiting_coroutine;
// This value will update if other threads write to it via acquire. // This value will update if other threads write to it via acquire.
void* old_value = m_event.m_state.load(std::memory_order_acquire); void* old_value = m_token.m_state.load(std::memory_order_acquire);
do do
{ {
// Resume immediately if already in the set state. // Resume immediately if already in the set state.
@ -86,7 +85,7 @@ public:
} }
m_next = static_cast<awaiter*>(old_value); m_next = static_cast<awaiter*>(old_value);
} while(!m_event.m_state.compare_exchange_weak( } while(!m_token.m_state.compare_exchange_weak(
old_value, old_value,
this, this,
std::memory_order_release, std::memory_order_release,
@ -97,10 +96,10 @@ public:
auto await_resume() noexcept auto await_resume() noexcept
{ {
// no-op
} }
const engine_event_base& m_event; const resume_token_base& m_token;
std::coroutine_handle<> m_awaiting_coroutine; std::coroutine_handle<> m_awaiting_coroutine;
awaiter* m_next{nullptr}; awaiter* m_next{nullptr};
}; };
@ -118,30 +117,30 @@ public:
protected: protected:
friend struct awaiter; friend struct awaiter;
engine* m_engine{nullptr}; scheduler* m_scheduler{nullptr};
mutable std::atomic<void*> m_state; mutable std::atomic<void*> m_state;
}; };
} // namespace detail } // namespace detail
template<typename return_type> template<typename return_type>
class engine_event final : public detail::engine_event_base class resume_token final : public detail::resume_token_base
{ {
friend engine; friend scheduler;
engine_event() resume_token()
: detail::engine_event_base(nullptr) : detail::resume_token_base(nullptr)
{ {
} }
public: public:
engine_event(engine& eng) resume_token(scheduler& s)
: detail::engine_event_base(&eng) : detail::resume_token_base(&s)
{ {
} }
~engine_event() override = default; ~resume_token() override = default;
auto set(return_type result) noexcept -> void; auto resume(return_type result) noexcept -> void;
auto result() const & -> const return_type& auto result() const & -> const return_type&
{ {
@ -157,23 +156,23 @@ private:
}; };
template<> template<>
class engine_event<void> final : public detail::engine_event_base class resume_token<void> final : public detail::resume_token_base
{ {
friend engine; friend scheduler;
engine_event() resume_token()
: detail::engine_event_base(nullptr) : detail::resume_token_base(nullptr)
{ {
} }
public: public:
engine_event(engine& eng) resume_token(scheduler& s)
: detail::engine_event_base(&eng) : detail::resume_token_base(&s)
{ {
} }
~engine_event() override = default; ~resume_token() override = default;
auto set() noexcept -> void; auto resume() noexcept -> void;
}; };
enum class poll_op enum class poll_op
@ -186,10 +185,11 @@ enum class poll_op
read_write = EPOLLIN | EPOLLOUT read_write = EPOLLIN | EPOLLOUT
}; };
class engine class scheduler
{ {
/// resume_token<T> needs to be able to call internal scheduler::resume()
template<typename return_type> template<typename return_type>
friend class engine_event; friend class resume_token;
public: public:
using fd_type = int; using fd_type = int;
@ -207,11 +207,13 @@ private:
public: public:
/** /**
* @param reserve_size Reserve up-front this many tasks for concurrent execution. The engine * @param reserve_size Reserve up-front this many tasks for concurrent execution. The scheduler
* will also automatically grow this if needed. * will also automatically grow this if needed.
* @param growth_factor The factor to grow by when the internal tasks are full.
*/ */
engine( scheduler(
std::size_t reserve_size = 16 std::size_t reserve_size = 8,
double growth_factor = 2
) )
: m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)), : m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)),
m_submit_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)) m_submit_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK))
@ -222,15 +224,15 @@ public:
e.data.ptr = m_submit_ptr; e.data.ptr = m_submit_ptr;
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_submit_fd, &e); epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_submit_fd, &e);
m_background_thread = std::thread([this, reserve_size] { this->run(reserve_size); }); m_background_thread = std::thread([this, reserve_size, growth_factor] { this->run(reserve_size, growth_factor); });
} }
engine(const engine&) = delete; scheduler(const scheduler&) = delete;
engine(engine&&) = delete; scheduler(scheduler&&) = delete;
auto operator=(const engine&) -> engine& = delete; auto operator=(const scheduler&) -> scheduler& = delete;
auto operator=(engine&&) -> engine& = delete; auto operator=(scheduler&&) -> scheduler& = delete;
~engine() ~scheduler()
{ {
shutdown(); shutdown();
if(m_epoll_fd != -1) if(m_epoll_fd != -1)
@ -245,7 +247,11 @@ public:
} }
} }
auto execute(coro::task<void>& task) -> bool // TODO:
// 1) Have schedule take ownership of task rather than forcing the user to maintain lifetimes.
// 2) schedule_after(task, chrono<REP, RATIO>)
auto schedule(coro::task<void>& task) -> bool
{ {
if(m_shutdown) if(m_shutdown)
{ {
@ -270,11 +276,11 @@ public:
auto poll(fd_type fd, poll_op op) -> coro::task<void> auto poll(fd_type fd, poll_op op) -> coro::task<void>
{ {
co_await unsafe_yield<void>( co_await unsafe_yield<void>(
[&](engine_event<void>& event) [&](resume_token<void>& token)
{ {
struct epoll_event e{}; struct epoll_event e{};
e.events = static_cast<uint32_t>(op) | EPOLLONESHOT | EPOLLET; e.events = static_cast<uint32_t>(op) | EPOLLONESHOT | EPOLLET;
e.data.ptr = &event; e.data.ptr = &token;
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &e); epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &e);
} }
); );
@ -310,46 +316,46 @@ public:
} }
/** /**
* Immediately yields the current task and provides an event to set when the async operation * Immediately yields the current task and provides a resume token to resume this yielded
* being yield'ed to has completed. * coroutine when the async operation has completed.
* *
* Normal usage of this might look like: * Normal usage of this might look like:
* \code * \code
engine.yield([](coro::engine_event<void>& e) { scheduler.yield([](coro::resume_token<void>& t) {
auto on_service_complete = [&]() { e.set(); }; auto on_service_complete = [&]() { t.resume(); };
service.execute(on_service_complete); service.do_work(on_service_complete);
}); });
* \endcode * \endcode
* The above example will yield the current task and then through the 3rd party service's * The above example will yield the current task and then through the 3rd party service's
* on complete callback function let the engine know that it should resume execution of the task. * on complete callback function let the scheduler know that it should resume execution of the task.
* *
* This function along with `engine::resume()` are special additions for working with 3rd party
* services that do not provide coroutine support, or that are event driven and cannot work
* directly with the engine.
* @tparam func Functor to invoke with the yielded coroutine handle to be resumed. * @tparam func Functor to invoke with the yielded coroutine handle to be resumed.
* @param f Immediately invoked functor with the yield point coroutine handle to resume with. * @param f Immediately invoked functor with the yield point coroutine handle to resume with.
* @return A task to co_await until the manual `engine::resume(handle)` is called. * @return A task to co_await until the manual `scheduler::resume(handle)` is called.
*/ */
template<typename return_type, std::invocable<engine_event<return_type>&> before_functor> template<typename return_type, std::invocable<resume_token<return_type>&> before_functor>
auto yield(before_functor before) -> coro::task<return_type> auto yield(before_functor before) -> coro::task<return_type>
{ {
engine_event<return_type> e{*this}; resume_token<return_type> token{*this};
before(e); before(token);
co_await e; co_await token;
if constexpr (std::is_same_v<return_type, void>) if constexpr (std::is_same_v<return_type, void>)
{ {
co_return; co_return;
} }
else else
{ {
co_return e.result(); co_return token.result();
} }
} }
/**
* User provided resume token to yield the current coroutine until the token's resume is called.
*/
template<typename return_type> template<typename return_type>
auto yield(engine_event<return_type>& e) -> coro::task<void> auto yield(resume_token<return_type>& token) -> coro::task<void>
{ {
co_await e; co_await token;
co_return; co_return;
} }
@ -394,26 +400,26 @@ public:
auto size() const -> std::size_t { return m_size.load(); } auto size() const -> std::size_t { return m_size.load(); }
/** /**
* @return True if there are no tasks executing or waiting to be executed in this engine. * @return True if there are no tasks executing or waiting to be executed in this scheduler.
*/ */
auto empty() const -> bool { return m_size == 0; } auto empty() const -> bool { return m_size == 0; }
/** /**
* @return True if this engine is currently running. * @return True if this scheduler is currently running.
*/ */
auto is_running() const noexcept -> bool { return m_is_running; } auto is_running() const noexcept -> bool { return m_is_running; }
/** /**
* @return True if this engine has been requested to shutdown. * @return True if this scheduler has been requested to shutdown.
*/ */
auto is_shutdown() const noexcept -> bool { return m_shutdown; } auto is_shutdown() const noexcept -> bool { return m_shutdown; }
/** /**
* Requests the engine to finish processing all of its current tasks and shutdown. * Requests the scheduler to finish processing all of its current tasks and shutdown.
* New tasks submitted via `engine::execute()` will be rejected after this is called. * New tasks submitted via `scheduler::schedule()` will be rejected after this is called.
* @param wait_for_tasks This call will block until all tasks are complete if shutdown_type::sync * @param wait_for_tasks This call will block until all tasks are complete if shutdown_type::sync
* is passed in, if shutdown_type::async is passed this function will tell * is passed in, if shutdown_type::async is passed this function will tell
* the engine to shutdown but not wait for all tasks to complete, it returns * the scheduler to shutdown but not wait for all tasks to complete, it returns
* immediately. * immediately.
*/ */
auto shutdown(shutdown_type wait_for_tasks = shutdown_type::sync) -> void auto shutdown(shutdown_type wait_for_tasks = shutdown_type::sync) -> void
@ -432,13 +438,13 @@ public:
} }
/** /**
* @return A unique id to identify this engine. * @return A unique id to identify this scheduler.
*/ */
auto engine_id() const -> uint32_t { return m_engine_id; } auto scheduler_id() const -> uint32_t { return m_scheduler_id; }
private: private:
static std::atomic<uint32_t> m_engine_id_counter; static std::atomic<uint32_t> m_scheduler_id_counter;
const uint32_t m_engine_id{m_engine_id_counter++}; const uint32_t m_scheduler_id{m_scheduler_id_counter++};
fd_type m_epoll_fd{-1}; fd_type m_epoll_fd{-1};
fd_type m_submit_fd{-1}; fd_type m_submit_fd{-1};
@ -453,10 +459,10 @@ private:
std::atomic<std::size_t> m_size{0}; std::atomic<std::size_t> m_size{0};
template<typename return_type, std::invocable<engine_event<return_type>&> before_functor> template<typename return_type, std::invocable<resume_token<return_type>&> before_functor>
auto unsafe_yield(before_functor before) -> coro::task<return_type> auto unsafe_yield(before_functor before) -> coro::task<return_type>
{ {
engine_event<return_type> e{}; resume_token<return_type> e{};
before(e); before(e);
co_await e; co_await e;
if constexpr (std::is_same_v<return_type, void>) if constexpr (std::is_same_v<return_type, void>)
@ -481,12 +487,27 @@ private:
::write(m_submit_fd, &value, sizeof(value)); ::write(m_submit_fd, &value, sizeof(value));
} }
auto run(const std::size_t growth_size) -> void auto run(const std::size_t growth_size, const double growth_factor) -> void
{ {
using namespace std::chrono_literals; using namespace std::chrono_literals;
m_is_running = true; m_is_running = true;
/**
* Each task submitted into the scheduler has a finalize task that is set as the user's
* continuation to 'delete' itself from within the scheduler upon its completion. The
* finalize tasks lifetimes are maintained within the vector. The list of indexes
* maintains stable indexes into the vector but are swapped around when tasks complete
* as a 'free list'. This free list is divided into two partitions, used and unused
* based on the position of the free_index variable. When the vector is completely full
* it will grow by the given growth size, this might switch to doubling in the future.
*
* Finally, there is one last vector that takes iterators into the list of indexes, this
* final vector is special in that it contains 'dead' tasks to be deleted. Since a task
* cannot actually delete itself (double free/corruption) it marks itself as 'dead' and
* the scheduler will free it on the next event loop iteration.
*/
std::vector<std::optional<coro::task<void>>> finalize_tasks{}; std::vector<std::optional<coro::task<void>>> finalize_tasks{};
std::list<std::size_t> finalize_indexes{}; std::list<std::size_t> finalize_indexes{};
std::vector<std::list<std::size_t>::iterator> delete_indexes{}; std::vector<std::list<std::size_t>::iterator> delete_indexes{};
@ -517,7 +538,7 @@ private:
constexpr std::size_t max_events = 8; constexpr std::size_t max_events = 8;
std::array<struct epoll_event, max_events> events{}; std::array<struct epoll_event, max_events> events{};
// Execute until stopped or there are more tasks to complete. // Execute tasks until stopped or there are more tasks to complete.
while(!m_shutdown || m_size > 0) while(!m_shutdown || m_size > 0)
{ {
auto event_count = epoll_wait(m_epoll_fd, events.data(), max_events, timeout.count()); auto event_count = epoll_wait(m_epoll_fd, events.data(), max_events, timeout.count());
@ -577,8 +598,8 @@ private:
else else
{ {
// Individual poll task wake-up. // Individual poll task wake-up.
auto* event_ptr = static_cast<engine_event<void>*>(handle_ptr); auto* token_ptr = static_cast<resume_token<void>*>(handle_ptr);
event_ptr->set(); // this will resume the coroutine. token_ptr->resume();
} }
} }
@ -603,7 +624,7 @@ private:
}; };
template<typename return_type> template<typename return_type>
inline auto engine_event<return_type>::set(return_type result) noexcept -> void inline auto resume_token<return_type>::resume(return_type result) noexcept -> void
{ {
void* old_value = m_state.exchange(this, std::memory_order_acq_rel); void* old_value = m_state.exchange(this, std::memory_order_acq_rel);
if(old_value != this) if(old_value != this)
@ -614,22 +635,22 @@ inline auto engine_event<return_type>::set(return_type result) noexcept -> void
while(waiters != nullptr) while(waiters != nullptr)
{ {
auto* next = waiters->m_next; auto* next = waiters->m_next;
// If engine is nullptr this is an unsafe_yield() // If scheduler is nullptr this is an unsafe_yield()
// If engine is present this is a yield() // If scheduler is present this is a yield()
if(m_engine == nullptr) if(m_scheduler == nullptr)
{ {
waiters->m_awaiting_coroutine.resume(); waiters->m_awaiting_coroutine.resume();
} }
else else
{ {
m_engine->resume(waiters->m_awaiting_coroutine); m_scheduler->resume(waiters->m_awaiting_coroutine);
} }
waiters = next; waiters = next;
} }
} }
} }
inline auto engine_event<void>::set() noexcept -> void inline auto resume_token<void>::resume() noexcept -> void
{ {
void* old_value = m_state.exchange(this, std::memory_order_acq_rel); void* old_value = m_state.exchange(this, std::memory_order_acq_rel);
if(old_value != this) if(old_value != this)
@ -638,20 +659,22 @@ inline auto engine_event<void>::set() noexcept -> void
while(waiters != nullptr) while(waiters != nullptr)
{ {
auto* next = waiters->m_next; auto* next = waiters->m_next;
// If engine is nullptr this is an unsafe_yield() // If scheduler is nullptr this is an unsafe_yield()
// If engine is present this is a yield() // If scheduler is present this is a yield()
if(m_engine == nullptr) if(m_scheduler == nullptr)
{ {
waiters->m_awaiting_coroutine.resume(); waiters->m_awaiting_coroutine.resume();
} }
else else
{ {
m_engine->resume(waiters->m_awaiting_coroutine); m_scheduler->resume(waiters->m_awaiting_coroutine);
} }
waiters = next; waiters = next;
} }
} }
} }
inline std::atomic<uint32_t> scheduler::m_scheduler_id_counter{0};
} // namespace coro } // namespace coro

View file

@ -3,7 +3,7 @@ project(libcoro_test)
set(LIBCORO_TEST_SOURCE_FILES set(LIBCORO_TEST_SOURCE_FILES
test_async_manual_reset_event.cpp test_async_manual_reset_event.cpp
test_engine.cpp test_scheduler.cpp
test_task.cpp test_task.cpp
) )

View file

@ -10,32 +10,32 @@
using namespace std::chrono_literals; using namespace std::chrono_literals;
TEST_CASE("engine submit single functor") TEST_CASE("scheduler submit single functor")
{ {
std::atomic<uint64_t> counter{0}; std::atomic<uint64_t> counter{0};
coro::engine e{}; coro::scheduler s{};
auto task = [&]() -> coro::task<void> auto task = [&]() -> coro::task<void>
{ {
std::cerr << "Hello world from engine task!\n"; std::cerr << "Hello world from scheduler task!\n";
counter++; counter++;
co_return; co_return;
}(); }();
e.execute(task); s.schedule(task);
// while(counter != 1) std::this_thread::sleep_for(1ms); // while(counter != 1) std::this_thread::sleep_for(1ms);
e.shutdown(); s.shutdown();
REQUIRE(counter == 1); REQUIRE(counter == 1);
} }
TEST_CASE("engine submit mutiple tasks") TEST_CASE("scheduler submit mutiple tasks")
{ {
constexpr std::size_t n = 1000; constexpr std::size_t n = 1000;
std::atomic<uint64_t> counter{0}; std::atomic<uint64_t> counter{0};
coro::engine e{}; coro::scheduler s{};
std::vector<coro::task<void>> tasks{}; std::vector<coro::task<void>> tasks{};
tasks.reserve(n); tasks.reserve(n);
@ -44,42 +44,43 @@ TEST_CASE("engine submit mutiple tasks")
for(std::size_t i = 0; i < n; ++i) for(std::size_t i = 0; i < n; ++i)
{ {
tasks.emplace_back(func()); tasks.emplace_back(func());
e.execute(tasks.back()); s.schedule(tasks.back());
} }
e.shutdown(); s.shutdown();
REQUIRE(counter == n); REQUIRE(counter == n);
} }
TEST_CASE("engine task with multiple yields on event") TEST_CASE("scheduler task with multiple yields on event")
{ {
std::atomic<uint64_t> counter{0}; std::atomic<uint64_t> counter{0};
coro::engine e{}; coro::scheduler s{};
coro::engine_event<uint64_t> event1{e}; coro::resume_token<uint64_t> token{s};
coro::engine_event<uint64_t> event2{e};
coro::engine_event<uint64_t> event3{e};
auto task = [&]() -> coro::task<void> auto task = [&]() -> coro::task<void>
{ {
std::cerr << "1st suspend\n"; std::cerr << "1st suspend\n";
co_await e.yield(event1); co_await s.yield(token);
std::cerr << "1st resume\n"; std::cerr << "1st resume\n";
counter += event1.result(); counter += token.result();
token.reset();
std::cerr << "never suspend\n"; std::cerr << "never suspend\n";
co_await std::suspend_never{}; co_await std::suspend_never{};
std::cerr << "2nd suspend\n"; std::cerr << "2nd suspend\n";
co_await e.yield(event2); co_await s.yield(token);
token.reset();
std::cerr << "2nd resume\n"; std::cerr << "2nd resume\n";
counter += event2.result(); counter += token.result();
std::cerr << "3rd suspend\n"; std::cerr << "3rd suspend\n";
co_await e.yield(event3); co_await s.yield(token);
token.reset();
std::cerr << "3rd resume\n"; std::cerr << "3rd resume\n";
counter += event3.result(); counter += token.result();
co_return; co_return;
}(); }();
auto resume_task = [&](coro::engine_event<uint64_t>& event, int expected) { auto resume_task = [&](coro::resume_token<uint64_t>& token, int expected) {
event.set(1); token.resume(1);
while(counter != expected) while(counter != expected)
{ {
std::cerr << "counter=" << counter << "\n"; std::cerr << "counter=" << counter << "\n";
@ -87,49 +88,49 @@ TEST_CASE("engine task with multiple yields on event")
} }
}; };
e.execute(task); s.schedule(task);
resume_task(event1, 1); resume_task(token, 1);
resume_task(event2, 2); resume_task(token, 2);
resume_task(event3, 3); resume_task(token, 3);
e.shutdown(); s.shutdown();
REQUIRE(e.empty()); REQUIRE(s.empty());
} }
TEST_CASE("engine task with read poll") TEST_CASE("scheduler task with read poll")
{ {
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::engine e{}; coro::scheduler s{};
auto task = [&]() -> coro::task<void> auto task = [&]() -> coro::task<void>
{ {
// Poll will block until there is data to read. // Poll will block until there is data to read.
co_await e.poll(trigger_fd, coro::poll_op::read); co_await s.poll(trigger_fd, coro::poll_op::read);
REQUIRE(true); REQUIRE(true);
co_return; co_return;
}(); }();
e.execute(task); s.schedule(task);
uint64_t value{42}; uint64_t value{42};
write(trigger_fd, &value, sizeof(value)); write(trigger_fd, &value, sizeof(value));
e.shutdown(); s.shutdown();
close(trigger_fd); close(trigger_fd);
} }
TEST_CASE("engine task with read") TEST_CASE("scheduler task with read")
{ {
constexpr uint64_t expected_value{42}; constexpr uint64_t expected_value{42};
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::engine e{}; coro::scheduler s{};
auto task = [&]() -> coro::task<void> auto task = [&]() -> coro::task<void>
{ {
uint64_t val{0}; uint64_t val{0};
auto bytes_read = co_await e.read( auto bytes_read = co_await s.read(
trigger_fd, trigger_fd,
std::span<char>(reinterpret_cast<char*>(&val), sizeof(val)) std::span<char>(reinterpret_cast<char*>(&val), sizeof(val))
); );
@ -139,29 +140,29 @@ TEST_CASE("engine task with read")
co_return; co_return;
}(); }();
e.execute(task); s.schedule(task);
write(trigger_fd, &expected_value, sizeof(expected_value)); write(trigger_fd, &expected_value, sizeof(expected_value));
e.shutdown(); s.shutdown();
close(trigger_fd); close(trigger_fd);
} }
TEST_CASE("engine task with read and write same fd") TEST_CASE("scheduler task with read and write same fd")
{ {
// Since this test uses an eventfd, only 1 task at a time can poll/read/write to that // Since this test uses an eventfd, only 1 task at a time can poll/read/write to that
// event descriptor through the engine. It could be possible to modify the engine // event descriptor through the scheduler. It could be possible to modify the scheduler
// to keep track of read and write events on a specific socket/fd and update the tasks // to keep track of read and write events on a specific socket/fd and update the tasks
// as well as resumes accordingly, right now this is just a known limitation, see the // as well as resumes accordingly, right now this is just a known limitation, see the
// pipe test for two concurrent tasks read and write awaiting on different file descriptors. // pipe test for two concurrent tasks read and write awaiting on different file descriptors.
constexpr uint64_t expected_value{9001}; constexpr uint64_t expected_value{9001};
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::engine e{}; coro::scheduler s{};
auto task = [&]() -> coro::task<void> auto task = [&]() -> coro::task<void>
{ {
auto bytes_written = co_await e.write( auto bytes_written = co_await s.write(
trigger_fd, trigger_fd,
std::span<const char>(reinterpret_cast<const char*>(&expected_value), sizeof(expected_value)) std::span<const char>(reinterpret_cast<const char*>(&expected_value), sizeof(expected_value))
); );
@ -169,7 +170,7 @@ TEST_CASE("engine task with read and write same fd")
REQUIRE(bytes_written == sizeof(uint64_t)); REQUIRE(bytes_written == sizeof(uint64_t));
uint64_t val{0}; uint64_t val{0};
auto bytes_read = co_await e.read( auto bytes_read = co_await s.read(
trigger_fd, trigger_fd,
std::span<char>(reinterpret_cast<char*>(&val), sizeof(val)) std::span<char>(reinterpret_cast<char*>(&val), sizeof(val))
); );
@ -179,25 +180,25 @@ TEST_CASE("engine task with read and write same fd")
co_return; co_return;
}(); }();
e.execute(task); s.schedule(task);
e.shutdown(); s.shutdown();
close(trigger_fd); close(trigger_fd);
} }
TEST_CASE("engine task with read and write pipe") TEST_CASE("scheduler task with read and write pipe")
{ {
const std::string msg{"coroutines are really cool but not that EASY!"}; const std::string msg{"coroutines are really cool but not that EASY!"};
int pipe_fd[2]; int pipe_fd[2];
pipe2(pipe_fd, O_NONBLOCK); pipe2(pipe_fd, O_NONBLOCK);
coro::engine e{}; coro::scheduler s{};
auto read_task = [&]() -> coro::task<void> auto read_task = [&]() -> coro::task<void>
{ {
std::string buffer(4096, '0'); std::string buffer(4096, '0');
std::span<char> view{buffer.data(), buffer.size()}; std::span<char> view{buffer.data(), buffer.size()};
auto bytes_read = co_await e.read(pipe_fd[0], view); auto bytes_read = co_await s.read(pipe_fd[0], view);
REQUIRE(bytes_read == msg.size()); REQUIRE(bytes_read == msg.size());
buffer.resize(bytes_read); buffer.resize(bytes_read);
REQUIRE(buffer == msg); REQUIRE(buffer == msg);
@ -206,108 +207,105 @@ TEST_CASE("engine task with read and write pipe")
auto write_task = [&]() -> coro::task<void> auto write_task = [&]() -> coro::task<void>
{ {
std::span<const char> view{msg.data(), msg.size()}; std::span<const char> view{msg.data(), msg.size()};
auto bytes_written = co_await e.write(pipe_fd[1], view); auto bytes_written = co_await s.write(pipe_fd[1], view);
REQUIRE(bytes_written == msg.size()); REQUIRE(bytes_written == msg.size());
}(); }();
e.execute(read_task); s.schedule(read_task);
e.execute(write_task); s.schedule(write_task);
e.shutdown(); s.shutdown();
close(pipe_fd[0]); close(pipe_fd[0]);
close(pipe_fd[1]); close(pipe_fd[1]);
} }
static auto standalone_read( static auto standalone_read(
coro::engine& e, coro::scheduler& s,
coro::engine::fd_type socket, coro::scheduler::fd_type socket,
std::span<char> buffer std::span<char> buffer
) -> coro::task<ssize_t> ) -> coro::task<ssize_t>
{ {
// do other stuff in larger function // do other stuff in larger function
co_return co_await e.read(socket, buffer); co_return co_await s.read(socket, buffer);
// do more stuff in larger function // do more stuff in larger function
} }
TEST_CASE("engine standalone read task") TEST_CASE("scheduler standalone read task")
{ {
constexpr ssize_t expected_value{1111}; constexpr ssize_t expected_value{1111};
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::engine e{}; coro::scheduler s{};
auto task = [&]() -> coro::task<void> auto task = [&]() -> coro::task<void>
{ {
ssize_t v{0}; ssize_t v{0};
auto bytes_read = co_await standalone_read(e, trigger_fd, std::span<char>(reinterpret_cast<char*>(&v), sizeof(v))); auto bytes_read = co_await standalone_read(s, trigger_fd, std::span<char>(reinterpret_cast<char*>(&v), sizeof(v)));
REQUIRE(bytes_read == sizeof(ssize_t)); REQUIRE(bytes_read == sizeof(ssize_t));
REQUIRE(v == expected_value); REQUIRE(v == expected_value);
co_return; co_return;
}(); }();
e.execute(task); s.schedule(task);
write(trigger_fd, &expected_value, sizeof(expected_value)); write(trigger_fd, &expected_value, sizeof(expected_value));
e.shutdown(); s.shutdown();
close(trigger_fd); close(trigger_fd);
} }
TEST_CASE("engine separate thread resume") TEST_CASE("scheduler separate thread resume")
{ {
coro::engine e{}; coro::scheduler s{};
// This lambda will mimic a 3rd party service which will execute on a service on a background thread.
// Uses the passed event handle to resume execution of the awaiting coroutine on the engine.
auto third_party_service = [](coro::engine_event<void>& handle) -> coro::task<void>
{
// Normally this thread is probably already running for real world use cases.
std::thread third_party_thread([](coro::engine_event<void>& h) -> void {
// mimic some expensive computation
// std::this_thread::sleep_for(1s);
h.set();
}, std::ref(handle));
third_party_thread.detach();
co_await handle;
co_return;
};
auto task = [&]() -> coro::task<void> auto task = [&]() -> coro::task<void>
{ {
coro::engine_event<void> handle{e}; // User manual resume token, create one specifically for each task being generated
co_await third_party_service(handle); coro::resume_token<void> token{s};
// Normally this thread is probably already running for real world use cases, but in general
// the 3rd party function api will be set, they should have "user data" void* or ability
// to capture variables via lambdas for on complete callbacks, here we mimic an on complete
// callback by capturing the hande.
std::thread third_party_thread([&token]() -> void {
// mimic some expensive computation
// std::this_thread::sleep_for(1s);
token.resume();
});
third_party_thread.detach();
// Wait on the handle until the 3rd party service is completed.
co_await token;
REQUIRE(true); REQUIRE(true);
}(); }();
e.execute(task); s.schedule(task);
e.shutdown(); s.shutdown();
} }
TEST_CASE("engine separate thread resume with return") TEST_CASE("scheduler separate thread resume with return")
{ {
constexpr uint64_t expected_value{1337}; constexpr uint64_t expected_value{1337};
coro::engine e{}; coro::scheduler s{};
std::atomic<coro::engine_event<uint64_t>*> event{}; std::atomic<coro::resume_token<uint64_t>*> token{};
std::thread service{ std::thread service{
[&]() -> void [&]() -> void
{ {
while(event == nullptr) while(token == nullptr)
{ {
std::this_thread::sleep_for(1ms); std::this_thread::sleep_for(1ms);
} }
event.load()->set(expected_value); token.load()->resume(expected_value);
} }
}; };
auto third_party_service = [&](int multiplier) -> coro::task<uint64_t> auto third_party_service = [&](int multiplier) -> coro::task<uint64_t>
{ {
auto output = co_await e.yield<uint64_t>([&](coro::engine_event<uint64_t>& ev) { auto output = co_await s.yield<uint64_t>([&](coro::resume_token<uint64_t>& t) {
event = &ev; token = &t;
}); });
co_return output * multiplier; co_return output * multiplier;
}; };
@ -319,17 +317,17 @@ TEST_CASE("engine separate thread resume with return")
REQUIRE(value == (expected_value * multiplier)); REQUIRE(value == (expected_value * multiplier));
}(); }();
e.execute(task); s.schedule(task);
service.join(); service.join();
e.shutdown(); s.shutdown();
} }
TEST_CASE("engine with normal task") TEST_CASE("scheduler with basic task")
{ {
constexpr std::size_t expected_value{5}; constexpr std::size_t expected_value{5};
std::atomic<uint64_t> counter{0}; std::atomic<uint64_t> counter{0};
coro::engine e{}; coro::scheduler s{};
auto add_data = [&](uint64_t val) -> coro::task<int> auto add_data = [&](uint64_t val) -> coro::task<int>
{ {
@ -342,21 +340,21 @@ TEST_CASE("engine with normal task")
co_return; co_return;
}(); }();
e.execute(task1); s.schedule(task1);
e.shutdown(); s.shutdown();
REQUIRE(counter == expected_value); REQUIRE(counter == expected_value);
} }
TEST_CASE("engine trigger growth of internal tasks storage") TEST_CASE("scheduler trigger growth of internal tasks storage")
{ {
std::atomic<uint64_t> counter{0}; std::atomic<uint64_t> counter{0};
constexpr std::size_t iterations{512}; constexpr std::size_t iterations{512};
coro::engine e{1}; coro::scheduler s{1};
auto wait_func = [&](uint64_t id, std::chrono::milliseconds wait_time) -> coro::task<void> auto wait_func = [&](uint64_t id, std::chrono::milliseconds wait_time) -> coro::task<void>
{ {
co_await e.yield_for(wait_time); co_await s.yield_for(wait_time);
++counter; ++counter;
co_return; co_return;
}; };
@ -366,25 +364,25 @@ TEST_CASE("engine trigger growth of internal tasks storage")
for(std::size_t i = 0; i < iterations; ++i) for(std::size_t i = 0; i < iterations; ++i)
{ {
tasks.emplace_back(wait_func(i, std::chrono::milliseconds{50})); tasks.emplace_back(wait_func(i, std::chrono::milliseconds{50}));
e.execute(tasks.back()); s.schedule(tasks.back());
} }
e.shutdown(); s.shutdown();
REQUIRE(counter == iterations); REQUIRE(counter == iterations);
} }
TEST_CASE("engine yield with engine event void") TEST_CASE("scheduler yield with scheduler event void")
{ {
std::atomic<uint64_t> counter{0}; std::atomic<uint64_t> counter{0};
coro::engine e{}; coro::scheduler s{};
auto task = [&]() -> coro::task<void> auto task = [&]() -> coro::task<void>
{ {
co_await e.yield<void>( co_await s.yield<void>(
[&](coro::engine_event<void>& event) -> void [&](coro::resume_token<void>& token) -> void
{ {
event.set(); token.resume();
} }
); );
@ -392,53 +390,53 @@ TEST_CASE("engine yield with engine event void")
co_return; co_return;
}(); }();
e.execute(task); s.schedule(task);
e.shutdown(); s.shutdown();
REQUIRE(counter == 42); REQUIRE(counter == 42);
} }
TEST_CASE("scheduler yield with scheduler event uint64_t")
{
    // yield<uint64_t> should deliver the value passed to resume() as the
    // co_await result.
    std::atomic<uint64_t> total{0};
    coro::scheduler sched{};

    auto yielding_task = [&]() -> coro::task<void>
    {
        auto resumer = [&](coro::resume_token<uint64_t>& tok) -> void { tok.resume(42); };
        total += co_await sched.yield<uint64_t>(resumer);
        co_return;
    }();

    sched.schedule(yielding_task);
    sched.shutdown();

    REQUIRE(total == 42);
}
TEST_CASE("scheduler yield user provided event")
{
    // The caller owns the resume token here (rather than yield() creating it),
    // and resumes it from outside the coroutine.
    std::string expected_result = "Here I am!";
    coro::scheduler sched{};
    coro::resume_token<std::string> tok{sched};

    auto waiting_task = [&]() -> coro::task<void>
    {
        co_await sched.yield(tok);
        // The token stores the value passed to resume() below.
        REQUIRE(tok.result() == expected_result);
        co_return;
    }();

    sched.schedule(waiting_task);
    tok.resume(expected_result);
    sched.shutdown();
}