1
0
Fork 0
mirror of https://gitlab.com/niansa/libcrosscoro.git synced 2025-03-06 20:53:32 +01:00

Template task suspends, prototype out engine thoughts

This commit is contained in:
jbaldwin 2020-09-07 23:29:03 -06:00
parent bfe97a12b4
commit fb04c43370
9 changed files with 449 additions and 86 deletions

View file

@ -10,6 +10,7 @@ message("${PROJECT_NAME} CORO_CODE_COVERAGE = ${CORO_CODE_COVERAGE}")
set(LIBCORO_SOURCE_FILES
src/coro/async_manual_reset_event.hpp
src/coro/coro.hpp
src/coro/engine.hpp src/coro/engine.cpp
src/coro/task.hpp
)
@ -17,6 +18,7 @@ add_library(${PROJECT_NAME} STATIC ${LIBCORO_SOURCE_FILES})
set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE CXX)
target_compile_features(${PROJECT_NAME} PUBLIC cxx_std_20)
target_include_directories(${PROJECT_NAME} PUBLIC src)
target_link_libraries(${PROJECT_NAME} PUBLIC zmq pthread)
target_compile_options(${PROJECT_NAME} PUBLIC -fcoroutines)
if(CORO_BUILD_TESTS)

View file

@ -1,2 +1,5 @@
#pragma once
#include "coro/async_manual_reset_event.hpp"
#include "coro/engine.hpp"
#include "coro/task.hpp"

8
src/coro/engine.cpp Normal file
View file

@ -0,0 +1,8 @@
#include "coro/engine.hpp"
namespace coro
{
std::atomic<uint32_t> engine::m_engine_id_counter{0};
} // namespace coro

214
src/coro/engine.hpp Normal file
View file

@ -0,0 +1,214 @@
#pragma once
#include "coro/task.hpp"
#include <atomic>
#include <vector>
#include <memory>
#include <mutex>
#include <thread>
#include <zmq.hpp>
#include <iostream>
namespace coro
{
// class message
// {
// public:
// enum class type
// {
// new_web_request,
// async_resume
// };
// message() = default;
// message(type t, int socket)
// : m_type(t),
// m_socket(socket)
// {
// }
// ~message() = default;
// type m_type;
// int m_socket;
// };
// class web_request
// {
// public:
// web_request() = default;
// web_request(int socket) : m_socket(socket)
// {
// }
// ~web_request() = default;
// private:
// int m_socket{0};
// };
class engine
{
public:
/// Always suspend at the start since the engine will call the first `resume()`.
using task = coro::task<void, std::suspend_always>;
using message = uint8_t;
engine()
:
m_async_recv_events_socket(m_zmq_context, zmq::socket_type::pull),
m_async_send_notify_socket(m_zmq_context, zmq::socket_type::push)
{
auto dsn = "inproc://engine";
int linger = 125;
uint32_t high_water_mark = 0;
m_async_recv_events_socket.setsockopt<int>(ZMQ_LINGER, linger);
m_async_recv_events_socket.setsockopt<uint32_t>(ZMQ_RCVHWM, high_water_mark);
m_async_recv_events_socket.setsockopt<uint32_t>(ZMQ_SNDHWM, high_water_mark);
m_async_send_notify_socket.setsockopt<int>(ZMQ_LINGER, linger);
m_async_send_notify_socket.setsockopt<uint32_t>(ZMQ_RCVHWM, high_water_mark);
m_async_send_notify_socket.setsockopt<uint32_t>(ZMQ_SNDHWM, high_water_mark);
m_async_recv_events_socket.bind(dsn);
m_async_send_notify_socket.connect(dsn);
m_background_thread = std::thread([this] { this->run(); });
}
engine(const engine&) = delete;
engine(engine&&) = delete;
auto operator=(const engine&) -> engine& = delete;
auto operator=(engine&&) -> engine& = delete;
~engine()
{
stop();
m_background_thread.join();
}
auto submit_task(std::unique_ptr<task> t) -> bool
{
{
std::lock_guard<std::mutex> lock{m_queued_tasks_mutex};
m_queued_tasks.push_back(std::move(t));
}
message msg = 1;
zmq::message_t zmq_msg{&msg, sizeof(msg)};
zmq::send_result_t result;
{
std::lock_guard<std::mutex> lock{m_async_send_notify_mutex};
result = m_async_send_notify_socket.send(zmq_msg, zmq::send_flags::none);
}
if(!result.has_value())
{
return false;
}
else
{
return result.value() == sizeof(msg);
}
}
auto is_running() const noexcept -> bool
{
return m_is_running;
}
auto stop() -> void
{
m_stop = true;
}
auto run() -> void
{
using namespace std::chrono_literals;
m_is_running = true;
std::cerr << "running\n";
std::vector<zmq::pollitem_t> poll_items {
zmq::pollitem_t{static_cast<void*>(m_async_recv_events_socket), 0, ZMQ_POLLIN, 0}
};
while(!m_stop)
{
std::cerr << "polling\n";
auto events = zmq::poll(poll_items, 1000ms);
if(events > 0)
{
while(true)
{
message msg;
zmq::mutable_buffer buffer(static_cast<void*>(&msg), sizeof(message));
auto result = m_async_recv_events_socket.recv(buffer, zmq::recv_flags::dontwait);
if(!result.has_value())
{
std::cerr << "result no value\n";
// zmq returns 0 on no messages available
break; // while(true)
}
else if(result.value().truncated())
{
std::cerr << "message received with incorrect size " << result.value().size << "\n";
}
else
{
std::cerr << "message received\n";
std::vector<std::unique_ptr<task>> grabbed_tasks;
{
std::lock_guard<std::mutex> lock{m_queued_tasks_mutex};
grabbed_tasks.swap(m_queued_tasks);
}
for(auto& t : grabbed_tasks)
{
// start executing now
t->resume();
// if the task is awaiting then push into active tasks.
if(!t->is_done())
{
m_active_tasks.push_back(std::move(t));
}
}
}
}
}
}
m_is_running = false;
std::cerr << "stopping\n";
}
private:
static std::atomic<uint32_t> m_engine_id_counter;
const uint32_t m_engine_id{m_engine_id_counter++};
zmq::context_t m_zmq_context{0};
zmq::socket_t m_async_recv_events_socket;
std::mutex m_async_send_notify_mutex{};
zmq::socket_t m_async_send_notify_socket;
std::atomic<bool> m_is_running{false};
std::atomic<bool> m_stop{false};
std::thread m_background_thread;
std::mutex m_queued_tasks_mutex;
std::vector<std::unique_ptr<task>> m_queued_tasks;
std::vector<std::unique_ptr<task>> m_active_tasks;
};
} // namespace coro

