
engine works with normal coro::task<void>

jbaldwin 2020-09-22 12:12:30 -06:00
parent 8cb23230e1
commit 2f575861dc
7 changed files with 366 additions and 534 deletions
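In practice, the change means engine::execute() now takes a caller-owned coro::task<void> by reference rather than consuming an engine-specific engine_task. A minimal sketch of the new submission flow, pieced together from the tests further down in this diff (the include path and main() scaffolding are illustrative, not part of the commit):

#include <atomic>
#include "coro/coro.hpp"  // assumed include; brings in coro::engine and coro::task

int main()
{
    std::atomic<uint64_t> counter{0};
    coro::engine e{};

    // A plain coro::task<void>. The caller keeps it alive; the engine only holds its handle.
    auto task = [&]() -> coro::task<void>
    {
        ++counter;
        co_return;
    }();

    e.execute(task);  // returns false if the engine has already been shut down
    e.shutdown();     // shutdown_type::sync by default, waits for submitted tasks
    return counter == 1 ? 0 : 1;
}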


@ -11,6 +11,7 @@ set(LIBCORO_SOURCE_FILES
src/coro/async_manual_reset_event.hpp
src/coro/coro.hpp
src/coro/engine.hpp src/coro/engine.cpp
src/coro/sync_wait.hpp
src/coro/task.hpp
)


@ -2,4 +2,5 @@
#include "coro/async_manual_reset_event.hpp"
#include "coro/engine.hpp"
#include "coro/sync_wait.hpp"
#include "coro/task.hpp"


@ -1,6 +1,6 @@
#pragma once
// #include "coro/task.hpp"
#include "coro/task.hpp"
#include <atomic>
#include <vector>
@ -9,6 +9,8 @@
#include <mutex>
#include <thread>
#include <span>
#include <type_traits>
#include <list>
#include <sys/epoll.h>
#include <sys/eventfd.h>
@ -27,292 +29,6 @@
namespace coro
{
template<typename return_type = void>
class engine_task;
class engine;
using engine_task_id_type = uint64_t;
namespace engine_detail
{
struct promise_base
{
friend struct final_awaitable;
struct final_awaitable
{
final_awaitable(promise_base* promise) : m_promise(promise)
{
}
auto await_ready() const noexcept -> bool
{
// std::cerr << "engine_detail::promise_base::final_awaitable::await_ready() => false\n";
return false;
}
template<typename promise_type>
auto await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept -> std::coroutine_handle<>;
auto await_resume() noexcept -> void
{
// no-op
}
promise_base* m_promise{nullptr};
};
promise_base() noexcept = default;
~promise_base() = default;
auto initial_suspend()
{
return std::suspend_always{};
}
auto final_suspend()
{
return final_awaitable{this};
}
auto unhandled_exception() -> void
{
m_exception_ptr = std::current_exception();
}
auto set_continuation(std::coroutine_handle<> continuation) noexcept -> void
{
m_continuation = continuation;
}
auto parent_engine(engine* e) -> void { m_engine = e; }
auto parent_engine() const -> engine* { return m_engine; }
auto task_id(engine_task_id_type task_id) -> void { m_task_id = task_id; }
auto task_id() const -> engine_task_id_type { return m_task_id; }
protected:
std::coroutine_handle<> m_continuation{nullptr};
std::optional<std::exception_ptr> m_exception_ptr{std::nullopt};
engine* m_engine{nullptr};
engine_task_id_type m_task_id{0};
};
template<typename return_type>
struct promise final : public promise_base
{
using task_type = engine_task<return_type>;
using coro_handle = std::coroutine_handle<promise<return_type>>;
promise() noexcept = default;
~promise() = default;
auto get_return_object() noexcept -> task_type;
auto return_value(return_type result) -> void
{
m_result = std::move(result);
}
auto result() const & -> const return_type&
{
if(this->m_exception_ptr.has_value())
{
std::rethrow_exception(this->m_exception_ptr.value());
}
return m_result;
}
auto result() && -> return_type&&
{
if(this->m_exception_ptr.has_value())
{
std::rethrow_exception(this->m_exception_ptr.value());
}
return std::move(m_result);
}
private:
return_type m_result;
};
template<>
struct promise<void> : public promise_base
{
using task_type = engine_task<void>;
using coro_handle = std::coroutine_handle<promise<void>>;
promise() noexcept = default;
~promise() = default;
auto get_return_object() noexcept -> task_type;
auto return_void() -> void { }
auto result() const -> void
{
if(this->m_exception_ptr.has_value())
{
std::rethrow_exception(this->m_exception_ptr.value());
}
}
};
} // namespace engine_detail
template<typename return_type>
class engine_task
{
public:
using task_type = engine_task<return_type>;
using promise_type = engine_detail::promise<return_type>;
using coro_handle = std::coroutine_handle<promise_type>;
struct awaitable_base
{
awaitable_base(std::coroutine_handle<promise_type> coroutine) noexcept
: m_coroutine(coroutine)
{
}
auto await_ready() const noexcept -> bool
{
// std::cerr << "engine_task::awaitable::await_ready() => " << (!m_coroutine || m_coroutine.done()) << "\n";
return !m_coroutine || m_coroutine.done();
}
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> std::coroutine_handle<>
{
// std::cerr << "engine_task::awaitable::await_suspend()\n";
m_coroutine.promise().set_continuation(awaiting_coroutine);
return m_coroutine;
}
std::coroutine_handle<promise_type> m_coroutine{nullptr};
};
engine_task() noexcept
: m_coroutine(nullptr)
{
}
engine_task(coro_handle handle)
: m_coroutine(handle)
{
}
engine_task(const engine_task&) = delete;
engine_task(engine_task&& other) noexcept
: m_coroutine(other.m_coroutine)
{
other.m_coroutine = nullptr;
}
auto operator=(const engine_task&) -> engine_task& = delete;
auto operator=(engine_task&& other) noexcept -> engine_task&
{
if(std::addressof(other) != this)
{
if(m_coroutine != nullptr)
{
m_coroutine.destroy();
}
m_coroutine = other.m_coroutine;
other.m_coroutine = nullptr;
}
return *this;
}
/**
* @return True if the task is in its final suspend or if the task has been destroyed.
*/
auto is_ready() const noexcept -> bool
{
return m_coroutine == nullptr || m_coroutine.done();
}
auto resume() -> bool
{
if(!m_coroutine.done())
{
m_coroutine.resume();
}
return !m_coroutine.done();
}
auto final_resume() -> bool
{
if(m_coroutine != nullptr && m_coroutine.done())
{
m_coroutine.resume();
return true;
}
return false;
}
auto destroy() -> bool
{
if(m_coroutine != nullptr)
{
m_coroutine.destroy();
m_coroutine = nullptr;
return true;
}
return false;
}
auto operator co_await() const noexcept
{
struct awaitable : public awaitable_base
{
auto await_resume() noexcept -> decltype(auto)
{
// std::cerr << "engine_task::awaitable::await_resume()\n";
return this->m_coroutine.promise().result();
}
};
return awaitable{m_coroutine};
}
auto promise() & -> promise_type& { return m_coroutine.promise(); }
auto promise() const & -> const promise_type& { return m_coroutine.promise(); }
auto promise() && -> promise_type&& { return std::move(m_coroutine.promise()); }
private:
coro_handle m_coroutine{nullptr};
};
namespace engine_detail
{
template<typename return_type>
inline auto promise<return_type>::get_return_object() noexcept -> engine_task<return_type>
{
return engine_task<return_type>{coro_handle::from_promise(*this)};
}
inline auto promise<void>::get_return_object() noexcept -> engine_task<>
{
return engine_task<>{coro_handle::from_promise(*this)};
}
} // namespace engine_detail
} // namespace coro
namespace coro
{
enum class await_op
{
read = EPOLLIN,
@ -322,11 +38,8 @@ enum class await_op
class engine
{
/// To destroy the root execute() tasks upon await_resume().
friend engine_detail::promise_base::final_awaitable;
public:
using task_type = engine_task<void>;
using task_type = coro::task<void>;
using message_type = uint8_t;
using socket_type = int;
@ -339,7 +52,7 @@ public:
};
private:
static constexpr engine_task_id_type submit_id = 0xFFFFFFFFFFFFFFFF;
static constexpr void* m_submit_ptr = nullptr;
public:
engine()
@ -349,7 +62,7 @@ public:
struct epoll_event e{};
e.events = EPOLLIN;
e.data.u64 = submit_id;
e.data.ptr = m_submit_ptr;
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_submit_fd, &e);
m_background_thread = std::thread([this] { this->run(); });
@ -375,99 +88,92 @@ public:
}
}
auto execute(task_type task) -> engine_task_id_type
auto execute(coro::task<void>& task) -> bool
{
if(m_shutdown)
{
return false;
}
++m_size;
auto task_id = m_task_id_counter++;
auto& promise = task.promise();
promise.parent_engine(this);
promise.task_id(task_id);
auto coro_handle = std::coroutine_handle<coro::task<void>::promise_type>::from_promise(task.promise());
{
std::lock_guard<std::mutex> lock{m_mutex};
m_submitted_tasks.emplace_back(task_id, std::move(task));
m_submitted_tasks.emplace_back(coro_handle);
}
// Signal to the event loop there is a submitted task.
uint64_t value{1};
::write(m_submit_fd, &value, sizeof(value));
return task_id;
return true;
}
auto poll(socket_type socket, await_op op) -> engine_task<void>
auto poll(socket_type socket, await_op op) -> coro::task<void>
{
// co_await suspend(
// [&](auto task_id)
// {
// struct epoll_event e{};
// e.events = static_cast<uint32_t>(op) | EPOLLONESHOT | EPOLLET;
// e.data.u64 = task_id;
// epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, socket, &e);
// },
// [&]()
// {
// epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, socket, nullptr);
// }
// );
auto [suspend_task, task_id] = suspend_point();
struct epoll_event e{};
e.events = static_cast<uint32_t>(op) | EPOLLONESHOT | EPOLLET;
e.data.u64 = task_id;
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, socket, &e);
co_await suspend_task;
epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, socket, nullptr);
co_return;
co_await suspend(
[&](std::coroutine_handle<> handle)
{
struct epoll_event e{};
e.events = static_cast<uint32_t>(op) | EPOLLONESHOT | EPOLLET;
e.data.ptr = handle.address();
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, socket, &e);
},
[&]()
{
epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, socket, nullptr);
}
);
}
auto read(socket_type socket, std::span<char> buffer) -> engine_task<ssize_t>
auto read(socket_type socket, std::span<char> buffer) -> coro::task<ssize_t>
{
co_await poll(socket, await_op::read);
co_return ::read(socket, buffer.data(), buffer.size());
}
auto write(socket_type socket, const std::span<const char> buffer) -> coro::engine_task<ssize_t>
auto write(socket_type socket, const std::span<const char> buffer) -> coro::task<ssize_t>
{
co_await poll(socket, await_op::write);
co_return ::write(socket, buffer.data(), buffer.size());
}
/**
* Immediately suspends the current task and provides the task_id that the user should call
* `engine.resume(task_id)` with via the functor parameter. Normal usage of this might look
* like:
* engine.suspend([&](auto task_id) {
* auto on_service_complete = [&]() { engine.resume(task_id); };
* Immediately suspends the current task and provides the coroutine handle that the user should
* call `engine.resume(handle)` with via the functor_before parameter.
* Normal usage of this might look like:
* engine.suspend([&](std::coroutine_handle<> handle) {
* auto on_service_complete = [&]() { engine.resume(handle); };
* service.execute(on_service_complete);
* });
* The above example will suspend the current task and then through the 3rd party service's
* on complete callback function let the engine know that it should resum execution of the task.
* on complete callback function let the engine know that it should resume execution of the task.
*
* This function along with `engine::resume()` are special additions for working with 3rd party
* services that do not provide coroutine support, or that are event driven and cannot work
* directly with the engine.
* @tparam func Functor to invoke with the suspended task_id.
* @param f Immediately invoked functor with the suspend point task_id to resume with.
* @return A reference to the task to `co_await suspend` on and the task's ID to call
* `engine.resume(task_id)` from another thread to resume execution at this suspend
* point.
* @tparam func Functor to invoke with the suspended coroutine handle to be resumed.
* @param f Immediately invoked functor with the suspend point coroutine handle to resume with.
* @return A task to co_await until the manual `engine::resume(handle)` is called.
*/
template<std::invocable<engine_task_id_type> functor_before>
auto suspend(functor_before before) -> engine_task<void>&
template<std::invocable<std::coroutine_handle<>> functor_before>
auto suspend(functor_before before) -> coro::task<void>
{
auto [suspend_task, task_id] = suspend_point();
before(task_id);
return suspend_task;
auto task = suspend_point();
auto coro_handle = std::coroutine_handle<coro::task<void>::promise_type>::from_promise(task.promise());
before(coro_handle);
co_await task;
co_return;
}
template<std::invocable<engine_task_id_type> functor_before, std::invocable functor_after>
auto suspend(functor_before before, functor_after after) -> engine_task<void>
template<std::invocable<std::coroutine_handle<>> functor_before, std::invocable functor_after>
auto suspend(functor_before before, functor_after after) -> coro::task<void>
{
auto [suspend_task, task_id] = suspend_point();
before(task_id);
co_await suspend_task;
auto task = suspend_point();
auto coro_handle = std::coroutine_handle<coro::task<void>::promise_type>::from_promise(task.promise());
before(coro_handle);
co_await task;
after();
co_return;
}
@ -478,38 +184,30 @@ public:
* `engine.resume(task_id)` from another thread to resume execution at this suspend
* point.
*/
auto suspend_point() -> std::pair<engine_task<void>&, engine_task_id_type>
auto suspend_point() -> coro::task<void>
{
auto await_task_id = m_task_id_counter++;
auto await_task = [&]() -> engine_task<void>
return []() -> coro::task<void>
{
co_await std::suspend_always{};
co_return;
}();
++m_size;
auto emplaced = m_active_tasks.emplace(await_task_id, std::move(await_task));
auto& iter = emplaced.first;
auto& task = iter->second;
return {task, await_task_id};
}
/**
* Resumes a suspended task manually. The use case is to first call `engine.suspend()` and
* co_await the manual suspension point to pause execution of that task. Then later on another
* thread, probably a 3rd party service, call `engine.resume(task_id)` to resume execution of
* thread, probably a 3rd party service, call `engine.resume(handle)` to resume execution of
* the task that was previously paused with the 3rd party services result.
* @param task_id The task to resume its execution from its current suspend point.
* @param handle The task to resume its execution from its current suspend point.
*/
auto resume(engine_task_id_type task_id) -> void
auto resume(std::coroutine_handle<> handle) -> void
{
{
std::lock_guard<std::mutex> lock{m_mutex};
m_resume_tasks.emplace_back(task_id);
m_resume_tasks.emplace_back(handle);
}
// Signal to the event loop there is a resume task.
// Signal to the event loop there is a task to resume.
uint64_t value{1};
::write(m_submit_fd, &value, sizeof(value));
}
@ -535,6 +233,11 @@ public:
return m_is_running;
}
auto is_shutdown() const noexcept -> bool
{
return m_shutdown;
}
auto shutdown(shutdown_type wait_for_tasks = shutdown_type::sync) -> void
{
if(!m_shutdown.exchange(true))
@ -564,91 +267,43 @@ private:
std::atomic<uint64_t> m_task_id_counter{0};
mutable std::mutex m_mutex;
std::vector<std::pair<engine_task_id_type, task_type>> m_submitted_tasks;
std::vector<engine_task_id_type> m_destroy_tasks;
std::vector<engine_task_id_type> m_resume_tasks;
std::map<engine_task_id_type, task_type> m_active_tasks;
std::vector<std::coroutine_handle<coro::task<void>::promise_type>> m_submitted_tasks{};
std::vector<std::coroutine_handle<>> m_resume_tasks{};
std::atomic<std::size_t> m_size{0};
auto task_destroy(std::map<engine_task_id_type, task_type>::iterator iter) -> void
{
m_active_tasks.erase(iter);
--m_size;
}
auto task_start(engine_task_id_type task_id, task_type& task) -> void
{
// std::cerr << "engine: submit task.resume() task_id=" << task_id << "\n";
task.resume();
// If the task is still awaiting then immediately remove.
if(!task.is_ready())
{
m_active_tasks.emplace(task_id, std::move(task));
}
else
{
--m_size;
}
}
auto register_destroy(engine_task_id_type id) -> void
{
m_destroy_tasks.emplace_back(id);
// Signal to the event loop there is a task to possibly complete.
uint64_t value{1};
::write(m_submit_fd, &value, sizeof(value));
}
auto task_register_destroy(engine_task_id_type task_id) -> void
{
auto task_found = m_active_tasks.find(task_id);
if(task_found != m_active_tasks.end())
{
auto& task = task_found->second;
if(task.is_ready())
{
task_destroy(task_found);
}
}
}
auto task_resume(engine_task_id_type task_id) -> void
{
auto task_found = m_active_tasks.find(task_id);
if(task_found != m_active_tasks.end())
{
auto& task = task_found->second;
if(task.is_ready())
{
task_destroy(task_found);
}
else
{
task.resume();
if(task.is_ready())
{
task_destroy(task_found);
}
// else suspended again
}
}
else
{
std::cerr << "engine: task was not found task_id=" << task_id << "\n";
}
}
auto run() -> void
{
using namespace std::chrono_literals;
m_is_running = true;
constexpr std::size_t growth_size{256};
std::vector<std::optional<coro::task<void>>> finalize_tasks{};
std::list<std::size_t> finalize_indexes{};
std::vector<std::list<std::size_t>::iterator> delete_indexes{};
finalize_tasks.resize(growth_size);
for(size_t i = 0; i < growth_size; ++i)
{
finalize_indexes.emplace_back(i);
}
auto free_index = finalize_indexes.begin();
auto completed = [](
std::vector<std::list<std::size_t>::iterator>& delete_indexes,
std::list<std::size_t>::iterator pos
) mutable -> coro::task<void>
{
// Mark this task for deletion, it cannot delete itself.
delete_indexes.push_back(pos);
co_return;
};
constexpr std::chrono::milliseconds timeout{1000};
constexpr std::size_t max_events = 1;
constexpr std::size_t max_events = 8;
std::array<struct epoll_event, max_events> events{};
// Execute until stopped or there are more tasks to complete.
@ -659,48 +314,88 @@ private:
{
for(std::size_t i = 0; i < event_count; ++i)
{
engine_task_id_type task_id = events[i].data.u64;
void* handle_ptr = events[i].data.ptr;
if(task_id == submit_id)
// if(task_id == submit_id)
if(handle_ptr == m_submit_ptr)
{
uint64_t value{0};
::read(m_submit_fd, &value, sizeof(value));
(void)value; // discard, the read merely reset the eventfd counter in the kernel.
std::vector<std::pair<engine_task_id_type, task_type>> submit_tasks;
std::vector<engine_task_id_type> destroy_tasks;
std::vector<engine_task_id_type> resume_tasks;
std::vector<std::coroutine_handle<coro::task<void>::promise_type>> submit_tasks{};
std::vector<std::coroutine_handle<>> resume_tasks{};
{
std::lock_guard<std::mutex> lock{m_mutex};
submit_tasks.swap(m_submitted_tasks);
destroy_tasks.swap(m_destroy_tasks);
resume_tasks.swap(m_resume_tasks);
}
// New tasks to start executing.
for(auto& [task_id, task] : submit_tasks)
for(std::coroutine_handle<coro::task<void>::promise_type>& handle : submit_tasks)
{
task_start(task_id, task);
if(!handle.done())
{
if(free_index == finalize_indexes.end())
{
// Backtrack to keep position in middle of the list.
std::advance(free_index, -1);
std::size_t new_size = finalize_tasks.size() + growth_size;
for(size_t i = finalize_tasks.size(); i < new_size; ++i)
{
finalize_indexes.emplace_back(i);
}
finalize_tasks.resize(new_size);
// Move forward to the new free index.
std::advance(free_index, 1);
}
auto pos = free_index;
std::advance(free_index, 1);
// Wrap in finalizing task to subtract from m_size.
auto index = *pos;
finalize_tasks[index] = completed(delete_indexes, pos);
auto& task = finalize_tasks[index].value();
auto& promise = handle.promise();
promise.set_continuation(task.handle());
handle.resume();
}
}
// Completed execute() root tasks, destroy them.
for(auto& task_id : destroy_tasks)
for(auto& handle : resume_tasks)
{
task_register_destroy(task_id);
}
// User code driven tasks to resume.
for(auto& task_id : resume_tasks)
{
task_resume(task_id);
if(!handle.done())
{
handle.resume();
}
}
}
else
{
// Individual poll task wake-up.
task_resume(task_id);
auto handle = std::coroutine_handle<>::from_address(handle_ptr);
if(!handle.done())
{
handle.resume();
}
}
}
// delete everything marked as completed.
if(!delete_indexes.empty())
{
for(const auto& pos : delete_indexes)
{
std::size_t index = *pos;
finalize_tasks[index] = std::nullopt;
// Put the deleted position at the end of the free indexes list.
finalize_indexes.splice(finalize_indexes.end(), finalize_indexes, pos);
}
m_size -= delete_indexes.size();
delete_indexes.clear();
}
}
}
@ -708,35 +403,5 @@ private:
}
};
namespace engine_detail
{
template<typename promise_type>
inline auto promise_base::final_awaitable::await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept -> std::coroutine_handle<>
{
// If there is a continuation call it, otherwise this is the end of the line.
auto& promise = coroutine.promise();
if(promise.m_continuation != nullptr)
{
return promise.m_continuation;
}
else
{
// If this is a root submitted task check to see if its completely done.
if(m_promise->m_engine != nullptr)
{
// std::cerr << "engine_detail::promise_base::final_awaitable::await_suspend() register_destroy(" << m_promise->m_task_id << ")\n";
m_promise->m_engine->register_destroy(m_promise->m_task_id);
}
else
{
// std::cerr << "engine_detail::promise_base::final_awaitable::await_suspend() no engine ptr\n";
}
return std::noop_coroutine();
}
}
} // namespace engine_detail
} // namespace coro
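The net effect of this file is that the engine no longer tracks its own engine_task objects by task_id: I/O waits stash the awaiting std::coroutine_handle<> in epoll's data.ptr, manual suspension hands that same handle to user code, and completed root tasks are reaped in run() by chaining a small "completed" continuation onto each submitted handle and recycling its slot through the finalize_indexes free-list. A hedged sketch of the manual suspend/resume bridge described in the suspend() comment above (the service call and shared variable are placeholders, not API from this diff):

coro::engine e{};
std::coroutine_handle<> stashed;   // written inside suspend(), read by the service thread

auto wait_for_service = [&]() -> coro::task<uint64_t>
{
    co_await e.suspend([&](std::coroutine_handle<> handle)
    {
        stashed = handle;          // give the suspend point to the external service
        start_async_work();        // hypothetical 3rd party call that completes later
    });
    // Resumed here once the service's completion callback has run e.resume(stashed)
    // from whatever thread it happens to use.
    co_return 42;
};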

src/coro/sync_wait.hpp (new file, 19 additions)

@ -0,0 +1,19 @@
#pragma once
#include "coro/task.hpp"
namespace coro
{
template<typename awaitable_functor>
auto sync_wait(awaitable_functor&& awaitable) -> decltype(auto)
{
auto task = awaitable();
while(!task.is_ready())
{
task.resume();
}
return task.promise().result();
}
} // namespace coro
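The new sync_wait() helper takes a functor that produces a task, drives that task to completion on the calling thread by resuming it until is_ready(), and then returns the promise's result. A usage sketch mirroring the test added at the bottom of this commit:

auto func = []() -> coro::task<int>
{
    co_await std::suspend_always{};   // each suspension is driven by sync_wait's resume loop
    co_return 11;
};

auto result = coro::sync_wait(func);  // blocks this thread until the task completes
// result == 11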


@ -125,7 +125,10 @@ struct promise<void> : public promise_base
auto get_return_object() noexcept -> task_type;
auto return_void() -> void { }
auto return_void() -> void
{
}
auto result() const -> void
{
@ -186,6 +189,14 @@ public:
other.m_coroutine = nullptr;
}
~task()
{
if(m_coroutine != nullptr)
{
m_coroutine.destroy();
}
}
auto operator=(const task&) -> task& = delete;
auto operator=(task&& other) noexcept -> task&
{
@ -245,8 +256,24 @@ public:
return awaitable{m_coroutine};
}
auto promise() const & -> const promise_type& { return m_coroutine.promise(); }
auto promise() && -> promise_type&& { return std::move(m_coroutine.promise()); }
auto promise() & -> promise_type&
{
return m_coroutine.promise();
}
auto promise() const & -> const promise_type&
{
return m_coroutine.promise();
}
auto promise() && -> promise_type&&
{
return std::move(m_coroutine.promise());
}
auto handle() -> coro_handle
{
return m_coroutine;
}
private:
coro_handle m_coroutine{nullptr};
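The task type gains an owning destructor and a handle() accessor; together they let the engine drive a caller-owned task through its raw coroutine handle while destruction of the frame stays with the task object. A small standalone sketch (assuming coro/task.hpp is included, mirroring the handle-based test added at the end of this commit):

auto t = []() -> coro::task<void> { co_return; }();

std::coroutine_handle<> h = t.handle();  // non-owning view of the coroutine frame
h.resume();                              // runs the body; the task starts suspended at initial_suspend

// t.is_ready() is now true; the frame itself is freed by ~task() when t goes out of scope.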


@ -9,13 +9,23 @@
#include <fcntl.h>
using namespace std::chrono_literals;
using task_type = coro::engine::task_type;
TEST_CASE("engine submit single functor")
{
std::atomic<uint64_t> counter{0};
coro::engine e{};
e.execute([&]() -> task_type { counter++; co_return; }());
auto task = [&]() -> coro::task<void>
{
std::cerr << "Hello world from engine task!\n";
counter++;
co_return;
}();
e.execute(task);
// while(counter != 1) std::this_thread::sleep_for(1ms);
e.shutdown();
REQUIRE(counter == 1);
@ -27,10 +37,14 @@ TEST_CASE("engine submit mutiple tasks")
std::atomic<uint64_t> counter{0};
coro::engine e{};
auto func = [&]() -> task_type { counter++; co_return; };
std::vector<coro::task<void>> tasks{};
tasks.reserve(n);
auto func = [&]() -> coro::task<void> { counter++; co_return; };
for(std::size_t i = 0; i < n; ++i)
{
e.execute(func());
tasks.emplace_back(func());
e.execute(tasks.back());
}
e.shutdown();
@ -39,31 +53,35 @@ TEST_CASE("engine submit mutiple tasks")
TEST_CASE("engine task with multiple suspends and manual resumes")
{
std::atomic<coro::engine_task_id_type> task_id{0};
// std::atomic<coro::engine_task_id_type> task_id{0};
std::atomic<uint64_t> counter{0};
coro::engine e{};
auto task = [&]() -> coro::task<void>
{
std::cerr << "1st suspend\n";
co_await std::suspend_always{};
++counter;
std::cerr << "never suspend\n";
co_await std::suspend_never{};
std::cerr << "2nd suspend\n";
co_await std::suspend_always{};
++counter;
std::cerr << "3rd suspend\n";
co_await std::suspend_always{};
++counter;
co_return;
}();
auto resume_task = [&](int expected) {
e.resume(task_id);
e.resume(task.handle());
while(counter != expected)
{
std::this_thread::sleep_for(1ms);
}
};
auto task = [&]() -> task_type
{
co_await std::suspend_always{};
++counter;
co_await std::suspend_never{};
co_await std::suspend_always{};
++counter;
co_await std::suspend_always{};
++counter;
co_return;
}();
e.execute(std::move(task));
e.execute(task);
resume_task(1);
resume_task(2);
@ -77,14 +95,15 @@ TEST_CASE("engine task with read poll")
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::engine e{};
auto task = [&]() -> task_type
auto task = [&]() -> coro::task<void>
{
// Poll will block until there is data to read.
co_await e.poll(trigger_fd, coro::await_op::read);
REQUIRE(true);
co_return;
}();
e.execute(std::move(task));
e.execute(task);
uint64_t value{42};
write(trigger_fd, &value, sizeof(value));
@ -99,7 +118,7 @@ TEST_CASE("engine task with read")
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::engine e{};
auto task = [&]() -> task_type
auto task = [&]() -> coro::task<void>
{
uint64_t val{0};
auto bytes_read = co_await e.read(
@ -112,7 +131,7 @@ TEST_CASE("engine task with read")
co_return;
}();
e.execute(std::move(task));
e.execute(task);
write(trigger_fd, &expected_value, sizeof(expected_value));
@ -132,7 +151,7 @@ TEST_CASE("engine task with read and write same fd")
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::engine e{};
auto task = [&]() -> task_type
auto task = [&]() -> coro::task<void>
{
auto bytes_written = co_await e.write(
trigger_fd,
@ -152,7 +171,7 @@ TEST_CASE("engine task with read and write same fd")
co_return;
}();
e.execute(std::move(task));
e.execute(task);
e.shutdown();
close(trigger_fd);
@ -166,7 +185,7 @@ TEST_CASE("engine task with read and write pipe")
coro::engine e{};
auto read_task = [&]() -> task_type
auto read_task = [&]() -> coro::task<void>
{
std::string buffer(4096, '0');
std::span<char> view{buffer.data(), buffer.size()};
@ -176,15 +195,15 @@ TEST_CASE("engine task with read and write pipe")
REQUIRE(buffer == msg);
}();
auto write_task = [&]() -> task_type
auto write_task = [&]() -> coro::task<void>
{
std::span<const char> view{msg.data(), msg.size()};
auto bytes_written = co_await e.write(pipe_fd[1], view);
REQUIRE(bytes_written == msg.size());
}();
e.execute(std::move(read_task));
e.execute(std::move(write_task));
e.execute(read_task);
e.execute(write_task);
e.shutdown();
close(pipe_fd[0]);
@ -195,7 +214,7 @@ static auto standalone_read(
coro::engine& e,
coro::engine::socket_type socket,
std::span<char> buffer
) -> coro::engine_task<ssize_t>
) -> coro::task<ssize_t>
{
// do other stuff in larger function
co_return co_await e.read(socket, buffer);
@ -208,7 +227,7 @@ TEST_CASE("engine standalone read task")
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::engine e{};
auto task = [&]() -> task_type
auto task = [&]() -> coro::task<void>
{
ssize_t v{0};
auto bytes_read = co_await standalone_read(e, trigger_fd, std::span<char>(reinterpret_cast<char*>(&v), sizeof(v)));
@ -218,7 +237,7 @@ TEST_CASE("engine standalone read task")
co_return;
}();
e.execute(std::move(task));
e.execute(task);
write(trigger_fd, &expected_value, sizeof(expected_value));
@ -228,7 +247,7 @@ TEST_CASE("engine standalone read task")
TEST_CASE("engine separate thread resume")
{
coro::engine_task_id_type task_id;
std::coroutine_handle<> handle;
coro::engine e{};
// This lambda will mimic a 3rd party service which will execute on a service on
@ -239,20 +258,20 @@ TEST_CASE("engine separate thread resume")
std::thread third_party_thread([&]() -> void {
// mimic some expensive computation
// std::this_thread::sleep_for(1s);
std::cerr << "task_id=" << task_id << "\n";
e.resume(task_id);
e.resume(handle);
});
third_party_thread.detach();
return std::suspend_always{};
};
auto task = [&]() -> task_type
auto task = [&]() -> coro::task<void>
{
co_await third_party_service();
REQUIRE(true);
}();
handle = task.handle();
task_id = e.execute(std::move(task));
e.execute(task);
e.shutdown();
}
@ -264,7 +283,7 @@ TEST_CASE("engine separate thread resume with return")
struct shared_data
{
std::atomic<bool> ready{false};
std::optional<coro::engine_task_id_type> task_id{};
std::optional<std::coroutine_handle<>> handle{};
uint64_t output{0};
} data{};
@ -277,25 +296,51 @@ TEST_CASE("engine separate thread resume with return")
}
data.output = expected_value;
e.resume(data.task_id.value());
e.resume(data.handle.value());
}
};
auto third_party_service = [&](int multiplier) -> coro::engine_task<uint64_t>
auto third_party_service = [&](int multiplier) -> coro::task<uint64_t>
{
co_await e.suspend([&](auto task_id) { data.task_id = task_id; data.ready = true; });
co_await e.suspend([&](std::coroutine_handle<> handle) {
data.handle = handle;
data.ready = true;
});
co_return data.output * multiplier;
};
auto task = [&]() -> task_type
auto task = [&]() -> coro::task<void>
{
int multiplier{5};
uint64_t value = co_await third_party_service(multiplier);
REQUIRE(value == (expected_value * multiplier));
}();
e.execute(std::move(task));
e.execute(task);
e.shutdown();
service.join();
e.shutdown();
}
TEST_CASE("engine with normal task")
{
constexpr std::size_t expected_value{5};
std::atomic<uint64_t> counter{0};
coro::engine e{};
auto add_data = [&](uint64_t val) -> coro::task<int>
{
co_return val;
};
auto task1 = [&]() -> coro::task<void>
{
counter += co_await add_data(expected_value);
co_return;
}();
e.execute(task1);
e.shutdown();
REQUIRE(counter == expected_value);
}


@ -174,3 +174,77 @@ TEST_CASE("task multiple suspends return integer")
REQUIRE(task.is_ready());
REQUIRE(task.promise().result() == 11);
}
TEST_CASE("task multiple suspends return integer with sync_wait")
{
auto func = []() -> coro::task<int>
{
co_await std::suspend_always{};
co_await std::suspend_always{};
co_await std::suspend_always{};
co_return 11;
};
auto result = coro::sync_wait(func);
REQUIRE(result == 11);
}
TEST_CASE("task co_await single")
{
auto answer = []() -> coro::task<int>
{
std::cerr << "\tThinking deep thoughts...\n";
co_return 42;
};
auto await_answer = [&]() -> coro::task<int>
{
std::cerr << "\tStarting to wait for answer.\n";
auto a = answer();
std::cerr << "\tGot the coroutine, getting the value.\n";
auto v = co_await a;
std::cerr << "\tCoroutine value is " << v << "\n";
REQUIRE(v == 42);
v = co_await a;
std::cerr << "\tValue is still " << v << "\n";
REQUIRE(v == 42);
co_return 1337;
};
auto output = coro::sync_wait(await_answer);
REQUIRE(output == 1337);
}
TEST_CASE("task resume from promise to coroutine handles of different types")
{
auto task1 = [&]() -> coro::task<int>
{
std::cerr << "Task ran\n";
co_return 42;
}();
auto task2 = [&]() -> coro::task<void>
{
std::cerr << "Task 2 ran\n";
co_return;
}();
// task.resume(); normal method of resuming
std::vector<std::coroutine_handle<>> handles;
handles.emplace_back(std::coroutine_handle<coro::task<int>::promise_type>::from_promise(task1.promise()));
handles.emplace_back(std::coroutine_handle<coro::task<void>::promise_type>::from_promise(task2.promise()));
auto& coro_handle1 = handles[0];
coro_handle1.resume();
auto& coro_handle2 = handles[1];
coro_handle2.resume();
REQUIRE(task1.is_ready());
REQUIRE(coro_handle1.done());
REQUIRE(task1.promise().result() == 42);
REQUIRE(task2.is_ready());
REQUIRE(coro_handle2.done());
}