mirror of
https://gitlab.com/niansa/libcrosscoro.git
synced 2025-03-06 20:53:32 +01:00
Refactor net and into cpp files (#25)
This commit is contained in:
parent
c02aefe26e
commit
6faafa0688
18 changed files with 890 additions and 854 deletions
|
@ -16,25 +16,25 @@ add_subdirectory(vendor/c-ares/c-ares)
|
|||
set(LIBCORO_SOURCE_FILES
|
||||
inc/coro/detail/void_value.hpp
|
||||
|
||||
inc/coro/net/connect.hpp src/net/connect.cpp
|
||||
inc/coro/net/dns_client.hpp src/net/dns_client.cpp
|
||||
inc/coro/net/hostname.hpp
|
||||
inc/coro/net/ip_address.hpp src/net/ip_address.cpp
|
||||
inc/coro/net/socket.hpp
|
||||
inc/coro/net/tcp_client.hpp src/net/tcp_client.cpp
|
||||
inc/coro/net/tcp_scheduler.hpp src/net/tcp_scheduler.cpp
|
||||
|
||||
inc/coro/awaitable.hpp
|
||||
inc/coro/connect.hpp src/connect.cpp
|
||||
inc/coro/coro.hpp
|
||||
inc/coro/dns_client.hpp src/dns_client.cpp
|
||||
inc/coro/event.hpp src/event.cpp
|
||||
inc/coro/generator.hpp
|
||||
inc/coro/io_scheduler.hpp
|
||||
inc/coro/io_scheduler.hpp src/io_scheduler.cpp
|
||||
inc/coro/latch.hpp
|
||||
inc/coro/poll.hpp
|
||||
inc/coro/promise.hpp
|
||||
inc/coro/shutdown.hpp
|
||||
inc/coro/sync_wait.hpp src/sync_wait.cpp
|
||||
inc/coro/task.hpp
|
||||
inc/coro/tcp_client.hpp src/tcp_client.cpp
|
||||
inc/coro/tcp_scheduler.hpp src/tcp_scheduler.cpp
|
||||
inc/coro/thread_pool.hpp src/thread_pool.cpp
|
||||
inc/coro/when_all.hpp
|
||||
)
|
||||
|
|
|
@ -1,12 +1,14 @@
|
|||
#pragma once
|
||||
|
||||
#include "coro/net/connect.hpp"
|
||||
#include "coro/net/dns_client.hpp"
|
||||
#include "coro/net/hostname.hpp"
|
||||
#include "coro/net/ip_address.hpp"
|
||||
#include "coro/net/socket.hpp"
|
||||
#include "coro/net/tcp_client.hpp"
|
||||
#include "coro/net/tcp_scheduler.hpp"
|
||||
|
||||
#include "coro/awaitable.hpp"
|
||||
#include "coro/connect.hpp"
|
||||
#include "coro/dns_client.hpp"
|
||||
#include "coro/event.hpp"
|
||||
#include "coro/generator.hpp"
|
||||
#include "coro/io_scheduler.hpp"
|
||||
|
@ -14,7 +16,5 @@
|
|||
#include "coro/promise.hpp"
|
||||
#include "coro/sync_wait.hpp"
|
||||
#include "coro/task.hpp"
|
||||
#include "coro/tcp_client.hpp"
|
||||
#include "coro/tcp_scheduler.hpp"
|
||||
#include "coro/thread_pool.hpp"
|
||||
#include "coro/when_all.hpp"
|
||||
|
|
|
@ -27,8 +27,6 @@
|
|||
#include <sys/types.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
namespace coro
|
||||
{
|
||||
class io_scheduler;
|
||||
|
@ -38,32 +36,15 @@ namespace detail
|
|||
class resume_token_base
|
||||
{
|
||||
public:
|
||||
resume_token_base(io_scheduler* s) noexcept : m_scheduler(s), m_state(nullptr) {}
|
||||
resume_token_base(io_scheduler* s) noexcept;
|
||||
|
||||
virtual ~resume_token_base() = default;
|
||||
|
||||
resume_token_base(const resume_token_base&) = delete;
|
||||
resume_token_base(resume_token_base&& other)
|
||||
{
|
||||
m_scheduler = other.m_scheduler;
|
||||
m_state = other.m_state.exchange(nullptr);
|
||||
|
||||
other.m_scheduler = nullptr;
|
||||
}
|
||||
resume_token_base(resume_token_base&& other);
|
||||
auto operator=(const resume_token_base&) -> resume_token_base& = delete;
|
||||
|
||||
auto operator=(resume_token_base&& other) -> resume_token_base&
|
||||
{
|
||||
if (std::addressof(other) != this)
|
||||
{
|
||||
m_scheduler = other.m_scheduler;
|
||||
m_state = other.m_state.exchange(nullptr);
|
||||
|
||||
other.m_scheduler = nullptr;
|
||||
}
|
||||
|
||||
return *this;
|
||||
}
|
||||
auto operator=(resume_token_base&& other) -> resume_token_base&;
|
||||
|
||||
bool is_set() const noexcept { return m_state.load(std::memory_order::acquire) == this; }
|
||||
|
||||
|
@ -72,34 +53,8 @@ public:
|
|||
awaiter(const resume_token_base& token) noexcept : m_token(token) {}
|
||||
|
||||
auto await_ready() const noexcept -> bool { return m_token.is_set(); }
|
||||
|
||||
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
|
||||
{
|
||||
const void* const set_state = &m_token;
|
||||
|
||||
m_awaiting_coroutine = awaiting_coroutine;
|
||||
|
||||
// This value will update if other threads write to it via acquire.
|
||||
void* old_value = m_token.m_state.load(std::memory_order::acquire);
|
||||
do
|
||||
{
|
||||
// Resume immediately if already in the set state.
|
||||
if (old_value == set_state)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
m_next = static_cast<awaiter*>(old_value);
|
||||
} while (!m_token.m_state.compare_exchange_weak(
|
||||
old_value, this, std::memory_order::release, std::memory_order::acquire));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
auto await_resume() noexcept
|
||||
{
|
||||
// no-op
|
||||
}
|
||||
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool;
|
||||
auto await_resume() noexcept { /* no-op */ }
|
||||
|
||||
const resume_token_base& m_token;
|
||||
std::coroutine_handle<> m_awaiting_coroutine;
|
||||
|
@ -108,11 +63,7 @@ public:
|
|||
|
||||
auto operator co_await() const noexcept -> awaiter { return awaiter{*this}; }
|
||||
|
||||
auto reset() noexcept -> void
|
||||
{
|
||||
void* old_value = this;
|
||||
m_state.compare_exchange_strong(old_value, nullptr, std::memory_order::acquire);
|
||||
}
|
||||
auto reset() noexcept -> void;
|
||||
|
||||
protected:
|
||||
friend struct awaiter;
|
||||
|
@ -187,15 +138,7 @@ private:
|
|||
public:
|
||||
using task_position = std::list<std::size_t>::iterator;
|
||||
|
||||
task_manager(const std::size_t reserve_size, const double growth_factor) : m_growth_factor(growth_factor)
|
||||
{
|
||||
m_tasks.resize(reserve_size);
|
||||
for (std::size_t i = 0; i < reserve_size; ++i)
|
||||
{
|
||||
m_task_indexes.emplace_back(i);
|
||||
}
|
||||
m_free_pos = m_task_indexes.begin();
|
||||
}
|
||||
task_manager(const std::size_t reserve_size, const double growth_factor);
|
||||
|
||||
/**
|
||||
* Stores a users task and sets a continuation coroutine to automatically mark the task
|
||||
|
@ -204,50 +147,13 @@ private:
|
|||
* first execution.
|
||||
* @return The task just stored wrapped in the self cleanup task.
|
||||
*/
|
||||
auto store(coro::task<void> user_task) -> task<void>&
|
||||
{
|
||||
// Only grow if completely full and attempting to add more.
|
||||
if (m_free_pos == m_task_indexes.end())
|
||||
{
|
||||
m_free_pos = grow();
|
||||
}
|
||||
|
||||
// Store the task inside a cleanup task for self deletion.
|
||||
auto index = *m_free_pos;
|
||||
m_tasks[index] = make_cleanup_task(std::move(user_task), m_free_pos);
|
||||
|
||||
// Mark the current used slot as used.
|
||||
std::advance(m_free_pos, 1);
|
||||
|
||||
return m_tasks[index];
|
||||
}
|
||||
auto store(coro::task<void> user_task) -> task<void>&;
|
||||
|
||||
/**
|
||||
* Garbage collects any tasks that are marked as deleted.
|
||||
* @return The number of tasks that were deleted.
|
||||
*/
|
||||
auto gc() -> std::size_t
|
||||
{
|
||||
std::size_t deleted{0};
|
||||
if (!m_tasks_to_delete.empty())
|
||||
{
|
||||
for (const auto& pos : m_tasks_to_delete)
|
||||
{
|
||||
// This doesn't actually 'delete' the task, it'll get overwritten when a
|
||||
// new user task claims the free space. It could be useful to actually
|
||||
// delete the tasks so the coroutine stack frames are destroyed. The advantage
|
||||
// of letting a new task replace and old one though is that its a 1:1 exchange
|
||||
// on delete and create, rather than a large pause here to delete all the
|
||||
// completed tasks.
|
||||
|
||||
// Put the deleted position at the end of the free indexes list.
|
||||
m_task_indexes.splice(m_task_indexes.end(), m_task_indexes, pos);
|
||||
}
|
||||
deleted = m_tasks_to_delete.size();
|
||||
m_tasks_to_delete.clear();
|
||||
}
|
||||
return deleted;
|
||||
}
|
||||
auto gc() -> std::size_t;
|
||||
|
||||
/**
|
||||
* @return The number of tasks that are awaiting deletion.
|
||||
|
@ -269,19 +175,7 @@ private:
|
|||
* Grows each task container by the growth factor.
|
||||
* @return The position of the free index after growing.
|
||||
*/
|
||||
auto grow() -> task_position
|
||||
{
|
||||
// Save an index at the current last item.
|
||||
auto last_pos = std::prev(m_task_indexes.end());
|
||||
std::size_t new_size = m_tasks.size() * m_growth_factor;
|
||||
for (std::size_t i = m_tasks.size(); i < new_size; ++i)
|
||||
{
|
||||
m_task_indexes.emplace_back(i);
|
||||
}
|
||||
m_tasks.resize(new_size);
|
||||
// Set the free pos to the item just after the previous last item.
|
||||
return std::next(last_pos);
|
||||
}
|
||||
auto grow() -> task_position;
|
||||
|
||||
/**
|
||||
* Encapsulate the users tasks in a cleanup task which marks itself for deletion upon
|
||||
|
@ -295,20 +189,7 @@ private:
|
|||
* @param pos The position where the task data will be stored in the task manager.
|
||||
* @return The user's task wrapped in a self cleanup task.
|
||||
*/
|
||||
auto make_cleanup_task(task<void> user_task, task_position pos) -> task<void>
|
||||
{
|
||||
try
|
||||
{
|
||||
co_await user_task;
|
||||
}
|
||||
catch (const std::runtime_error& e)
|
||||
{
|
||||
std::cerr << "scheduler user_task had an unhandled exception e.what()= " << e.what() << "\n";
|
||||
}
|
||||
|
||||
m_tasks_to_delete.push_back(pos);
|
||||
co_return;
|
||||
}
|
||||
auto make_cleanup_task(task<void> user_task, task_position pos) -> task<void>;
|
||||
|
||||
/// Maintains the lifetime of the tasks until they are completed.
|
||||
std::vector<task<void>> m_tasks{};
|
||||
|
@ -366,8 +247,6 @@ private:
|
|||
private:
|
||||
/// The io_scheduler that this operation will execute on.
|
||||
io_scheduler& m_io_scheduler;
|
||||
// // The coroutine awaiting execution.
|
||||
// std::coroutine_handle<> m_awaiting_coroutine{nullptr};
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -401,57 +280,14 @@ public:
|
|||
/**
|
||||
* @param options Various scheduler options to tune how it behaves.
|
||||
*/
|
||||
io_scheduler(const options opts = options{8, 2, thread_strategy_t::spawn})
|
||||
: m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)),
|
||||
m_accept_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)),
|
||||
m_timer_fd(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC)),
|
||||
m_thread_strategy(opts.thread_strategy),
|
||||
m_task_manager(opts.reserve_size, opts.growth_factor)
|
||||
{
|
||||
epoll_event e{};
|
||||
e.events = EPOLLIN;
|
||||
|
||||
e.data.ptr = const_cast<void*>(m_accept_ptr);
|
||||
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_accept_fd, &e);
|
||||
|
||||
e.data.ptr = const_cast<void*>(m_timer_ptr);
|
||||
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_timer_fd, &e);
|
||||
|
||||
if (m_thread_strategy == thread_strategy_t::spawn)
|
||||
{
|
||||
m_scheduler_thread = std::thread([this] { process_events_dedicated_thread(); });
|
||||
}
|
||||
else if (m_thread_strategy == thread_strategy_t::adopt)
|
||||
{
|
||||
process_events_dedicated_thread();
|
||||
}
|
||||
// else manual mode, the user must call process_events.
|
||||
}
|
||||
io_scheduler(const options opts = options{8, 2, thread_strategy_t::spawn});
|
||||
|
||||
io_scheduler(const io_scheduler&) = delete;
|
||||
io_scheduler(io_scheduler&&) = delete;
|
||||
auto operator=(const io_scheduler&) -> io_scheduler& = delete;
|
||||
auto operator=(io_scheduler&&) -> io_scheduler& = delete;
|
||||
|
||||
virtual ~io_scheduler()
|
||||
{
|
||||
shutdown();
|
||||
if (m_epoll_fd != -1)
|
||||
{
|
||||
close(m_epoll_fd);
|
||||
m_epoll_fd = -1;
|
||||
}
|
||||
if (m_accept_fd != -1)
|
||||
{
|
||||
close(m_accept_fd);
|
||||
m_accept_fd = -1;
|
||||
}
|
||||
if (m_timer_fd != -1)
|
||||
{
|
||||
close(m_timer_fd);
|
||||
m_timer_fd = -1;
|
||||
}
|
||||
}
|
||||
virtual ~io_scheduler();
|
||||
|
||||
/**
|
||||
* Schedules a task to be run as soon as possible. This pushes the task into a FIFO queue.
|
||||
|
@ -459,36 +295,9 @@ public:
|
|||
* @return True if the task has been scheduled, false if scheduling failed. Currently the only
|
||||
* way for this to fail is if the scheduler is trying to shutdown.
|
||||
*/
|
||||
auto schedule(coro::task<void> task) -> bool
|
||||
{
|
||||
if (is_shutdown())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
auto schedule(coro::task<void> task) -> bool;
|
||||
|
||||
// This function intentionally does not check to see if its executing on the thread that is
|
||||
// processing events. If the given task recursively generates tasks it will result in a
|
||||
// stack overflow very quickly. Instead it takes the long path of adding it to the FIFO
|
||||
// queue and processing through the normal pipeline. This simplifies the code and also makes
|
||||
// the order in which newly submitted tasks are more fair in regards to FIFO.
|
||||
|
||||
m_size.fetch_add(1, std::memory_order::relaxed);
|
||||
{
|
||||
std::lock_guard<std::mutex> lk{m_accept_mutex};
|
||||
m_accept_queue.emplace_back(std::move(task));
|
||||
}
|
||||
|
||||
// Send an event if one isn't already set. We use strong here to avoid spurious failures
|
||||
// but if it fails due to it actually being set we don't want to retry.
|
||||
bool expected{false};
|
||||
if (m_event_set.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
|
||||
{
|
||||
uint64_t value{1};
|
||||
::write(m_accept_fd, &value, sizeof(value));
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
auto schedule(std::vector<task<void>> tasks) -> bool;
|
||||
|
||||
template<awaitable_void... tasks_type>
|
||||
auto schedule(tasks_type&&... tasks) -> bool
|
||||
|
@ -514,49 +323,13 @@ public:
|
|||
return true;
|
||||
}
|
||||
|
||||
auto schedule(std::vector<task<void>>& tasks)
|
||||
{
|
||||
if (is_shutdown())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
m_size.fetch_add(tasks.size(), std::memory_order::relaxed);
|
||||
{
|
||||
std::lock_guard<std::mutex> lk{m_accept_mutex};
|
||||
m_accept_queue.insert(
|
||||
m_accept_queue.end(), std::make_move_iterator(tasks.begin()), std::make_move_iterator(tasks.end()));
|
||||
|
||||
// std::move(tasks.begin(), tasks.end(), std::back_inserter(m_accept_queue));
|
||||
}
|
||||
|
||||
tasks.clear();
|
||||
|
||||
bool expected{false};
|
||||
if (m_event_set.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
|
||||
{
|
||||
uint64_t value{1};
|
||||
::write(m_accept_fd, &value, sizeof(value));
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedules a task to be run after waiting for a certain period of time.
|
||||
* @param task The task to schedule after waiting `after` amount of time.
|
||||
* @return True if the task has been scheduled, false if scheduling failed. Currently the only
|
||||
* way for this to fail is if the scheduler is trying to shutdown.
|
||||
*/
|
||||
auto schedule_after(coro::task<void> task, std::chrono::milliseconds after) -> bool
|
||||
{
|
||||
if (m_shutdown_requested.load(std::memory_order::relaxed))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return schedule(make_scheduler_after_task(std::move(task), after));
|
||||
}
|
||||
auto schedule_after(coro::task<void> task, std::chrono::milliseconds after) -> bool;
|
||||
|
||||
/**
|
||||
* Schedules a task to be run at a specific time in the future.
|
||||
|
@ -565,19 +338,7 @@ public:
|
|||
* @return True if the task is scheduled. False if time is in the past or the scheduler is
|
||||
* trying to shutdown.
|
||||
*/
|
||||
auto schedule_at(coro::task<void> task, time_point time) -> bool
|
||||
{
|
||||
auto now = clock::now();
|
||||
|
||||
// If the requested time is in the past (or now!) bail out!
|
||||
if (time <= now)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
auto amount = std::chrono::duration_cast<std::chrono::milliseconds>(time - now);
|
||||
return schedule_after(std::move(task), amount);
|
||||
}
|
||||
auto schedule_at(coro::task<void> task, time_point time) -> bool;
|
||||
|
||||
/**
|
||||
* Polls a specific file descriptor for the given poll operation.
|
||||
|
@ -587,47 +348,7 @@ public:
|
|||
* indefinitely until the event is triggered.
|
||||
*/
|
||||
auto poll(fd_t fd, poll_op op, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
|
||||
-> coro::task<poll_status>
|
||||
{
|
||||
// Setup two events, a timeout event and the actual poll for op event.
|
||||
// Whichever triggers first will delete the other to guarantee only one wins.
|
||||
// The resume token will be set by the scheduler to what the event turned out to be.
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
bool timeout_requested = (timeout > 0ms);
|
||||
|
||||
resume_token<poll_status> token{};
|
||||
timer_tokens::iterator timer_pos;
|
||||
|
||||
if (timeout_requested)
|
||||
{
|
||||
timer_pos = add_timer_token(clock::now() + timeout, &token);
|
||||
}
|
||||
|
||||
epoll_event e{};
|
||||
e.events = static_cast<uint32_t>(op) | EPOLLONESHOT | EPOLLET | EPOLLRDHUP;
|
||||
e.data.ptr = &token;
|
||||
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &e);
|
||||
|
||||
auto status = co_await unsafe_yield<poll_status>(token);
|
||||
switch (status)
|
||||
{
|
||||
// The event triggered first, delete the timeout.
|
||||
case poll_status::event:
|
||||
if (timeout_requested)
|
||||
{
|
||||
remove_timer_token(timer_pos);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
// Deleting the event is done regardless below in epoll_ctl()
|
||||
break;
|
||||
}
|
||||
|
||||
epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
|
||||
|
||||
co_return status;
|
||||
}
|
||||
-> coro::task<poll_status>;
|
||||
|
||||
/**
|
||||
* This function will first poll the given `fd` to make sure it can be read from. Once notified
|
||||
|
@ -640,25 +361,12 @@ public:
|
|||
* @return The number of bytes read or an error code if negative.
|
||||
*/
|
||||
auto read(fd_t fd, std::span<char> buffer, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
|
||||
-> coro::task<std::pair<poll_status, ssize_t>>
|
||||
{
|
||||
auto status = co_await poll(fd, poll_op::read, timeout);
|
||||
switch (status)
|
||||
{
|
||||
case poll_status::event:
|
||||
co_return {status, ::read(fd, buffer.data(), buffer.size())};
|
||||
default:
|
||||
co_return {status, 0};
|
||||
}
|
||||
}
|
||||
-> coro::task<std::pair<poll_status, ssize_t>>;
|
||||
|
||||
auto read(
|
||||
const net::socket& sock,
|
||||
std::span<char> buffer,
|
||||
std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<std::pair<poll_status, ssize_t>>
|
||||
{
|
||||
return read(sock.native_handle(), buffer, timeout);
|
||||
}
|
||||
std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<std::pair<poll_status, ssize_t>>;
|
||||
|
||||
/**
|
||||
* This function will first poll the given `fd` to make sure it can be written to. Once notified
|
||||
|
@ -671,25 +379,12 @@ public:
|
|||
*/
|
||||
auto write(
|
||||
fd_t fd, const std::span<const char> buffer, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
|
||||
-> coro::task<std::pair<poll_status, ssize_t>>
|
||||
{
|
||||
auto status = co_await poll(fd, poll_op::write, timeout);
|
||||
switch (status)
|
||||
{
|
||||
case poll_status::event:
|
||||
co_return {status, ::write(fd, buffer.data(), buffer.size())};
|
||||
default:
|
||||
co_return {status, 0};
|
||||
}
|
||||
}
|
||||
-> coro::task<std::pair<poll_status, ssize_t>>;
|
||||
|
||||
auto write(
|
||||
const net::socket& sock,
|
||||
const std::span<const char> buffer,
|
||||
std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<std::pair<poll_status, ssize_t>>
|
||||
{
|
||||
return write(sock.native_handle(), buffer, timeout);
|
||||
}
|
||||
std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<std::pair<poll_status, ssize_t>>;
|
||||
|
||||
/**
|
||||
* Immediately yields the current task and places it at the end of the queue of tasks waiting
|
||||
|
@ -697,11 +392,7 @@ public:
|
|||
* FIFO task queue. This function is useful to yielding long processing tasks to let other tasks
|
||||
* get processing time.
|
||||
*/
|
||||
auto yield() -> coro::task<void>
|
||||
{
|
||||
co_await schedule();
|
||||
co_return;
|
||||
}
|
||||
auto yield() -> coro::task<void>;
|
||||
|
||||
/**
|
||||
* Immediately yields the current task and provides a resume token to resume this yielded
|
||||
|
@ -742,7 +433,7 @@ public:
|
|||
/**
|
||||
* User provided resume token to yield the current coroutine until the token's resume is called.
|
||||
* Its also possible to co_await a resume token inline without calling this yield function.
|
||||
* @param token Resume token to await the current task on. Use scheduer::generate_resume_token<T>() to
|
||||
* @param token Resume token to await the current task on. Use scheduer::make_resume_token<T>() to
|
||||
* generate a resume token to use on this scheduer.
|
||||
*/
|
||||
template<typename return_type>
|
||||
|
@ -756,67 +447,36 @@ public:
|
|||
* Yields the current coroutine for `amount` of time.
|
||||
* @param amount The amount of time to wait.
|
||||
*/
|
||||
auto yield_for(std::chrono::milliseconds amount) -> coro::task<void>
|
||||
{
|
||||
// If the requested amount of time is negative or zero just return.
|
||||
using namespace std::chrono_literals;
|
||||
if (amount <= 0ms)
|
||||
{
|
||||
co_return;
|
||||
}
|
||||
|
||||
resume_token<poll_status> token{};
|
||||
|
||||
add_timer_token(clock::now() + amount, &token);
|
||||
|
||||
// Wait for the token timer to trigger.
|
||||
co_await token;
|
||||
co_return;
|
||||
}
|
||||
auto yield_for(std::chrono::milliseconds amount) -> coro::task<void>;
|
||||
|
||||
/**
|
||||
* Yields the current coroutine until `time`. If time is in the past this function will
|
||||
* return immediately.
|
||||
* @param time The time point in the future to yield until.
|
||||
*/
|
||||
auto yield_until(time_point time) -> coro::task<void>
|
||||
{
|
||||
auto now = clock::now();
|
||||
|
||||
// If the requested time is in the past (or now!) just return.
|
||||
if (time <= now)
|
||||
{
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto amount = std::chrono::duration_cast<std::chrono::milliseconds>(time - now);
|
||||
co_await yield_for(amount);
|
||||
co_return;
|
||||
}
|
||||
auto yield_until(time_point time) -> coro::task<void>;
|
||||
|
||||
/**
|
||||
* Generates a resume token that can be used to resume an executing task.
|
||||
* Makes a resume token that can be used to resume a suspended task from any thread. On resume
|
||||
* the task will resume execution on this scheduler thread.
|
||||
* @tparam The return type of resuming the async operation.
|
||||
* @return Resume token with the given return type.
|
||||
*/
|
||||
template<typename return_type = void>
|
||||
auto generate_resume_token() -> resume_token<return_type>
|
||||
auto make_resume_token() -> resume_token<return_type>
|
||||
{
|
||||
return resume_token<return_type>(*this);
|
||||
}
|
||||
|
||||
/**
|
||||
* If runnint in mode thread_strategy_t::manual this function must be called at regular
|
||||
* If running in mode thread_strategy_t::manual this function must be called at regular
|
||||
* intervals to process events on the io_scheduler. This function will do nothing in any
|
||||
* other thread_strategy_t mode.
|
||||
* @param timeout The timeout to wait for events.
|
||||
* @param timeout The timeout to wait for events, use zero (default) to process only events
|
||||
* that are currently ready.
|
||||
* @return The number of executing tasks.
|
||||
*/
|
||||
auto process_events(std::chrono::milliseconds timeout = std::chrono::milliseconds{1000}) -> std::size_t
|
||||
{
|
||||
process_events_external_thread(timeout);
|
||||
return m_size.load(std::memory_order::relaxed);
|
||||
}
|
||||
auto process_events(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> std::size_t;
|
||||
|
||||
/**
|
||||
* @return The number of active tasks still executing and unprocessed submitted tasks.
|
||||
|
@ -852,20 +512,7 @@ public:
|
|||
* the scheduler to shutdown but not wait for all tasks to complete, it returns
|
||||
* immediately.
|
||||
*/
|
||||
virtual auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) -> void
|
||||
{
|
||||
if (!m_shutdown_requested.exchange(true, std::memory_order::release))
|
||||
{
|
||||
// Signal the event loop to stop asap.
|
||||
uint64_t value{1};
|
||||
::write(m_accept_fd, &value, sizeof(value));
|
||||
|
||||
if (wait_for_tasks == shutdown_t::sync && m_scheduler_thread.joinable())
|
||||
{
|
||||
m_scheduler_thread.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
virtual auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) -> void;
|
||||
|
||||
private:
|
||||
/// The event loop epoll file descriptor.
|
||||
|
@ -905,13 +552,7 @@ private:
|
|||
|
||||
task_manager m_task_manager;
|
||||
|
||||
auto make_scheduler_after_task(coro::task<void> task, std::chrono::milliseconds wait_time) -> coro::task<void>
|
||||
{
|
||||
// Wait for the period requested, and then resume their task.
|
||||
co_await yield_for(wait_time);
|
||||
co_await task;
|
||||
co_return;
|
||||
}
|
||||
auto make_scheduler_after_task(coro::task<void> task, std::chrono::milliseconds wait_time) -> coro::task<void>;
|
||||
|
||||
template<typename return_type>
|
||||
auto unsafe_yield(resume_token<return_type>& token) -> coro::task<return_type>
|
||||
|
@ -927,269 +568,27 @@ private:
|
|||
}
|
||||
}
|
||||
|
||||
auto add_timer_token(time_point tp, resume_token<poll_status>* token_ptr) -> timer_tokens::iterator
|
||||
{
|
||||
auto pos = m_timer_tokens.emplace(tp, token_ptr);
|
||||
auto add_timer_token(time_point tp, resume_token<poll_status>* token_ptr) -> timer_tokens::iterator;
|
||||
|
||||
// If this item was inserted as the smallest time point, update the timeout.
|
||||
if (pos == m_timer_tokens.begin())
|
||||
{
|
||||
update_timeout(clock::now());
|
||||
}
|
||||
auto remove_timer_token(timer_tokens::iterator pos) -> void;
|
||||
|
||||
return pos;
|
||||
}
|
||||
auto resume(std::coroutine_handle<> handle) -> void;
|
||||
|
||||
auto remove_timer_token(timer_tokens::iterator pos) -> void
|
||||
{
|
||||
auto is_first = (m_timer_tokens.begin() == pos);
|
||||
|
||||
m_timer_tokens.erase(pos);
|
||||
|
||||
// If this was the first item, update the timeout. It would be acceptable to just let it
|
||||
// also fire the timeout as the event loop will ignore it since nothing will have timed
|
||||
// out but it feels like the right thing to do to update it to the correct timeout value.
|
||||
if (is_first)
|
||||
{
|
||||
update_timeout(clock::now());
|
||||
}
|
||||
}
|
||||
|
||||
auto resume(std::coroutine_handle<> handle) -> void
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lk{m_accept_mutex};
|
||||
m_accept_queue.emplace_back(handle);
|
||||
}
|
||||
|
||||
// Signal to the event loop there is a task to resume if one hasn't already been sent.
|
||||
bool expected{false};
|
||||
if (m_event_set.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
|
||||
{
|
||||
uint64_t value{1};
|
||||
::write(m_accept_fd, &value, sizeof(value));
|
||||
}
|
||||
}
|
||||
|
||||
static constexpr std::chrono::milliseconds m_default_timeout{1000};
|
||||
static constexpr std::chrono::milliseconds m_no_timeout{0};
|
||||
static constexpr std::size_t m_max_events = 8;
|
||||
static const constexpr std::chrono::milliseconds m_default_timeout{1000};
|
||||
static const constexpr std::chrono::milliseconds m_no_timeout{0};
|
||||
static const constexpr std::size_t m_max_events = 8;
|
||||
std::array<struct epoll_event, m_max_events> m_events{};
|
||||
|
||||
auto process_task_and_start(task<void>& task) -> void { m_task_manager.store(std::move(task)).resume(); }
|
||||
auto process_task_and_start(task<void>& task) -> void;
|
||||
auto process_task_variant(task_variant& tv) -> void;
|
||||
auto process_task_queue() -> void;
|
||||
auto process_events_poll_execute(std::chrono::milliseconds user_timeout) -> void;
|
||||
auto event_to_poll_status(uint32_t events) -> poll_status;
|
||||
|
||||
inline auto process_task_variant(task_variant& tv) -> void
|
||||
{
|
||||
if (std::holds_alternative<coro::task<void>>(tv))
|
||||
{
|
||||
auto& task = std::get<coro::task<void>>(tv);
|
||||
// Store the users task and immediately start executing it.
|
||||
process_task_and_start(task);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto handle = std::get<std::coroutine_handle<>>(tv);
|
||||
// The cleanup wrapper task will catch all thrown exceptions unconditionally.
|
||||
handle.resume();
|
||||
}
|
||||
}
|
||||
auto process_events_external_thread(std::chrono::milliseconds user_timeout) -> void;
|
||||
auto process_events_dedicated_thread() -> void;
|
||||
|
||||
auto process_task_queue() -> void
|
||||
{
|
||||
std::size_t amount{0};
|
||||
{
|
||||
std::lock_guard<std::mutex> lk{m_accept_mutex};
|
||||
while (!m_accept_queue.empty() && amount < task_inline_process_amount)
|
||||
{
|
||||
m_processing_tasks[amount] = std::move(m_accept_queue.front());
|
||||
m_accept_queue.pop_front();
|
||||
++amount;
|
||||
}
|
||||
}
|
||||
|
||||
// The queue is empty, we are done here.
|
||||
if (amount == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
for (std::size_t i = 0; i < amount; ++i)
|
||||
{
|
||||
process_task_variant(m_processing_tasks[i]);
|
||||
}
|
||||
}
|
||||
|
||||
auto process_events_poll_execute(std::chrono::milliseconds user_timeout) -> void
|
||||
{
|
||||
// Need to acquire m_accept_queue size to determine if there are any pending tasks.
|
||||
std::atomic_thread_fence(std::memory_order::acquire);
|
||||
bool tasks_ready = !m_accept_queue.empty();
|
||||
|
||||
auto timeout = (tasks_ready) ? m_no_timeout : user_timeout;
|
||||
|
||||
// Poll is run every iteration to make sure 'waiting' events are properly put into
|
||||
// the FIFO queue for when they are ready.
|
||||
auto event_count = epoll_wait(m_epoll_fd, m_events.data(), m_max_events, timeout.count());
|
||||
if (event_count > 0)
|
||||
{
|
||||
for (std::size_t i = 0; i < static_cast<std::size_t>(event_count); ++i)
|
||||
{
|
||||
epoll_event& event = m_events[i];
|
||||
void* handle_ptr = event.data.ptr;
|
||||
|
||||
if (handle_ptr == m_accept_ptr)
|
||||
{
|
||||
uint64_t value{0};
|
||||
::read(m_accept_fd, &value, sizeof(value));
|
||||
(void)value; // discard, the read merely resets the eventfd counter to zero.
|
||||
|
||||
// Let any threads scheduling work know that the event set has been consumed.
|
||||
// Important to do this after the accept file descriptor has been read.
|
||||
// This needs to succeed so best practice is to loop compare exchange weak.
|
||||
bool expected{true};
|
||||
while (!m_event_set.compare_exchange_weak(
|
||||
expected, false, std::memory_order::release, std::memory_order::relaxed))
|
||||
{
|
||||
}
|
||||
|
||||
tasks_ready = true;
|
||||
}
|
||||
else if (handle_ptr == m_timer_ptr)
|
||||
{
|
||||
// If the timer fd triggered, loop and call every task that has a wait time <= now.
|
||||
while (!m_timer_tokens.empty())
|
||||
{
|
||||
// Now is continuously calculated since resuming tasks could take a fairly
|
||||
// significant amount of time and might 'trigger' more timeouts.
|
||||
auto now = clock::now();
|
||||
|
||||
auto first = m_timer_tokens.begin();
|
||||
auto [tp, token_ptr] = *first;
|
||||
|
||||
if (tp <= now)
|
||||
{
|
||||
// Important to erase first so if any timers are updated after resume
|
||||
// this timer won't be taken into account.
|
||||
m_timer_tokens.erase(first);
|
||||
// Every event triggered on the timer tokens is *always* a timeout.
|
||||
token_ptr->resume(poll_status::timeout);
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Update the time to the next smallest time point, re-take the current now time
|
||||
// since processing tasks could shit the time.
|
||||
update_timeout(clock::now());
|
||||
}
|
||||
else
|
||||
{
|
||||
// Individual poll task wake-up, this will queue the coroutines waiting
|
||||
// on the resume token into the FIFO queue for processing.
|
||||
auto* token_ptr = static_cast<resume_token<poll_status>*>(handle_ptr);
|
||||
token_ptr->resume(event_to_poll_status(event.events));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (tasks_ready)
|
||||
{
|
||||
process_task_queue();
|
||||
}
|
||||
|
||||
if (!m_task_manager.delete_tasks_empty())
|
||||
{
|
||||
m_size.fetch_sub(m_task_manager.gc(), std::memory_order::relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
auto event_to_poll_status(uint32_t events) -> poll_status
|
||||
{
|
||||
if (events & EPOLLIN || events & EPOLLOUT)
|
||||
{
|
||||
return poll_status::event;
|
||||
}
|
||||
else if (events & EPOLLERR)
|
||||
{
|
||||
return poll_status::error;
|
||||
}
|
||||
else if (events & EPOLLRDHUP || events & EPOLLHUP)
|
||||
{
|
||||
return poll_status::closed;
|
||||
}
|
||||
|
||||
throw std::runtime_error{"invalid epoll state"};
|
||||
}
|
||||
|
||||
auto process_events_external_thread(std::chrono::milliseconds user_timeout) -> void
|
||||
{
|
||||
// Do not allow two threads to process events at the same time.
|
||||
bool expected{false};
|
||||
if (m_running.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
|
||||
{
|
||||
process_events_poll_execute(user_timeout);
|
||||
m_running.exchange(false, std::memory_order::release);
|
||||
}
|
||||
}
|
||||
|
||||
auto process_events_dedicated_thread() -> void
|
||||
{
|
||||
m_running.exchange(true, std::memory_order::release);
|
||||
// Execute tasks until stopped or there are more tasks to complete.
|
||||
while (!m_shutdown_requested.load(std::memory_order::relaxed) || m_size.load(std::memory_order::relaxed) > 0)
|
||||
{
|
||||
process_events_poll_execute(m_default_timeout);
|
||||
}
|
||||
m_running.exchange(false, std::memory_order::release);
|
||||
}
|
||||
|
||||
auto update_timeout(time_point now) -> void
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
if (!m_timer_tokens.empty())
|
||||
{
|
||||
auto& [tp, task] = *m_timer_tokens.begin();
|
||||
|
||||
auto amount = tp - now;
|
||||
|
||||
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(amount);
|
||||
amount -= seconds;
|
||||
auto nanoseconds = std::chrono::duration_cast<std::chrono::nanoseconds>(amount);
|
||||
|
||||
// As a safeguard if both values end up as zero (or negative) then trigger the timeout
|
||||
// immediately as zero disarms timerfd according to the man pages and negative valeues
|
||||
// will result in an error return value.
|
||||
if (seconds <= 0s)
|
||||
{
|
||||
seconds = 0s;
|
||||
if (nanoseconds <= 0ns)
|
||||
{
|
||||
// just trigger immediately!
|
||||
nanoseconds = 1ns;
|
||||
}
|
||||
}
|
||||
|
||||
itimerspec ts{};
|
||||
ts.it_value.tv_sec = seconds.count();
|
||||
ts.it_value.tv_nsec = nanoseconds.count();
|
||||
|
||||
if (timerfd_settime(m_timer_fd, 0, &ts, nullptr) == -1)
|
||||
{
|
||||
std::string msg = "Failed to set timerfd errorno=[" + std::string{strerror(errno)} + "].";
|
||||
throw std::runtime_error(msg.data());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Setting these values to zero disables the timer.
|
||||
itimerspec ts{};
|
||||
ts.it_value.tv_sec = 0;
|
||||
ts.it_value.tv_nsec = 0;
|
||||
timerfd_settime(m_timer_fd, 0, &ts, nullptr);
|
||||
}
|
||||
}
|
||||
auto update_timeout(time_point now) -> void;
|
||||
};
|
||||
|
||||
template<typename return_type>
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
#include <string>
|
||||
|
||||
namespace coro
|
||||
namespace coro::net
|
||||
{
|
||||
enum class connect_status
|
||||
{
|
||||
|
@ -26,4 +26,4 @@ enum class connect_status
|
|||
*/
|
||||
auto to_string(const connect_status& status) -> const std::string&;
|
||||
|
||||
} // namespace coro
|
||||
} // namespace coro::net
|
|
@ -16,7 +16,7 @@
|
|||
#include <unordered_set>
|
||||
#include <sys/epoll.h>
|
||||
|
||||
namespace coro
|
||||
namespace coro::net
|
||||
{
|
||||
|
||||
class dns_client;
|
||||
|
@ -84,4 +84,4 @@ private:
|
|||
auto make_poll_task(io_scheduler::fd_t fd, poll_op ops) -> coro::task<void>;
|
||||
};
|
||||
|
||||
} // namespace coro
|
||||
} // namespace coro::net
|
|
@ -1,12 +1,12 @@
|
|||
#pragma once
|
||||
|
||||
#include "coro/net/dns_client.hpp"
|
||||
#include "coro/net/ip_address.hpp"
|
||||
#include "coro/net/hostname.hpp"
|
||||
#include "coro/net/socket.hpp"
|
||||
#include "coro/connect.hpp"
|
||||
#include "coro/net/connect.hpp"
|
||||
#include "coro/poll.hpp"
|
||||
#include "coro/task.hpp"
|
||||
#include "coro/dns_client.hpp"
|
||||
|
||||
#include <chrono>
|
||||
#include <optional>
|
||||
|
@ -16,6 +16,10 @@
|
|||
namespace coro
|
||||
{
|
||||
class io_scheduler;
|
||||
} // namespace coro
|
||||
|
||||
namespace coro::net
|
||||
{
|
||||
|
||||
class tcp_client
|
||||
{
|
||||
|
@ -30,7 +34,7 @@ public:
|
|||
net::domain_t domain{net::domain_t::ipv4};
|
||||
/// If using a hostname to connect to then provide a dns client to lookup the host's ip address.
|
||||
/// This is optional if using ip addresses directly.
|
||||
dns_client* dns{nullptr};
|
||||
net::dns_client* dns{nullptr};
|
||||
};
|
||||
|
||||
tcp_client(io_scheduler& scheduler, options opts = options{
|
||||
|
@ -44,7 +48,7 @@ public:
|
|||
auto operator=(tcp_client&&) noexcept -> tcp_client& = default;
|
||||
~tcp_client() = default;
|
||||
|
||||
auto connect(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<connect_status>;
|
||||
auto connect(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<net::connect_status>;
|
||||
|
||||
auto socket() const -> const net::socket& { return m_socket; }
|
||||
auto socket() -> net::socket& { return m_socket; }
|
||||
|
@ -57,7 +61,7 @@ private:
|
|||
/// The tcp socket.
|
||||
net::socket m_socket;
|
||||
/// Cache the status of the connect in the event the user calls connect() again.
|
||||
std::optional<connect_status> m_connect_status{std::nullopt};
|
||||
std::optional<net::connect_status> m_connect_status{std::nullopt};
|
||||
};
|
||||
|
||||
} // namespace coro
|
||||
} // namespace coro::net
|
|
@ -9,7 +9,7 @@
|
|||
#include <functional>
|
||||
#include <sys/socket.h>
|
||||
|
||||
namespace coro
|
||||
namespace coro::net
|
||||
{
|
||||
class tcp_scheduler : public io_scheduler
|
||||
{
|
||||
|
@ -63,4 +63,4 @@ private:
|
|||
auto make_accept_task() -> coro::task<void>;
|
||||
};
|
||||
|
||||
} // namespace coro
|
||||
} // namespace coro::net
|
694
src/io_scheduler.cpp
Normal file
694
src/io_scheduler.cpp
Normal file
|
@ -0,0 +1,694 @@
|
|||
#include "coro/io_scheduler.hpp"
|
||||
|
||||
#include <iostream>
|
||||
|
||||
namespace coro
|
||||
{
|
||||
|
||||
detail::resume_token_base::resume_token_base(io_scheduler* s) noexcept
|
||||
: m_scheduler(s), m_state(nullptr)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
detail::resume_token_base::resume_token_base(resume_token_base&& other)
|
||||
{
|
||||
m_scheduler = other.m_scheduler;
|
||||
m_state = other.m_state.exchange(nullptr);
|
||||
|
||||
other.m_scheduler = nullptr;
|
||||
}
|
||||
|
||||
auto detail::resume_token_base::awaiter::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
|
||||
{
|
||||
const void* const set_state = &m_token;
|
||||
|
||||
m_awaiting_coroutine = awaiting_coroutine;
|
||||
|
||||
// This value will update if other threads write to it via acquire.
|
||||
void* old_value = m_token.m_state.load(std::memory_order::acquire);
|
||||
do
|
||||
{
|
||||
// Resume immediately if already in the set state.
|
||||
if (old_value == set_state)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
m_next = static_cast<awaiter*>(old_value);
|
||||
} while (!m_token.m_state.compare_exchange_weak(
|
||||
old_value, this, std::memory_order::release, std::memory_order::acquire));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
auto detail::resume_token_base::reset() noexcept -> void
|
||||
{
|
||||
void* old_value = this;
|
||||
m_state.compare_exchange_strong(old_value, nullptr, std::memory_order::acquire);
|
||||
}
|
||||
|
||||
auto detail::resume_token_base::operator=(resume_token_base&& other) -> resume_token_base&
|
||||
{
|
||||
if (std::addressof(other) != this)
|
||||
{
|
||||
m_scheduler = other.m_scheduler;
|
||||
m_state = other.m_state.exchange(nullptr);
|
||||
|
||||
other.m_scheduler = nullptr;
|
||||
}
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
io_scheduler::task_manager::task_manager(const std::size_t reserve_size, const double growth_factor)
|
||||
: m_growth_factor(growth_factor)
|
||||
{
|
||||
m_tasks.resize(reserve_size);
|
||||
for (std::size_t i = 0; i < reserve_size; ++i)
|
||||
{
|
||||
m_task_indexes.emplace_back(i);
|
||||
}
|
||||
m_free_pos = m_task_indexes.begin();
|
||||
}
|
||||
|
||||
auto io_scheduler::task_manager::store(coro::task<void> user_task) -> task<void>&
|
||||
{
|
||||
// Only grow if completely full and attempting to add more.
|
||||
if (m_free_pos == m_task_indexes.end())
|
||||
{
|
||||
m_free_pos = grow();
|
||||
}
|
||||
|
||||
// Store the task inside a cleanup task for self deletion.
|
||||
auto index = *m_free_pos;
|
||||
m_tasks[index] = make_cleanup_task(std::move(user_task), m_free_pos);
|
||||
|
||||
// Mark the current used slot as used.
|
||||
std::advance(m_free_pos, 1);
|
||||
|
||||
return m_tasks[index];
|
||||
}
|
||||
|
||||
auto io_scheduler::task_manager::gc() -> std::size_t
|
||||
{
|
||||
std::size_t deleted{0};
|
||||
if (!m_tasks_to_delete.empty())
|
||||
{
|
||||
for (const auto& pos : m_tasks_to_delete)
|
||||
{
|
||||
// This doesn't actually 'delete' the task, it'll get overwritten when a
|
||||
// new user task claims the free space. It could be useful to actually
|
||||
// delete the tasks so the coroutine stack frames are destroyed. The advantage
|
||||
// of letting a new task replace and old one though is that its a 1:1 exchange
|
||||
// on delete and create, rather than a large pause here to delete all the
|
||||
// completed tasks.
|
||||
|
||||
// Put the deleted position at the end of the free indexes list.
|
||||
m_task_indexes.splice(m_task_indexes.end(), m_task_indexes, pos);
|
||||
}
|
||||
deleted = m_tasks_to_delete.size();
|
||||
m_tasks_to_delete.clear();
|
||||
}
|
||||
return deleted;
|
||||
}
|
||||
|
||||
auto io_scheduler::task_manager::grow() -> task_position
|
||||
{
|
||||
// Save an index at the current last item.
|
||||
auto last_pos = std::prev(m_task_indexes.end());
|
||||
std::size_t new_size = m_tasks.size() * m_growth_factor;
|
||||
for (std::size_t i = m_tasks.size(); i < new_size; ++i)
|
||||
{
|
||||
m_task_indexes.emplace_back(i);
|
||||
}
|
||||
m_tasks.resize(new_size);
|
||||
// Set the free pos to the item just after the previous last item.
|
||||
return std::next(last_pos);
|
||||
}
|
||||
|
||||
auto io_scheduler::task_manager::make_cleanup_task(task<void> user_task, task_position pos) -> task<void>
|
||||
{
|
||||
try
|
||||
{
|
||||
co_await user_task;
|
||||
}
|
||||
catch (const std::runtime_error& e)
|
||||
{
|
||||
// TODO: what would be a good way to report this to the user...? Catching here is required
|
||||
// since the co_await will unwrap the unhandled exception on the task. The scheduler thread
|
||||
// should really not throw unhandled exceptions, otherwise it'll take the application down.
|
||||
// The user's task should ideally be wrapped in a catch all and handle it themselves.
|
||||
std::cerr << "scheduler user_task had an unhandled exception e.what()= " << e.what() << "\n";
|
||||
}
|
||||
|
||||
m_tasks_to_delete.push_back(pos);
|
||||
co_return;
|
||||
}
|
||||
|
||||
io_scheduler::io_scheduler(const options opts)
|
||||
: m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)),
|
||||
m_accept_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)),
|
||||
m_timer_fd(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC)),
|
||||
m_thread_strategy(opts.thread_strategy),
|
||||
m_task_manager(opts.reserve_size, opts.growth_factor)
|
||||
{
|
||||
epoll_event e{};
|
||||
e.events = EPOLLIN;
|
||||
|
||||
e.data.ptr = const_cast<void*>(m_accept_ptr);
|
||||
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_accept_fd, &e);
|
||||
|
||||
e.data.ptr = const_cast<void*>(m_timer_ptr);
|
||||
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_timer_fd, &e);
|
||||
|
||||
if (m_thread_strategy == thread_strategy_t::spawn)
|
||||
{
|
||||
m_scheduler_thread = std::thread([this] { process_events_dedicated_thread(); });
|
||||
}
|
||||
else if (m_thread_strategy == thread_strategy_t::adopt)
|
||||
{
|
||||
process_events_dedicated_thread();
|
||||
}
|
||||
// else manual mode, the user must call process_events.
|
||||
}
|
||||
|
||||
io_scheduler::~io_scheduler()
|
||||
{
|
||||
shutdown();
|
||||
if (m_epoll_fd != -1)
|
||||
{
|
||||
close(m_epoll_fd);
|
||||
m_epoll_fd = -1;
|
||||
}
|
||||
if (m_accept_fd != -1)
|
||||
{
|
||||
close(m_accept_fd);
|
||||
m_accept_fd = -1;
|
||||
}
|
||||
if (m_timer_fd != -1)
|
||||
{
|
||||
close(m_timer_fd);
|
||||
m_timer_fd = -1;
|
||||
}
|
||||
}
|
||||
|
||||
auto io_scheduler::schedule(coro::task<void> task) -> bool
|
||||
{
|
||||
if (is_shutdown())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
// This function intentionally does not check to see if its executing on the thread that is
|
||||
// processing events. If the given task recursively generates tasks it will result in a
|
||||
// stack overflow very quickly. Instead it takes the long path of adding it to the FIFO
|
||||
// queue and processing through the normal pipeline. This simplifies the code and also makes
|
||||
// the order in which newly submitted tasks are more fair in regards to FIFO.
|
||||
|
||||
m_size.fetch_add(1, std::memory_order::relaxed);
|
||||
{
|
||||
std::lock_guard<std::mutex> lk{m_accept_mutex};
|
||||
m_accept_queue.emplace_back(std::move(task));
|
||||
}
|
||||
|
||||
// Send an event if one isn't already set. We use strong here to avoid spurious failures
|
||||
// but if it fails due to it actually being set we don't want to retry.
|
||||
bool expected{false};
|
||||
if (m_event_set.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
|
||||
{
|
||||
uint64_t value{1};
|
||||
::write(m_accept_fd, &value, sizeof(value));
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
auto io_scheduler::schedule(std::vector<task<void>> tasks) -> bool
|
||||
{
|
||||
if (is_shutdown())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
m_size.fetch_add(tasks.size(), std::memory_order::relaxed);
|
||||
{
|
||||
std::lock_guard<std::mutex> lk{m_accept_mutex};
|
||||
m_accept_queue.insert(
|
||||
m_accept_queue.end(), std::make_move_iterator(tasks.begin()), std::make_move_iterator(tasks.end()));
|
||||
|
||||
// std::move(tasks.begin(), tasks.end(), std::back_inserter(m_accept_queue));
|
||||
}
|
||||
|
||||
tasks.clear();
|
||||
|
||||
bool expected{false};
|
||||
if (m_event_set.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
|
||||
{
|
||||
uint64_t value{1};
|
||||
::write(m_accept_fd, &value, sizeof(value));
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
auto io_scheduler::schedule_after(coro::task<void> task, std::chrono::milliseconds after) -> bool
|
||||
{
|
||||
if (m_shutdown_requested.load(std::memory_order::relaxed))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
return schedule(make_scheduler_after_task(std::move(task), after));
|
||||
}
|
||||
|
||||
auto io_scheduler::schedule_at(coro::task<void> task, time_point time) -> bool
|
||||
{
|
||||
auto now = clock::now();
|
||||
|
||||
// If the requested time is in the past (or now!) bail out!
|
||||
if (time <= now)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
auto amount = std::chrono::duration_cast<std::chrono::milliseconds>(time - now);
|
||||
return schedule_after(std::move(task), amount);
|
||||
}
|
||||
|
||||
auto io_scheduler::poll(fd_t fd, poll_op op, std::chrono::milliseconds timeout) -> coro::task<poll_status>
|
||||
{
|
||||
// Setup two events, a timeout event and the actual poll for op event.
|
||||
// Whichever triggers first will delete the other to guarantee only one wins.
|
||||
// The resume token will be set by the scheduler to what the event turned out to be.
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
bool timeout_requested = (timeout > 0ms);
|
||||
|
||||
resume_token<poll_status> token{};
|
||||
timer_tokens::iterator timer_pos;
|
||||
|
||||
if (timeout_requested)
|
||||
{
|
||||
timer_pos = add_timer_token(clock::now() + timeout, &token);
|
||||
}
|
||||
|
||||
epoll_event e{};
|
||||
e.events = static_cast<uint32_t>(op) | EPOLLONESHOT | EPOLLET | EPOLLRDHUP;
|
||||
e.data.ptr = &token;
|
||||
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &e);
|
||||
|
||||
auto status = co_await unsafe_yield<poll_status>(token);
|
||||
switch (status)
|
||||
{
|
||||
// The event triggered first, delete the timeout.
|
||||
case poll_status::event:
|
||||
if (timeout_requested)
|
||||
{
|
||||
remove_timer_token(timer_pos);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
// Deleting the event is done regardless below in epoll_ctl()
|
||||
break;
|
||||
}
|
||||
|
||||
epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
|
||||
|
||||
co_return status;
|
||||
}
|
||||
|
||||
auto io_scheduler::read(fd_t fd, std::span<char> buffer, std::chrono::milliseconds timeout)
|
||||
-> coro::task<std::pair<poll_status, ssize_t>>
|
||||
{
|
||||
auto status = co_await poll(fd, poll_op::read, timeout);
|
||||
switch (status)
|
||||
{
|
||||
case poll_status::event:
|
||||
co_return {status, ::read(fd, buffer.data(), buffer.size())};
|
||||
default:
|
||||
co_return {status, 0};
|
||||
}
|
||||
}
|
||||
|
||||
auto io_scheduler::read(
|
||||
const net::socket& sock,
|
||||
std::span<char> buffer,
|
||||
std::chrono::milliseconds timeout) -> coro::task<std::pair<poll_status, ssize_t>>
|
||||
{
|
||||
return read(sock.native_handle(), buffer, timeout);
|
||||
}
|
||||
|
||||
auto io_scheduler::write(
|
||||
fd_t fd, const std::span<const char> buffer, std::chrono::milliseconds timeout)
|
||||
-> coro::task<std::pair<poll_status, ssize_t>>
|
||||
{
|
||||
auto status = co_await poll(fd, poll_op::write, timeout);
|
||||
switch (status)
|
||||
{
|
||||
case poll_status::event:
|
||||
co_return {status, ::write(fd, buffer.data(), buffer.size())};
|
||||
default:
|
||||
co_return {status, 0};
|
||||
}
|
||||
}
|
||||
|
||||
auto io_scheduler::write(
|
||||
const net::socket& sock,
|
||||
const std::span<const char> buffer,
|
||||
std::chrono::milliseconds timeout) -> coro::task<std::pair<poll_status, ssize_t>>
|
||||
{
|
||||
return write(sock.native_handle(), buffer, timeout);
|
||||
}
|
||||
|
||||
auto io_scheduler::yield() -> coro::task<void>
|
||||
{
|
||||
co_await schedule();
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto io_scheduler::yield_for(std::chrono::milliseconds amount) -> coro::task<void>
|
||||
{
|
||||
// If the requested amount of time is negative or zero just return.
|
||||
using namespace std::chrono_literals;
|
||||
if (amount <= 0ms)
|
||||
{
|
||||
co_return;
|
||||
}
|
||||
|
||||
resume_token<poll_status> token{};
|
||||
|
||||
add_timer_token(clock::now() + amount, &token);
|
||||
|
||||
// Wait for the token timer to trigger.
|
||||
co_await token;
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto io_scheduler::yield_until(time_point time) -> coro::task<void>
|
||||
{
|
||||
auto now = clock::now();
|
||||
|
||||
// If the requested time is in the past (or now!) just return.
|
||||
if (time <= now)
|
||||
{
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto amount = std::chrono::duration_cast<std::chrono::milliseconds>(time - now);
|
||||
co_await yield_for(amount);
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto io_scheduler::process_events(std::chrono::milliseconds timeout) -> std::size_t
|
||||
{
|
||||
process_events_external_thread(timeout);
|
||||
return m_size.load(std::memory_order::relaxed);
|
||||
}
|
||||
|
||||
auto io_scheduler::shutdown(shutdown_t wait_for_tasks) -> void
|
||||
{
|
||||
if (!m_shutdown_requested.exchange(true, std::memory_order::release))
|
||||
{
|
||||
// Signal the event loop to stop asap.
|
||||
uint64_t value{1};
|
||||
::write(m_accept_fd, &value, sizeof(value));
|
||||
|
||||
if (wait_for_tasks == shutdown_t::sync && m_scheduler_thread.joinable())
|
||||
{
|
||||
m_scheduler_thread.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto io_scheduler::make_scheduler_after_task(coro::task<void> task, std::chrono::milliseconds wait_time) -> coro::task<void>
|
||||
{
|
||||
// Wait for the period requested, and then resume their task.
|
||||
co_await yield_for(wait_time);
|
||||
co_await task;
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto io_scheduler::add_timer_token(time_point tp, resume_token<poll_status>* token_ptr) -> timer_tokens::iterator
|
||||
{
|
||||
auto pos = m_timer_tokens.emplace(tp, token_ptr);
|
||||
|
||||
// If this item was inserted as the smallest time point, update the timeout.
|
||||
if (pos == m_timer_tokens.begin())
|
||||
{
|
||||
update_timeout(clock::now());
|
||||
}
|
||||
|
||||
return pos;
|
||||
}
|
||||
|
||||
auto io_scheduler::remove_timer_token(timer_tokens::iterator pos) -> void
|
||||
{
|
||||
auto is_first = (m_timer_tokens.begin() == pos);
|
||||
|
||||
m_timer_tokens.erase(pos);
|
||||
|
||||
// If this was the first item, update the timeout. It would be acceptable to just let it
|
||||
// also fire the timeout as the event loop will ignore it since nothing will have timed
|
||||
// out but it feels like the right thing to do to update it to the correct timeout value.
|
||||
if (is_first)
|
||||
{
|
||||
update_timeout(clock::now());
|
||||
}
|
||||
}
|
||||
|
||||
auto io_scheduler::resume(std::coroutine_handle<> handle) -> void
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lk{m_accept_mutex};
|
||||
m_accept_queue.emplace_back(handle);
|
||||
}
|
||||
|
||||
// Signal to the event loop there is a task to resume if one hasn't already been sent.
|
||||
bool expected{false};
|
||||
if (m_event_set.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
|
||||
{
|
||||
uint64_t value{1};
|
||||
::write(m_accept_fd, &value, sizeof(value));
|
||||
}
|
||||
}
|
||||
|
||||
auto io_scheduler::process_task_and_start(task<void>& task) -> void
|
||||
{
|
||||
m_task_manager.store(std::move(task)).resume();
|
||||
}
|
||||
|
||||
auto io_scheduler::process_task_variant(task_variant& tv) -> void
|
||||
{
|
||||
if (std::holds_alternative<coro::task<void>>(tv))
|
||||
{
|
||||
auto& task = std::get<coro::task<void>>(tv);
|
||||
// Store the users task and immediately start executing it.
|
||||
process_task_and_start(task);
|
||||
}
|
||||
else
|
||||
{
|
||||
auto handle = std::get<std::coroutine_handle<>>(tv);
|
||||
// The cleanup wrapper task will catch all thrown exceptions unconditionally.
|
||||
handle.resume();
|
||||
}
|
||||
}
|
||||
|
||||
auto io_scheduler::process_task_queue() -> void
|
||||
{
|
||||
std::size_t amount{0};
|
||||
{
|
||||
std::lock_guard<std::mutex> lk{m_accept_mutex};
|
||||
while (!m_accept_queue.empty() && amount < task_inline_process_amount)
|
||||
{
|
||||
m_processing_tasks[amount] = std::move(m_accept_queue.front());
|
||||
m_accept_queue.pop_front();
|
||||
++amount;
|
||||
}
|
||||
}
|
||||
|
||||
// The queue is empty, we are done here.
|
||||
if (amount == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
for (std::size_t i = 0; i < amount; ++i)
|
||||
{
|
||||
process_task_variant(m_processing_tasks[i]);
|
||||
}
|
||||
}
|
||||
|
||||
auto io_scheduler::process_events_poll_execute(std::chrono::milliseconds user_timeout) -> void
|
||||
{
|
||||
// Need to acquire m_accept_queue size to determine if there are any pending tasks.
|
||||
std::atomic_thread_fence(std::memory_order::acquire);
|
||||
bool tasks_ready = !m_accept_queue.empty();
|
||||
|
||||
auto timeout = (tasks_ready) ? m_no_timeout : user_timeout;
|
||||
|
||||
// Poll is run every iteration to make sure 'waiting' events are properly put into
|
||||
// the FIFO queue for when they are ready.
|
||||
auto event_count = epoll_wait(m_epoll_fd, m_events.data(), m_max_events, timeout.count());
|
||||
if (event_count > 0)
|
||||
{
|
||||
for (std::size_t i = 0; i < static_cast<std::size_t>(event_count); ++i)
|
||||
{
|
||||
epoll_event& event = m_events[i];
|
||||
void* handle_ptr = event.data.ptr;
|
||||
|
||||
if (handle_ptr == m_accept_ptr)
|
||||
{
|
||||
uint64_t value{0};
|
||||
::read(m_accept_fd, &value, sizeof(value));
|
||||
(void)value; // discard, the read merely resets the eventfd counter to zero.
|
||||
|
||||
// Let any threads scheduling work know that the event set has been consumed.
|
||||
// Important to do this after the accept file descriptor has been read.
|
||||
// This needs to succeed so best practice is to loop compare exchange weak.
|
||||
bool expected{true};
|
||||
while (!m_event_set.compare_exchange_weak(
|
||||
expected, false, std::memory_order::release, std::memory_order::relaxed))
|
||||
{
|
||||
}
|
||||
|
||||
tasks_ready = true;
|
||||
}
|
||||
else if (handle_ptr == m_timer_ptr)
|
||||
{
|
||||
// If the timer fd triggered, loop and call every task that has a wait time <= now.
|
||||
while (!m_timer_tokens.empty())
|
||||
{
|
||||
// Now is continuously calculated since resuming tasks could take a fairly
|
||||
// significant amount of time and might 'trigger' more timeouts.
|
||||
auto now = clock::now();
|
||||
|
||||
auto first = m_timer_tokens.begin();
|
||||
auto [tp, token_ptr] = *first;
|
||||
|
||||
if (tp <= now)
|
||||
{
|
||||
// Important to erase first so if any timers are updated after resume
|
||||
// this timer won't be taken into account.
|
||||
m_timer_tokens.erase(first);
|
||||
// Every event triggered on the timer tokens is *always* a timeout.
|
||||
token_ptr->resume(poll_status::timeout);
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Update the time to the next smallest time point, re-take the current now time
|
||||
// since processing tasks could shit the time.
|
||||
update_timeout(clock::now());
|
||||
}
|
||||
else
|
||||
{
|
||||
// Individual poll task wake-up, this will queue the coroutines waiting
|
||||
// on the resume token into the FIFO queue for processing.
|
||||
auto* token_ptr = static_cast<resume_token<poll_status>*>(handle_ptr);
|
||||
token_ptr->resume(event_to_poll_status(event.events));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (tasks_ready)
|
||||
{
|
||||
process_task_queue();
|
||||
}
|
||||
|
||||
if (!m_task_manager.delete_tasks_empty())
|
||||
{
|
||||
m_size.fetch_sub(m_task_manager.gc(), std::memory_order::relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
auto io_scheduler::event_to_poll_status(uint32_t events) -> poll_status
|
||||
{
|
||||
if (events & EPOLLIN || events & EPOLLOUT)
|
||||
{
|
||||
return poll_status::event;
|
||||
}
|
||||
else if (events & EPOLLERR)
|
||||
{
|
||||
return poll_status::error;
|
||||
}
|
||||
else if (events & EPOLLRDHUP || events & EPOLLHUP)
|
||||
{
|
||||
return poll_status::closed;
|
||||
}
|
||||
|
||||
throw std::runtime_error{"invalid epoll state"};
|
||||
}
|
||||
|
||||
auto io_scheduler::process_events_external_thread(std::chrono::milliseconds user_timeout) -> void
|
||||
{
|
||||
// Do not allow two threads to process events at the same time.
|
||||
bool expected{false};
|
||||
if (m_running.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
|
||||
{
|
||||
process_events_poll_execute(user_timeout);
|
||||
m_running.exchange(false, std::memory_order::release);
|
||||
}
|
||||
}
|
||||
|
||||
auto io_scheduler::process_events_dedicated_thread() -> void
|
||||
{
|
||||
m_running.exchange(true, std::memory_order::release);
|
||||
// Execute tasks until stopped or there are more tasks to complete.
|
||||
while (!m_shutdown_requested.load(std::memory_order::relaxed) || m_size.load(std::memory_order::relaxed) > 0)
|
||||
{
|
||||
process_events_poll_execute(m_default_timeout);
|
||||
}
|
||||
m_running.exchange(false, std::memory_order::release);
|
||||
}
|
||||
|
||||
auto io_scheduler::update_timeout(time_point now) -> void
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
if (!m_timer_tokens.empty())
|
||||
{
|
||||
auto& [tp, task] = *m_timer_tokens.begin();
|
||||
|
||||
auto amount = tp - now;
|
||||
|
||||
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(amount);
|
||||
amount -= seconds;
|
||||
auto nanoseconds = std::chrono::duration_cast<std::chrono::nanoseconds>(amount);
|
||||
|
||||
// As a safeguard if both values end up as zero (or negative) then trigger the timeout
|
||||
// immediately as zero disarms timerfd according to the man pages and negative valeues
|
||||
// will result in an error return value.
|
||||
if (seconds <= 0s)
|
||||
{
|
||||
seconds = 0s;
|
||||
if (nanoseconds <= 0ns)
|
||||
{
|
||||
// just trigger "immediately"!
|
||||
nanoseconds = 1ns;
|
||||
}
|
||||
}
|
||||
|
||||
itimerspec ts{};
|
||||
ts.it_value.tv_sec = seconds.count();
|
||||
ts.it_value.tv_nsec = nanoseconds.count();
|
||||
|
||||
if (timerfd_settime(m_timer_fd, 0, &ts, nullptr) == -1)
|
||||
{
|
||||
std::string msg = "Failed to set timerfd errorno=[" + std::string{strerror(errno)} + "].";
|
||||
throw std::runtime_error(msg.data());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Setting these values to zero disables the timer.
|
||||
itimerspec ts{};
|
||||
ts.it_value.tv_sec = 0;
|
||||
ts.it_value.tv_nsec = 0;
|
||||
timerfd_settime(m_timer_fd, 0, &ts, nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace coro
|
|
@ -1,8 +1,8 @@
|
|||
#include "coro/connect.hpp"
|
||||
#include "coro/net/connect.hpp"
|
||||
|
||||
#include <stdexcept>
|
||||
|
||||
namespace coro
|
||||
namespace coro::net
|
||||
{
|
||||
static std::string connect_status_connected{"connected"};
|
||||
static std::string connect_status_invalid_ip_address{"invalid_ip_address"};
|
||||
|
@ -32,4 +32,4 @@ auto to_string(const connect_status& status) -> const std::string&
|
|||
throw std::logic_error{"Invalid/unknown connect status."};
|
||||
}
|
||||
|
||||
} // namespace coro
|
||||
} // namespace coro::net
|
|
@ -1,10 +1,10 @@
|
|||
#include "coro/dns_client.hpp"
|
||||
#include "coro/net/dns_client.hpp"
|
||||
|
||||
#include <iostream>
|
||||
#include <netdb.h>
|
||||
#include <arpa/inet.h>
|
||||
|
||||
namespace coro
|
||||
namespace coro::net
|
||||
{
|
||||
|
||||
uint64_t dns_client::m_ares_count{0};
|
||||
|
@ -97,7 +97,7 @@ dns_client::~dns_client()
|
|||
|
||||
auto dns_client::host_by_name(const net::hostname& hn) -> coro::task<std::unique_ptr<dns_result>>
|
||||
{
|
||||
auto token = m_scheduler.generate_resume_token<void>();
|
||||
auto token = m_scheduler.make_resume_token<void>();
|
||||
auto result_ptr = std::make_unique<dns_result>(token, 2);
|
||||
|
||||
ares_gethostbyname(m_ares_channel, hn.data().data(), AF_INET, ares_dns_callback, result_ptr.get());
|
||||
|
@ -190,4 +190,4 @@ auto dns_client::make_poll_task(io_scheduler::fd_t fd, poll_op ops) -> coro::tas
|
|||
co_return;
|
||||
};
|
||||
|
||||
} // namespace coro
|
||||
} // namespace coro::net
|
|
@ -1,9 +1,9 @@
|
|||
#include "coro/tcp_client.hpp"
|
||||
#include "coro/net/tcp_client.hpp"
|
||||
#include "coro/io_scheduler.hpp"
|
||||
|
||||
#include <ares.h>
|
||||
|
||||
namespace coro
|
||||
namespace coro::net
|
||||
{
|
||||
tcp_client::tcp_client(io_scheduler& scheduler, options opts)
|
||||
: m_io_scheduler(scheduler),
|
||||
|
@ -20,8 +20,9 @@ auto tcp_client::connect(std::chrono::milliseconds timeout) -> coro::task<connec
|
|||
}
|
||||
|
||||
const net::ip_address* ip_addr{nullptr};
|
||||
std::unique_ptr<dns_result> result_ptr{nullptr};
|
||||
std::unique_ptr<net::dns_result> result_ptr{nullptr};
|
||||
|
||||
// If the user provided a hostname then perform the dns lookup.
|
||||
if(std::holds_alternative<net::hostname>(m_options.address))
|
||||
{
|
||||
if(m_options.dns == nullptr)
|
||||
|
@ -31,7 +32,7 @@ auto tcp_client::connect(std::chrono::milliseconds timeout) -> coro::task<connec
|
|||
}
|
||||
const auto& hn = std::get<net::hostname>(m_options.address);
|
||||
result_ptr = co_await m_options.dns->host_by_name(hn);
|
||||
if(result_ptr->status() != dns_status::complete)
|
||||
if(result_ptr->status() != net::dns_status::complete)
|
||||
{
|
||||
m_connect_status = connect_status::dns_lookup_failure;
|
||||
co_return connect_status::dns_lookup_failure;
|
||||
|
@ -99,4 +100,4 @@ auto tcp_client::connect(std::chrono::milliseconds timeout) -> coro::task<connec
|
|||
co_return connect_status::error;
|
||||
}
|
||||
|
||||
} // namespace coro
|
||||
} // namespace coro::net
|
|
@ -1,6 +1,6 @@
|
|||
#include "coro/tcp_scheduler.hpp"
|
||||
#include "coro/net/tcp_scheduler.hpp"
|
||||
|
||||
namespace coro
|
||||
namespace coro::net
|
||||
{
|
||||
|
||||
tcp_scheduler::tcp_scheduler(options opts)
|
||||
|
@ -68,8 +68,7 @@ auto tcp_scheduler::make_accept_task() -> coro::task<void>
|
|||
|
||||
if (!tasks.empty())
|
||||
{
|
||||
schedule(tasks);
|
||||
tasks.clear();
|
||||
schedule(std::move(tasks));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -78,4 +77,4 @@ auto tcp_scheduler::make_accept_task() -> coro::task<void>
|
|||
co_return;
|
||||
};
|
||||
|
||||
} // namespace coro
|
||||
} // namespace coro::net
|
|
@ -2,17 +2,17 @@ cmake_minimum_required(VERSION 3.16)
|
|||
project(libcoro_test)
|
||||
|
||||
set(LIBCORO_TEST_SOURCE_FILES
|
||||
net/test_dns_client.cpp
|
||||
net/test_ip_address.cpp
|
||||
net/test_tcp_scheduler.cpp
|
||||
|
||||
bench.cpp
|
||||
test_dns_client.cpp
|
||||
test_event.cpp
|
||||
test_generator.cpp
|
||||
test_io_scheduler.cpp
|
||||
test_latch.cpp
|
||||
test_sync_wait.cpp
|
||||
test_task.cpp
|
||||
test_tcp_scheduler.cpp
|
||||
test_thread_pool.cpp
|
||||
test_when_all.cpp
|
||||
)
|
||||
|
|
|
@ -179,7 +179,7 @@ TEST_CASE("benchmark counter task io_scheduler yield -> resume from main")
|
|||
std::vector<coro::resume_token<void>> tokens{};
|
||||
for (std::size_t i = 0; i < iterations; ++i)
|
||||
{
|
||||
tokens.emplace_back(s.generate_resume_token<void>());
|
||||
tokens.emplace_back(s.make_resume_token<void>());
|
||||
}
|
||||
|
||||
std::atomic<uint64_t> counter{0};
|
||||
|
@ -219,7 +219,7 @@ TEST_CASE("benchmark counter task io_scheduler yield -> resume from coroutine")
|
|||
std::vector<coro::resume_token<void>> tokens{};
|
||||
for (std::size_t i = 0; i < iterations; ++i)
|
||||
{
|
||||
tokens.emplace_back(s.generate_resume_token<void>());
|
||||
tokens.emplace_back(s.make_resume_token<void>());
|
||||
}
|
||||
|
||||
std::atomic<uint64_t> counter{0};
|
||||
|
@ -260,7 +260,7 @@ TEST_CASE("benchmark counter task io_scheduler resume from coroutine -> yield")
|
|||
std::vector<coro::resume_token<void>> tokens{};
|
||||
for (std::size_t i = 0; i < iterations; ++i)
|
||||
{
|
||||
tokens.emplace_back(s.generate_resume_token<void>());
|
||||
tokens.emplace_back(s.make_resume_token<void>());
|
||||
}
|
||||
|
||||
std::atomic<uint64_t> counter{0};
|
||||
|
@ -301,7 +301,7 @@ TEST_CASE("benchmark counter task io_scheduler yield (all) -> resume (all) from
|
|||
std::vector<coro::resume_token<void>> tokens{};
|
||||
for (std::size_t i = 0; i < iterations; ++i)
|
||||
{
|
||||
tokens.emplace_back(s.generate_resume_token<void>());
|
||||
tokens.emplace_back(s.make_resume_token<void>());
|
||||
}
|
||||
|
||||
std::atomic<uint64_t> counter{0};
|
||||
|
|
|
@ -10,7 +10,7 @@ TEST_CASE("dns_client basic")
|
|||
coro::io_scheduler::options{.thread_strategy = coro::io_scheduler::thread_strategy_t::spawn}
|
||||
};
|
||||
|
||||
coro::dns_client dns_client{scheduler, std::chrono::milliseconds{5000}};
|
||||
coro::net::dns_client dns_client{scheduler, std::chrono::milliseconds{5000}};
|
||||
|
||||
std::atomic<bool> done{false};
|
||||
|
||||
|
@ -18,7 +18,7 @@ TEST_CASE("dns_client basic")
|
|||
{
|
||||
auto result_ptr = co_await std::move(dns_client.host_by_name(hn));
|
||||
|
||||
if(result_ptr->status() == coro::dns_status::complete)
|
||||
if(result_ptr->status() == coro::net::dns_status::complete)
|
||||
{
|
||||
for(const auto& ip_addr : result_ptr->ip_addresses())
|
||||
{
|
91
test/net/test_tcp_scheduler.cpp
Normal file
91
test/net/test_tcp_scheduler.cpp
Normal file
|
@ -0,0 +1,91 @@
|
|||
#include "catch.hpp"
|
||||
|
||||
#include <coro/coro.hpp>
|
||||
|
||||
TEST_CASE("tcp_scheduler no on connection throws")
|
||||
{
|
||||
REQUIRE_THROWS(coro::net::tcp_scheduler{coro::net::tcp_scheduler::options{.on_connection = nullptr}});
|
||||
}
|
||||
|
||||
static auto tcp_scheduler_echo_server(
|
||||
const std::variant<coro::net::hostname, coro::net::ip_address> address,
|
||||
const std::string msg
|
||||
) -> void
|
||||
{
|
||||
auto on_connection = [&msg](coro::net::tcp_scheduler& scheduler, coro::net::socket sock) -> coro::task<void> {
|
||||
std::string in(64, '\0');
|
||||
|
||||
auto [rstatus, rbytes] = co_await scheduler.read(sock, std::span<char>{in.data(), in.size()});
|
||||
REQUIRE(rstatus == coro::poll_status::event);
|
||||
|
||||
in.resize(rbytes);
|
||||
REQUIRE(in == msg);
|
||||
|
||||
auto [wstatus, wbytes] = co_await scheduler.write(sock, std::span<const char>(in.data(), in.length()));
|
||||
REQUIRE(wstatus == coro::poll_status::event);
|
||||
REQUIRE(wbytes == in.length());
|
||||
|
||||
co_return;
|
||||
};
|
||||
|
||||
coro::net::tcp_scheduler scheduler{coro::net::tcp_scheduler::options{
|
||||
.address = coro::net::ip_address::from_string("0.0.0.0"),
|
||||
.port = 8080,
|
||||
.backlog = 128,
|
||||
.on_connection = on_connection,
|
||||
.io_options = coro::io_scheduler::options{.thread_strategy = coro::io_scheduler::thread_strategy_t::spawn}}};
|
||||
|
||||
coro::net::dns_client dns_client{scheduler, std::chrono::seconds{5}};
|
||||
|
||||
auto make_client_task = [&scheduler, &dns_client, &address, &msg]() -> coro::task<void> {
|
||||
coro::net::tcp_client client{
|
||||
scheduler,
|
||||
coro::net::tcp_client::options{
|
||||
.address = address,
|
||||
.port = 8080,
|
||||
.domain = coro::net::domain_t::ipv4,
|
||||
.dns = &dns_client}};
|
||||
|
||||
auto cstatus = co_await client.connect();
|
||||
REQUIRE(cstatus == coro::net::connect_status::connected);
|
||||
|
||||
auto [wstatus, wbytes] =
|
||||
co_await scheduler.write(client.socket(), std::span<const char>{msg.data(), msg.length()});
|
||||
|
||||
REQUIRE(wstatus == coro::poll_status::event);
|
||||
REQUIRE(wbytes == msg.length());
|
||||
|
||||
std::string response(64, '\0');
|
||||
|
||||
auto [rstatus, rbytes] =
|
||||
co_await scheduler.read(client.socket(), std::span<char>{response.data(), response.length()});
|
||||
|
||||
REQUIRE(rstatus == coro::poll_status::event);
|
||||
REQUIRE(rbytes == msg.length());
|
||||
response.resize(rbytes);
|
||||
REQUIRE(response == msg);
|
||||
|
||||
co_return;
|
||||
};
|
||||
|
||||
REQUIRE(scheduler.schedule(make_client_task()));
|
||||
|
||||
// Shutting down the scheduler will cause it to stop accepting new connections, to avoid requiring
|
||||
// another scheduler for this test the main thread can spin sleep until the tcp scheduler reports
|
||||
// that it is empty. tcp schedulers do not report their accept task as a task in its size/empty count.
|
||||
while (!scheduler.empty())
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds{1});
|
||||
}
|
||||
|
||||
scheduler.shutdown();
|
||||
REQUIRE(scheduler.empty());
|
||||
}
|
||||
|
||||
TEST_CASE("tcp_scheduler echo server")
|
||||
{
|
||||
const std::string msg{"Hello from client"};
|
||||
|
||||
tcp_scheduler_echo_server(coro::net::ip_address::from_string("127.0.0.1"), msg);
|
||||
tcp_scheduler_echo_server(coro::net::hostname{"localhost"}, msg);
|
||||
}
|
|
@ -91,8 +91,7 @@ TEST_CASE("io_scheduler task with multiple yields on event")
|
|||
{
|
||||
std::atomic<uint64_t> counter{0};
|
||||
coro::io_scheduler s{};
|
||||
auto token = s.generate_resume_token<uint64_t>();
|
||||
// coro::resume_token<uint64_t> token{s};
|
||||
auto token = s.make_resume_token<uint64_t>();
|
||||
|
||||
auto func = [&]() -> coro::task<void> {
|
||||
std::cerr << "1st suspend\n";
|
||||
|
@ -383,7 +382,7 @@ TEST_CASE("io_scheduler separate thread resume")
|
|||
auto func = [&]() -> coro::task<void> {
|
||||
// User manual resume token, create one specifically for each task being generated
|
||||
// coro::resume_token<void> token{s};
|
||||
auto token = s.generate_resume_token<void>();
|
||||
auto token = s.make_resume_token<void>();
|
||||
|
||||
// Normally this thread is probably already running for real world use cases, but in general
|
||||
// the 3rd party function api will be set, they should have "user data" void* or ability
|
||||
|
@ -568,8 +567,7 @@ TEST_CASE("io_scheduler yield user event")
|
|||
{
|
||||
std::string expected_result = "Here I am!";
|
||||
coro::io_scheduler s{};
|
||||
auto token = s.generate_resume_token<std::string>();
|
||||
// coro::resume_token<std::string> token{s};
|
||||
auto token = s.make_resume_token<std::string>();
|
||||
|
||||
auto func = [&]() -> coro::task<void> {
|
||||
co_await s.yield(token);
|
||||
|
@ -588,7 +586,7 @@ TEST_CASE("io_scheduler yield user event multiple waiters")
|
|||
{
|
||||
std::atomic<int> counter{0};
|
||||
coro::io_scheduler s{};
|
||||
auto token = s.generate_resume_token<void>();
|
||||
auto token = s.make_resume_token<void>();
|
||||
|
||||
auto func = [&](int amount) -> coro::task<void> {
|
||||
co_await token;
|
||||
|
@ -660,7 +658,7 @@ TEST_CASE("io_scheduler task throws")
|
|||
TEST_CASE("io_scheduler task throws after resume")
|
||||
{
|
||||
coro::io_scheduler s{};
|
||||
auto token = s.generate_resume_token<void>();
|
||||
auto token = s.make_resume_token<void>();
|
||||
|
||||
auto func = [&]() -> coro::task<void> {
|
||||
co_await token;
|
||||
|
@ -711,7 +709,7 @@ TEST_CASE("io_scheduler schedule vector<task>")
|
|||
tasks.emplace_back(make_task());
|
||||
tasks.emplace_back(make_task());
|
||||
|
||||
s.schedule(tasks);
|
||||
s.schedule(std::move(tasks));
|
||||
|
||||
REQUIRE(tasks.empty());
|
||||
|
||||
|
|
|
@ -1,150 +0,0 @@
|
|||
#include "catch.hpp"
|
||||
|
||||
#include <coro/coro.hpp>
|
||||
|
||||
TEST_CASE("tcp_scheduler no on connection throws")
|
||||
{
|
||||
REQUIRE_THROWS(coro::tcp_scheduler{coro::tcp_scheduler::options{.on_connection = nullptr}});
|
||||
}
|
||||
|
||||
TEST_CASE("tcp_scheduler echo server ip address")
|
||||
{
|
||||
const std::string msg{"Hello from client"};
|
||||
|
||||
auto on_connection = [&msg](coro::tcp_scheduler& scheduler, coro::net::socket sock) -> coro::task<void> {
|
||||
std::string in(64, '\0');
|
||||
|
||||
auto [rstatus, rbytes] = co_await scheduler.read(sock, std::span<char>{in.data(), in.size()});
|
||||
REQUIRE(rstatus == coro::poll_status::event);
|
||||
|
||||
in.resize(rbytes);
|
||||
REQUIRE(in == msg);
|
||||
|
||||
auto [wstatus, wbytes] = co_await scheduler.write(sock, std::span<const char>(in.data(), in.length()));
|
||||
REQUIRE(wstatus == coro::poll_status::event);
|
||||
REQUIRE(wbytes == in.length());
|
||||
|
||||
co_return;
|
||||
};
|
||||
|
||||
coro::tcp_scheduler scheduler{coro::tcp_scheduler::options{
|
||||
.address = coro::net::ip_address::from_string("0.0.0.0"),
|
||||
.port = 8080,
|
||||
.backlog = 128,
|
||||
.on_connection = on_connection,
|
||||
.io_options = coro::io_scheduler::options{.thread_strategy = coro::io_scheduler::thread_strategy_t::spawn}}};
|
||||
|
||||
auto make_client_task = [&scheduler, &msg]() -> coro::task<void> {
|
||||
coro::tcp_client client{
|
||||
scheduler,
|
||||
coro::tcp_client::options{.address = coro::net::ip_address::from_string("127.0.0.1"), .port = 8080, .domain = coro::net::domain_t::ipv4}};
|
||||
|
||||
auto cstatus = co_await client.connect();
|
||||
REQUIRE(cstatus == coro::connect_status::connected);
|
||||
|
||||
auto [wstatus, wbytes] =
|
||||
co_await scheduler.write(client.socket(), std::span<const char>{msg.data(), msg.length()});
|
||||
|
||||
REQUIRE(wstatus == coro::poll_status::event);
|
||||
REQUIRE(wbytes == msg.length());
|
||||
|
||||
std::string response(64, '\0');
|
||||
|
||||
auto [rstatus, rbytes] =
|
||||
co_await scheduler.read(client.socket(), std::span<char>{response.data(), response.length()});
|
||||
|
||||
REQUIRE(rstatus == coro::poll_status::event);
|
||||
REQUIRE(rbytes == msg.length());
|
||||
response.resize(rbytes);
|
||||
REQUIRE(response == msg);
|
||||
|
||||
co_return;
|
||||
};
|
||||
|
||||
scheduler.schedule(make_client_task());
|
||||
|
||||
// Shutting down the scheduler will cause it to stop accepting new connections, to avoid requiring
|
||||
// another scheduler for this test the main thread can spin sleep until the tcp scheduler reports
|
||||
// that it is empty. tcp schedulers do not report their accept task as a task in its size/empty count.
|
||||
while (!scheduler.empty())
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds{1});
|
||||
}
|
||||
|
||||
scheduler.shutdown();
|
||||
REQUIRE(scheduler.empty());
|
||||
}
|
||||
|
||||
TEST_CASE("tcp_scheduler echo server hostname")
|
||||
{
|
||||
const std::string msg{"Hello from client with dns lookup"};
|
||||
|
||||
auto on_connection = [&msg](coro::tcp_scheduler& scheduler, coro::net::socket sock) -> coro::task<void> {
|
||||
std::string in(64, '\0');
|
||||
|
||||
auto [rstatus, rbytes] = co_await scheduler.read(sock, std::span<char>{in.data(), in.size()});
|
||||
REQUIRE(rstatus == coro::poll_status::event);
|
||||
|
||||
in.resize(rbytes);
|
||||
REQUIRE(in == msg);
|
||||
|
||||
auto [wstatus, wbytes] = co_await scheduler.write(sock, std::span<const char>(in.data(), in.length()));
|
||||
REQUIRE(wstatus == coro::poll_status::event);
|
||||
REQUIRE(wbytes == in.length());
|
||||
|
||||
co_return;
|
||||
};
|
||||
|
||||
coro::tcp_scheduler scheduler{coro::tcp_scheduler::options{
|
||||
.address = coro::net::ip_address::from_string("0.0.0.0"),
|
||||
.port = 8080,
|
||||
.backlog = 128,
|
||||
.on_connection = on_connection,
|
||||
.io_options = coro::io_scheduler::options{.thread_strategy = coro::io_scheduler::thread_strategy_t::spawn}}};
|
||||
|
||||
coro::dns_client dns_client{scheduler, std::chrono::seconds{5}};
|
||||
|
||||
auto make_client_task = [&scheduler, &dns_client, &msg]() -> coro::task<void> {
|
||||
coro::tcp_client client{
|
||||
scheduler,
|
||||
coro::tcp_client::options{
|
||||
.address = coro::net::hostname{"localhost"},
|
||||
.port = 8080,
|
||||
.domain = coro::net::domain_t::ipv4,
|
||||
.dns = &dns_client}};
|
||||
|
||||
auto cstatus = co_await client.connect();
|
||||
REQUIRE(cstatus == coro::connect_status::connected);
|
||||
|
||||
auto [wstatus, wbytes] =
|
||||
co_await scheduler.write(client.socket(), std::span<const char>{msg.data(), msg.length()});
|
||||
|
||||
REQUIRE(wstatus == coro::poll_status::event);
|
||||
REQUIRE(wbytes == msg.length());
|
||||
|
||||
std::string response(64, '\0');
|
||||
|
||||
auto [rstatus, rbytes] =
|
||||
co_await scheduler.read(client.socket(), std::span<char>{response.data(), response.length()});
|
||||
|
||||
REQUIRE(rstatus == coro::poll_status::event);
|
||||
REQUIRE(rbytes == msg.length());
|
||||
response.resize(rbytes);
|
||||
REQUIRE(response == msg);
|
||||
|
||||
co_return;
|
||||
};
|
||||
|
||||
scheduler.schedule(make_client_task());
|
||||
|
||||
// Shutting down the scheduler will cause it to stop accepting new connections, to avoid requiring
|
||||
// another scheduler for this test the main thread can spin sleep until the tcp scheduler reports
|
||||
// that it is empty. tcp schedulers do not report their accept task as a task in its size/empty count.
|
||||
while (!scheduler.empty())
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds{1});
|
||||
}
|
||||
|
||||
scheduler.shutdown();
|
||||
REQUIRE(scheduler.empty());
|
||||
}
|
Loading…
Add table
Reference in a new issue