View file

@ -7,92 +7,148 @@ namespace coro
{
template<
typename return_type,
typename return_type = void,
typename initial_suspend_type = std::suspend_never,
typename final_suspend_type = std::suspend_never>
class task;
namespace detail
{
template<
typename initial_suspend_type,
typename final_suspend_type>
struct promise_base
{
promise_base() noexcept = default;
~promise_base() = default;
auto initial_suspend()
{
return initial_suspend_type();
}
auto final_suspend()
{
return final_suspend_type();
}
auto unhandled_exception() -> void
{
m_exception_ptr = std::current_exception();
}
auto return_void() -> void
{
// no-op
}
protected:
std::optional<std::exception_ptr> m_exception_ptr;
};
template<
typename return_type,
typename initial_suspend_type,
typename final_suspend_type>
struct promise : public promise_base<initial_suspend_type, final_suspend_type>
{
using task_type = task<return_type, initial_suspend_type, final_suspend_type>;
using coro_handle = std::coroutine_handle<promise<return_type, initial_suspend_type, final_suspend_type>>;
promise() noexcept = default;
~promise() = default;
auto get_return_object() -> task_type;
auto return_value(return_type result) -> void
{
m_result = std::move(result);
}
auto result() const & -> const return_type&
{
if(this->m_exception_ptr.has_value())
{
std::rethrow_exception(this->m_exception_ptr.value());
}
return m_result;
}
auto result() && -> return_type&&
{
if(this->m_exception_ptr.has_value())
{
std::rethrow_exception(this->m_exception_ptr.value());
}
return std::move(m_result);
}
private:
return_type m_result;
};
template<
typename initial_suspend_type,
typename final_suspend_type>
struct promise<void, initial_suspend_type, final_suspend_type> : public promise_base<initial_suspend_type, final_suspend_type>
{
using task_type = task<void, initial_suspend_type, final_suspend_type>;
using coro_handle = std::coroutine_handle<promise<void, initial_suspend_type, final_suspend_type>>;
promise() noexcept = default;
~promise() = default;
auto get_return_object() -> task_type;
auto result() const -> void
{
if(this->m_exception_ptr.has_value())
{
std::rethrow_exception(this->m_exception_ptr.value());
}
}
};
} // namespace detail
template<
typename return_type,
typename initial_suspend_type,
typename final_suspend_type>
class task
{
public:
struct promise_type
{
/// The return value for this promise.
return_type m_return_value;
/// If an exception was thrown it will be stored here and re-thrown on accessing the return value.
std::optional<std::exception_ptr> m_e;
using coro_handle = std::coroutine_handle<promise_type>;
auto get_return_object() -> task<return_type, initial_suspend_type, final_suspend_type>
{
return coro_handle::from_promise(*this);
}
auto initial_suspend()
{
return initial_suspend_type();
}
auto final_suspend()
{
return final_suspend_type();
}
auto return_void() -> void
{
// no-op
}
auto unhandled_exception() -> void
{
m_e = std::current_exception();
}
auto return_value(return_type value) -> void
{
m_return_value = std::move(value);
}
auto return_value() const & -> const return_type&
{
if(m_e.has_value())
{
std::rethrow_exception(m_e.value());
}
return m_return_value;
}
auto return_value() && -> return_type&&
{
if(m_e.has_value())
{
std::rethrow_exception(m_e.value());
}
return std::move(m_return_value);
}
auto yield_value(return_type value)
{
m_return_value = std::move(value);
return std::suspend_always();
}
};
using promise_type = detail::promise<return_type, initial_suspend_type, final_suspend_type>;
using coro_handle = std::coroutine_handle<promise_type>;
task(coro_handle handle) : m_handle(std::move(handle))
task(coro_handle handle)
: m_handle(handle)
{
}
task(const task&) = delete;
task(task&&) = delete;
auto operator=(const task&) -> task& = delete;
auto operator=(task&&) -> task& = delete;
auto operator=(task&& other) -> task& = delete;
// {
// if(std::addressof(other) != this)
// {
// if(m_handle)
// {
// m_handle.destroy();
// }
// m_handle = other.m_handle;
// other.m_handle = nullptr;
// }
// }
auto is_done() const noexcept -> bool
{
return m_handle.done();
return m_handle == nullptr || m_handle.done();
}
auto resume() -> bool
@ -104,18 +160,67 @@ public:
return !m_handle.done();
}
auto return_value() const & -> const return_type&
struct awaiter
{
return m_handle.promise().return_value();
awaiter(coro_handle handle) noexcept
: m_handle(handle)
{
}
auto await_ready() const noexcept -> bool
{
return !m_handle || m_handle.done();
}
auto await_suspend(std::coroutine_handle<>) noexcept -> void
{
}
auto await_resume() noexcept -> return_type
{
return m_handle.promise().result();
}
coro_handle m_handle;
};
auto operator co_await() const noexcept -> awaiter
{
return awaiter(m_handle);
}
auto return_value() && -> return_type&&
{
return std::move(m_handle.promise()).return_value();
}
auto promise() const & -> const promise_type& { return m_handle.promise(); }
auto promise() && -> promise_type&& { return std::move(m_handle.promise()); }
private:
coro_handle m_handle;
coro_handle m_handle{nullptr};
};
namespace detail
{
template<
typename return_type,
typename initial_suspend_type,
typename final_suspend_type>
auto promise<return_type, initial_suspend_type, final_suspend_type>::get_return_object()
-> task_type
{
return coro_handle::from_promise(*this);
}
template<
typename initial_suspend_type,
typename final_suspend_type>
auto promise<void,initial_suspend_type, final_suspend_type>::get_return_object()
-> task_type
{
return coro_handle::from_promise(*this);
}
} // namespace detail
} // namespace coro

View file

@ -3,6 +3,7 @@ project(libcoro_test)
set(LIBCORO_TEST_SOURCE_FILES
test_async_manual_reset_event.cpp
test_engine.cpp
test_task.cpp
)

View file

@ -31,7 +31,7 @@ TEST_CASE("manual reset event one watcher")
mre_producer<uint64_t>(event, 42);
value.resume();
REQUIRE(value.return_value() == 42);
REQUIRE(value.promise().result() == 42);
}
TEST_CASE("manual reset event multiple watcher")
@ -47,7 +47,7 @@ TEST_CASE("manual reset event multiple watcher")
value2.resume();
value3.resume();
REQUIRE(value1.return_value() == expected_value);
REQUIRE(value2.return_value() == expected_value);
REQUIRE(value3.return_value() == expected_value);
REQUIRE(value1.promise().result() == expected_value);
REQUIRE(value2.promise().result() == expected_value);
REQUIRE(value3.promise().result() == expected_value);
}

16
test/test_engine.cpp Normal file
View file

@ -0,0 +1,16 @@
// #include "catch.hpp"
// #include <coro/coro.hpp>
// auto execute_task() -> coro::engine::task
// {
// std::cerr << "engine task successfully executed\n";
// co_return;
// }
// TEST_CASE("engine submit one request")
// {
// coro::engine eng{};
// eng.submit_task(execute_task());
// }

View file

@ -15,6 +15,11 @@ static auto world() -> coro::task<std::string, std::suspend_always>
co_return "World";
}
static auto void_task() -> coro::task<void, std::suspend_always>
{
co_return;
}
static auto throws_exception() -> coro::task<std::string, std::suspend_always>
{
co_await std::suspend_always();
@ -27,19 +32,28 @@ TEST_CASE("hello world task")
auto h = hello();
auto w = world();
REQUIRE(h.return_value().empty());
REQUIRE(w.return_value().empty());
REQUIRE(h.promise().result().empty());
REQUIRE(w.promise().result().empty());
h.resume(); // task suspends immediately
w.resume();
auto w_value = std::move(w).return_value();
auto w_value = std::move(w).promise().result();
REQUIRE(h.return_value() == "Hello");
REQUIRE(h.promise().result() == "Hello");
REQUIRE(w_value == "World");
REQUIRE(w.return_value().empty());
REQUIRE(w.promise().result().empty());
}
// This currently won't report as is_done(), not sure why yet...
// TEST_CASE("void task")
// {
// auto task = void_task();
// task.resume();
// REQUIRE(task.is_done());
// }
TEST_CASE("Exception thrown")
{
auto task = throws_exception();