1
0
Fork 0
mirror of https://gitlab.com/niansa/libcrosscoro.git synced 2025-03-06 20:53:32 +01:00

io_scheduler support timeouts (#20)

* io_scheduler support timeouts

Closes #19

* io_scheduler resume_token<poll_status> for poll()

* io_scheduler read/write now use poll_status + size return
This commit is contained in:
Josh Baldwin 2020-11-11 23:06:42 -07:00 committed by GitHub
parent 1c7b340c72
commit b15c7c1d16
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 540 additions and 88 deletions

View file

@ -165,10 +165,17 @@ public:
class io_scheduler class io_scheduler
{ {
public:
using fd_t = int;
private: private:
using clock = std::chrono::steady_clock;
using time_point = clock::time_point;
using task_variant = std::variant<coro::task<void>, std::coroutine_handle<>>; using task_variant = std::variant<coro::task<void>, std::coroutine_handle<>>;
using task_queue = std::deque<task_variant>; using task_queue = std::deque<task_variant>;
using timer_tokens = std::multimap<time_point, resume_token<poll_status>*>;
/// resume_token<T> needs to be able to call internal scheduler::resume() /// resume_token<T> needs to be able to call internal scheduler::resume()
template<typename return_type> template<typename return_type>
friend class resume_token; friend class resume_token;
@ -317,9 +324,58 @@ private:
static constexpr const int m_accept_object{0}; static constexpr const int m_accept_object{0};
static constexpr const void* m_accept_ptr = &m_accept_object; static constexpr const void* m_accept_ptr = &m_accept_object;
public: static constexpr const int m_timer_object{0};
using fd_t = int; static constexpr const void* m_timer_ptr = &m_timer_object;
/**
* An operation is an awaitable type with a coroutine to resume the task scheduled on one of
* the executor threads.
*/
class operation
{
friend class io_scheduler;
/**
* Only io_schedulers can create operations when a task is being scheduled.
* @param tp The io scheduler that created this operation.
*/
explicit operation(io_scheduler& ios) noexcept : m_io_scheduler(ios) {}
public:
/**
* Operations always pause so the executing thread and be switched.
*/
auto await_ready() noexcept -> bool { return false; }
/**
* Suspending always returns to the caller (using void return of await_suspend()) and
* stores the coroutine internally for the executing thread to resume from.
*/
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
// m_awaiting_coroutine = awaiting_coroutine;
m_io_scheduler.resume(awaiting_coroutine);
}
/**
* no-op as this is the function called first by the io_scheduler's executing thread.
*/
auto await_resume() noexcept -> void {}
private:
/// The io_scheduler that this operation will execute on.
io_scheduler& m_io_scheduler;
// // The coroutine awaiting execution.
// std::coroutine_handle<> m_awaiting_coroutine{nullptr};
};
/**
* Schedules the currently executing task onto this io_scheduler, effectively placing it at
* the end of the FIFO queue.
* `co_await s.yield()`
*/
auto schedule() -> operation { return operation{*this}; }
public:
enum class thread_strategy_t enum class thread_strategy_t
{ {
/// Spawns a background thread for the scheduler to run on. /// Spawns a background thread for the scheduler to run on.
@ -346,17 +402,19 @@ public:
io_scheduler(const options opts = options{8, 2, thread_strategy_t::spawn}) io_scheduler(const options opts = options{8, 2, thread_strategy_t::spawn})
: m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)), : m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)),
m_accept_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)), m_accept_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)),
m_timer_fd(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC)),
m_thread_strategy(opts.thread_strategy), m_thread_strategy(opts.thread_strategy),
m_task_manager(opts.reserve_size, opts.growth_factor) m_task_manager(opts.reserve_size, opts.growth_factor)
{ {
struct epoll_event e epoll_event e{};
{
};
e.events = EPOLLIN; e.events = EPOLLIN;
e.data.ptr = const_cast<void*>(m_accept_ptr); e.data.ptr = const_cast<void*>(m_accept_ptr);
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_accept_fd, &e); epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_accept_fd, &e);
e.data.ptr = const_cast<void*>(m_timer_ptr);
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_timer_fd, &e);
if (m_thread_strategy == thread_strategy_t::spawn) if (m_thread_strategy == thread_strategy_t::spawn)
{ {
m_scheduler_thread = std::thread([this] { process_events_dedicated_thread(); }); m_scheduler_thread = std::thread([this] { process_events_dedicated_thread(); });
@ -386,6 +444,11 @@ public:
close(m_accept_fd); close(m_accept_fd);
m_accept_fd = -1; m_accept_fd = -1;
} }
if (m_timer_fd != -1)
{
close(m_timer_fd);
m_timer_fd = -1;
}
} }
/** /**
@ -490,24 +553,78 @@ public:
return false; return false;
} }
return schedule(scheduler_after_func(std::move(task), after)); return schedule(make_scheduler_after_task(std::move(task), after));
}
/**
* Schedules a task to be run at a specific time in the future.
* @param task
* @param time
* @return True if the task is scheduled. False if time is in the past or the scheduler is
* trying to shutdown.
*/
auto schedule_at(coro::task<void> task, time_point time) -> bool
{
auto now = clock::now();
// If the requested time is in the past (or now!) bail out!
if (time <= now)
{
return false;
}
auto amount = std::chrono::duration_cast<std::chrono::milliseconds>(time - now);
return schedule_after(std::move(task), amount);
} }
/** /**
* Polls a specific file descriptor for the given poll operation. * Polls a specific file descriptor for the given poll operation.
* @param fd The file descriptor to poll. * @param fd The file descriptor to poll.
* @param op The type of poll operation to perform. * @param op The type of poll operation to perform.
* @param timeout The timeout for this poll operation, if timeout <= 0 then poll will block
* indefinitely until the event is triggered.
*/ */
auto poll(fd_t fd, poll_op op) -> coro::task<void> auto poll(fd_t fd, poll_op op, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
-> coro::task<poll_status>
{ {
co_await unsafe_yield<void>([&](resume_token<void>& token) { // Setup two events, a timeout event and the actual poll for op event.
epoll_event e{}; // Whichever triggers first will delete the other to guarantee only one wins.
e.events = static_cast<uint32_t>(op) | EPOLLONESHOT | EPOLLET | EPOLLRDHUP; // The resume token will be set by the scheduler to what the event turned out to be.
e.data.ptr = &token;
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &e); using namespace std::chrono_literals;
}); bool timeout_requested = (timeout > 0ms);
resume_token<poll_status> token{};
timer_tokens::iterator timer_pos;
if (timeout_requested)
{
timer_pos = add_timer_token(clock::now() + timeout, &token);
}
epoll_event e{};
e.events = static_cast<uint32_t>(op) | EPOLLONESHOT | EPOLLET | EPOLLRDHUP;
e.data.ptr = &token;
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &e);
auto status = co_await unsafe_yield<poll_status>(token);
switch (status)
{
// The event triggered first, delete the timeout.
case poll_status::event:
if (timeout_requested)
{
remove_timer_token(timer_pos);
}
break;
default:
// Deleting the event is done regardless below in epoll_ctl()
break;
}
epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, nullptr); epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
co_return status;
} }
/** /**
@ -516,19 +633,21 @@ public:
* size of data. The number of bytes read is returned. * size of data. The number of bytes read is returned.
* @param fd The file desriptor to read from. * @param fd The file desriptor to read from.
* @param buffer The buffer to place read bytes into. * @param buffer The buffer to place read bytes into.
* @param timeout The timeout for the read operation, if timeout <= 0 then read will block
* indefinitely until the event is triggered.
* @return The number of bytes read or an error code if negative. * @return The number of bytes read or an error code if negative.
*/ */
auto read(fd_t fd, std::span<char> buffer) -> coro::task<ssize_t> auto read(fd_t fd, std::span<char> buffer, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
-> coro::task<std::pair<poll_status, ssize_t>>
{ {
/*auto status =*/co_await poll(fd, poll_op::read); auto status = co_await poll(fd, poll_op::read, timeout);
co_return ::read(fd, buffer.data(), buffer.size()); switch (status)
// switch(status) {
// { case poll_status::event:
// case poll_status::success: co_return {status, ::read(fd, buffer.data(), buffer.size())};
// co_return ::read(fd, buffer.data(), buffer.size()); default:
// default: co_return {status, 0};
// co_return 0; }
// }
} }
/** /**
@ -538,17 +657,28 @@ public:
* @param buffer The data to write to `fd`. * @param buffer The data to write to `fd`.
* @return The number of bytes written or an error code if negative. * @return The number of bytes written or an error code if negative.
*/ */
auto write(fd_t fd, const std::span<const char> buffer) -> coro::task<ssize_t> auto write(fd_t fd, const std::span<const char> buffer) -> coro::task<std::pair<poll_status, ssize_t>>
{ {
/*auto status =*/co_await poll(fd, poll_op::write); auto status = co_await poll(fd, poll_op::write);
co_return ::write(fd, buffer.data(), buffer.size()); switch (status)
// switch(status) {
// { case poll_status::event:
// case poll_status::success: co_return {status, ::write(fd, buffer.data(), buffer.size())};
// co_return ::write(fd, buffer.data(), buffer.size()); default:
// default: co_return {status, 0};
// co_return 0; }
// } }
/**
* Immediately yields the current task and places it at the end of the queue of tasks waiting
* to be processed. This will immediately be picked up again once it naturally goes through the
* FIFO task queue. This function is useful to yielding long processing tasks to let other tasks
* get processing time.
*/
auto yield() -> coro::task<void>
{
co_await schedule();
co_return;
} }
/** /**
@ -602,38 +732,43 @@ public:
/** /**
* Yields the current coroutine for `amount` of time. * Yields the current coroutine for `amount` of time.
* @throw std::runtime_error If the internal system failed to setup required resources to wait.
* @param amount The amount of time to wait. * @param amount The amount of time to wait.
*/ */
auto yield_for(std::chrono::milliseconds amount) -> coro::task<void> auto yield_for(std::chrono::milliseconds amount) -> coro::task<void>
{ {
fd_t timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); // If the requested amount of time is negative or zero just return.
if (timer_fd == -1) using namespace std::chrono_literals;
if (amount <= 0ms)
{ {
std::string msg = "Failed to create timerfd errorno=[" + std::string{strerror(errno)} + "]."; co_return;
throw std::runtime_error(msg.data());
} }
struct itimerspec ts resume_token<poll_status> token{};
add_timer_token(clock::now() + amount, &token);
// Wait for the token timer to trigger.
co_await token;
co_return;
}
/**
* Yields the current coroutine until `time`. If time is in the past this function will
* return immediately.
* @param time The time point in the future to yield until.
*/
auto yield_until(time_point time) -> coro::task<void>
{
auto now = clock::now();
// If the requested time is in the past (or now!) just return.
if (time <= now)
{ {
}; co_return;
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(amount);
amount -= seconds;
auto nanoseconds = std::chrono::duration_cast<std::chrono::nanoseconds>(amount);
ts.it_value.tv_sec = seconds.count();
ts.it_value.tv_nsec = nanoseconds.count();
if (timerfd_settime(timer_fd, 0, &ts, nullptr) == -1)
{
std::string msg = "Failed to set timerfd errorno=[" + std::string{strerror(errno)} + "].";
throw std::runtime_error(msg.data());
} }
uint64_t value{0}; auto amount = std::chrono::duration_cast<std::chrono::milliseconds>(time - now);
co_await read(timer_fd, std::span<char>{reinterpret_cast<char*>(&value), sizeof(value)}); co_await yield_for(amount);
close(timer_fd);
co_return; co_return;
} }
@ -648,6 +783,13 @@ public:
return resume_token<return_type>(*this); return resume_token<return_type>(*this);
} }
/**
* If runnint in mode thread_strategy_t::manual this function must be called at regular
* intervals to process events on the io_scheduler. This function will do nothing in any
* other thread_strategy_t mode.
* @param timeout The timeout to wait for events.
* @return The number of executing tasks.
*/
auto process_events(std::chrono::milliseconds timeout = std::chrono::milliseconds{1000}) -> std::size_t auto process_events(std::chrono::milliseconds timeout = std::chrono::milliseconds{1000}) -> std::size_t
{ {
process_events_external_thread(timeout); process_events_external_thread(timeout);
@ -708,6 +850,12 @@ private:
fd_t m_epoll_fd{-1}; fd_t m_epoll_fd{-1};
/// The event loop accept new tasks and resume tasks file descriptor. /// The event loop accept new tasks and resume tasks file descriptor.
fd_t m_accept_fd{-1}; fd_t m_accept_fd{-1};
/// The event loop timer fd for timed events, e.g. yield_for() or scheduler_after().
fd_t m_timer_fd{-1};
/// The map of time point's to resume tokens for tasks that are yielding for a period of time
/// or for tasks that are polling with timeouts.
timer_tokens m_timer_tokens;
/// The threading strategy this scheduler is using. /// The threading strategy this scheduler is using.
thread_strategy_t m_thread_strategy; thread_strategy_t m_thread_strategy;
@ -735,29 +883,17 @@ private:
task_manager m_task_manager; task_manager m_task_manager;
auto scheduler_after_func(coro::task<void> inner_task, std::chrono::milliseconds wait_time) -> coro::task<void> auto make_scheduler_after_task(coro::task<void> task, std::chrono::milliseconds wait_time) -> coro::task<void>
{ {
// Seems to already be done.
if (inner_task.is_ready())
{
co_return;
}
// Wait for the period requested, and then resume their task. // Wait for the period requested, and then resume their task.
co_await yield_for(wait_time); co_await yield_for(wait_time);
inner_task.resume(); co_await task;
if (!inner_task.is_ready())
{
m_task_manager.store(std::move(inner_task));
}
co_return; co_return;
} }
template<typename return_type, std::invocable<resume_token<return_type>&> before_functor> template<typename return_type>
auto unsafe_yield(before_functor before) -> coro::task<return_type> auto unsafe_yield(resume_token<return_type>& token) -> coro::task<return_type>
{ {
resume_token<return_type> token{};
before(token);
co_await token; co_await token;
if constexpr (std::is_same_v<return_type, void>) if constexpr (std::is_same_v<return_type, void>)
{ {
@ -769,6 +905,34 @@ private:
} }
} }
auto add_timer_token(time_point tp, resume_token<poll_status>* token_ptr) -> timer_tokens::iterator
{
auto pos = m_timer_tokens.emplace(tp, token_ptr);
// If this item was inserted as the smallest time point, update the timeout.
if (pos == m_timer_tokens.begin())
{
update_timeout(clock::now());
}
return pos;
}
auto remove_timer_token(timer_tokens::iterator pos) -> void
{
auto is_first = (m_timer_tokens.begin() == pos);
m_timer_tokens.erase(pos);
// If this was the first item, update the timeout. It would be acceptable to just let it
// also fire the timeout as the event loop will ignore it since nothing will have timed
// out but it feels like the right thing to do to update it to the correct timeout value.
if (is_first)
{
update_timeout(clock::now());
}
}
auto resume(std::coroutine_handle<> handle) -> void auto resume(std::coroutine_handle<> handle) -> void
{ {
{ {
@ -790,13 +954,15 @@ private:
static constexpr std::size_t m_max_events = 8; static constexpr std::size_t m_max_events = 8;
std::array<struct epoll_event, m_max_events> m_events{}; std::array<struct epoll_event, m_max_events> m_events{};
auto process_task_and_start(task<void>& task) -> void { m_task_manager.store(std::move(task)).resume(); }
inline auto process_task_variant(task_variant& tv) -> void inline auto process_task_variant(task_variant& tv) -> void
{ {
if (std::holds_alternative<coro::task<void>>(tv)) if (std::holds_alternative<coro::task<void>>(tv))
{ {
auto& task = std::get<coro::task<void>>(tv); auto& task = std::get<coro::task<void>>(tv);
// Store the users task and immediately start executing it. // Store the users task and immediately start executing it.
m_task_manager.store(std::move(task)).resume(); process_task_and_start(task);
} }
else else
{ {
@ -846,7 +1012,8 @@ private:
{ {
for (std::size_t i = 0; i < static_cast<std::size_t>(event_count); ++i) for (std::size_t i = 0; i < static_cast<std::size_t>(event_count); ++i)
{ {
void* handle_ptr = m_events[i].data.ptr; epoll_event& event = m_events[i];
void* handle_ptr = event.data.ptr;
if (handle_ptr == m_accept_ptr) if (handle_ptr == m_accept_ptr)
{ {
@ -864,12 +1031,42 @@ private:
tasks_ready = true; tasks_ready = true;
} }
else if (handle_ptr == m_timer_ptr)
{
// If the timer fd triggered, loop and call every task that has a wait time <= now.
while (!m_timer_tokens.empty())
{
// Now is continuously calculated since resuming tasks could take a fairly
// significant amount of time and might 'trigger' more timeouts.
auto now = clock::now();
auto first = m_timer_tokens.begin();
auto [tp, token_ptr] = *first;
if (tp <= now)
{
// Important to erase first so if any timers are updated after resume
// this timer won't be taken into account.
m_timer_tokens.erase(first);
// Every event triggered on the timer tokens is *always* a timeout.
token_ptr->resume(poll_status::timeout);
}
else
{
break;
}
}
// Update the time to the next smallest time point, re-take the current now time
// since processing tasks could shit the time.
update_timeout(clock::now());
}
else else
{ {
// Individual poll task wake-up, this will queue the coroutines waiting // Individual poll task wake-up, this will queue the coroutines waiting
// on the resume token into the FIFO queue for processing. // on the resume token into the FIFO queue for processing.
auto* token_ptr = static_cast<resume_token<void>*>(handle_ptr); auto* token_ptr = static_cast<resume_token<poll_status>*>(handle_ptr);
token_ptr->resume(); token_ptr->resume(event_to_poll_status(event.events));
} }
} }
} }
@ -885,6 +1082,24 @@ private:
} }
} }
auto event_to_poll_status(uint32_t events) -> poll_status
{
if (events & EPOLLIN || events & EPOLLOUT)
{
return poll_status::event;
}
else if (events & EPOLLERR)
{
return poll_status::error;
}
else if (events & EPOLLRDHUP || events & EPOLLHUP)
{
return poll_status::closed;
}
throw std::runtime_error{"invalid epoll state"};
}
auto process_events_external_thread(std::chrono::milliseconds user_timeout) -> void auto process_events_external_thread(std::chrono::milliseconds user_timeout) -> void
{ {
// Do not allow two threads to process events at the same time. // Do not allow two threads to process events at the same time.
@ -906,6 +1121,52 @@ private:
} }
m_running.exchange(false, std::memory_order::release); m_running.exchange(false, std::memory_order::release);
} }
auto update_timeout(time_point now) -> void
{
using namespace std::chrono_literals;
if (!m_timer_tokens.empty())
{
auto& [tp, task] = *m_timer_tokens.begin();
auto amount = tp - now;
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(amount);
amount -= seconds;
auto nanoseconds = std::chrono::duration_cast<std::chrono::nanoseconds>(amount);
// As a safeguard if both values end up as zero (or negative) then trigger the timeout
// immediately as zero disarms timerfd according to the man pages and negative valeues
// will result in an error return value.
if (seconds <= 0s)
{
seconds = 0s;
if (nanoseconds <= 0ns)
{
// just trigger immediately!
nanoseconds = 1ns;
}
}
itimerspec ts{};
ts.it_value.tv_sec = seconds.count();
ts.it_value.tv_nsec = nanoseconds.count();
if (timerfd_settime(m_timer_fd, 0, &ts, nullptr) == -1)
{
std::string msg = "Failed to set timerfd errorno=[" + std::string{strerror(errno)} + "].";
throw std::runtime_error(msg.data());
}
}
else
{
// Setting these values to zero disables the timer.
itimerspec ts{};
ts.it_value.tv_sec = 0;
ts.it_value.tv_nsec = 0;
timerfd_settime(m_timer_fd, 0, &ts, nullptr);
}
}
}; };
template<typename return_type> template<typename return_type>

View file

@ -17,7 +17,7 @@ enum class poll_op
enum class poll_status enum class poll_status
{ {
/// The poll operation was was successful. /// The poll operation was was successful.
success, event,
/// The poll operation timed out. /// The poll operation timed out.
timeout, timeout,
/// The file descriptor had an error while polling. /// The file descriptor had an error while polling.

View file

@ -34,7 +34,7 @@ public:
{ {
friend class thread_pool; friend class thread_pool;
/** /**
* Only thread_pool's can create operations when a task is being scheduled. * Only thread_pools can create operations when a task is being scheduled.
* @param tp The thread pool that created this operation. * @param tp The thread pool that created this operation.
*/ */
explicit operation(thread_pool& tp) noexcept; explicit operation(thread_pool& tp) noexcept;

View file

@ -141,8 +141,8 @@ TEST_CASE("io_scheduler task with read poll")
auto func = [&]() -> coro::task<void> { auto func = [&]() -> coro::task<void> {
// Poll will block until there is data to read. // Poll will block until there is data to read.
/*auto status =*/co_await s.poll(trigger_fd, coro::poll_op::read); auto status = co_await s.poll(trigger_fd, coro::poll_op::read);
/*REQUIRE(status == coro::poll_status::success);*/ REQUIRE(status == coro::poll_status::event);
co_return; co_return;
}; };
@ -156,6 +156,49 @@ TEST_CASE("io_scheduler task with read poll")
close(trigger_fd); close(trigger_fd);
} }
TEST_CASE("io_scheduler task with read poll with timeout")
{
using namespace std::chrono_literals;
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::io_scheduler s{};
auto func = [&]() -> coro::task<void> {
// Poll with a timeout (but don't timeout).
auto status = co_await s.poll(trigger_fd, coro::poll_op::read, 50ms);
REQUIRE(status == coro::poll_status::event);
co_return;
};
s.schedule(func());
uint64_t value{42};
write(trigger_fd, &value, sizeof(value));
s.shutdown();
REQUIRE(s.empty());
close(trigger_fd);
}
TEST_CASE("io_scheduler task with read poll timeout")
{
using namespace std::chrono_literals;
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::io_scheduler s{};
auto func = [&]() -> coro::task<void> {
// Poll with a timeout (but don't timeout).
auto status = co_await s.poll(trigger_fd, coro::poll_op::read, 10ms);
REQUIRE(status == coro::poll_status::timeout);
co_return;
};
s.schedule(func());
s.shutdown();
REQUIRE(s.empty());
close(trigger_fd);
}
TEST_CASE("io_scheduler task with read") TEST_CASE("io_scheduler task with read")
{ {
constexpr uint64_t expected_value{42}; constexpr uint64_t expected_value{42};
@ -164,8 +207,10 @@ TEST_CASE("io_scheduler task with read")
auto func = [&]() -> coro::task<void> { auto func = [&]() -> coro::task<void> {
uint64_t val{0}; uint64_t val{0};
auto bytes_read = co_await s.read(trigger_fd, std::span<char>(reinterpret_cast<char*>(&val), sizeof(val))); auto [status, bytes_read] =
co_await s.read(trigger_fd, std::span<char>(reinterpret_cast<char*>(&val), sizeof(val)));
REQUIRE(status == coro::poll_status::event);
REQUIRE(bytes_read == sizeof(uint64_t)); REQUIRE(bytes_read == sizeof(uint64_t));
REQUIRE(val == expected_value); REQUIRE(val == expected_value);
co_return; co_return;
@ -179,6 +224,55 @@ TEST_CASE("io_scheduler task with read")
close(trigger_fd); close(trigger_fd);
} }
TEST_CASE("io_scheduler task with read with timeout")
{
using namespace std::chrono_literals;
constexpr uint64_t expected_value{42};
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::io_scheduler s{};
auto func = [&]() -> coro::task<void> {
uint64_t val{0};
auto [status, bytes_read] =
co_await s.read(trigger_fd, std::span<char>(reinterpret_cast<char*>(&val), sizeof(val)), 50ms);
REQUIRE(status == coro::poll_status::event);
REQUIRE(bytes_read == sizeof(uint64_t));
REQUIRE(val == expected_value);
co_return;
};
s.schedule(func());
write(trigger_fd, &expected_value, sizeof(expected_value));
s.shutdown();
close(trigger_fd);
}
TEST_CASE("io_scheduler task with read timeout")
{
using namespace std::chrono_literals;
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::io_scheduler s{};
auto func = [&]() -> coro::task<void> {
uint64_t val{0};
auto [status, bytes_read] =
co_await s.read(trigger_fd, std::span<char>(reinterpret_cast<char*>(&val), sizeof(val)), 10ms);
REQUIRE(status == coro::poll_status::timeout);
REQUIRE(bytes_read == 0);
REQUIRE(val == 0);
co_return;
};
s.schedule(func());
s.shutdown();
close(trigger_fd);
}
TEST_CASE("io_scheduler task with read and write same fd") TEST_CASE("io_scheduler task with read and write same fd")
{ {
// Since this test uses an eventfd, only 1 task at a time can poll/read/write to that // Since this test uses an eventfd, only 1 task at a time can poll/read/write to that
@ -192,14 +286,17 @@ TEST_CASE("io_scheduler task with read and write same fd")
coro::io_scheduler s{}; coro::io_scheduler s{};
auto func = [&]() -> coro::task<void> { auto func = [&]() -> coro::task<void> {
auto bytes_written = co_await s.write( auto [read_status, bytes_written] = co_await s.write(
trigger_fd, std::span<const char>(reinterpret_cast<const char*>(&expected_value), sizeof(expected_value))); trigger_fd, std::span<const char>(reinterpret_cast<const char*>(&expected_value), sizeof(expected_value)));
REQUIRE(read_status == coro::poll_status::event);
REQUIRE(bytes_written == sizeof(uint64_t)); REQUIRE(bytes_written == sizeof(uint64_t));
uint64_t val{0}; uint64_t val{0};
auto bytes_read = co_await s.read(trigger_fd, std::span<char>(reinterpret_cast<char*>(&val), sizeof(val))); auto [write_status, bytes_read] =
co_await s.read(trigger_fd, std::span<char>(reinterpret_cast<char*>(&val), sizeof(val)));
REQUIRE(write_status == coro::poll_status::event);
REQUIRE(bytes_read == sizeof(uint64_t)); REQUIRE(bytes_read == sizeof(uint64_t));
REQUIRE(val == expected_value); REQUIRE(val == expected_value);
co_return; co_return;
@ -222,7 +319,9 @@ TEST_CASE("io_scheduler task with read and write pipe")
auto read_func = [&]() -> coro::task<void> { auto read_func = [&]() -> coro::task<void> {
std::string buffer(4096, '0'); std::string buffer(4096, '0');
std::span<char> view{buffer.data(), buffer.size()}; std::span<char> view{buffer.data(), buffer.size()};
auto bytes_read = co_await s.read(pipe_fd[0], view); auto [status, bytes_read] = co_await s.read(pipe_fd[0], view);
REQUIRE(status == coro::poll_status::event);
REQUIRE(bytes_read == msg.size()); REQUIRE(bytes_read == msg.size());
buffer.resize(bytes_read); buffer.resize(bytes_read);
REQUIRE(buffer == msg); REQUIRE(buffer == msg);
@ -230,7 +329,9 @@ TEST_CASE("io_scheduler task with read and write pipe")
auto write_func = [&]() -> coro::task<void> { auto write_func = [&]() -> coro::task<void> {
std::span<const char> view{msg.data(), msg.size()}; std::span<const char> view{msg.data(), msg.size()};
auto bytes_written = co_await s.write(pipe_fd[1], view); auto [status, bytes_written] = co_await s.write(pipe_fd[1], view);
REQUIRE(status == coro::poll_status::event);
REQUIRE(bytes_written == msg.size()); REQUIRE(bytes_written == msg.size());
}; };
@ -243,7 +344,7 @@ TEST_CASE("io_scheduler task with read and write pipe")
} }
static auto standalone_read(coro::io_scheduler& s, coro::io_scheduler::fd_t socket, std::span<char> buffer) static auto standalone_read(coro::io_scheduler& s, coro::io_scheduler::fd_t socket, std::span<char> buffer)
-> coro::task<ssize_t> -> coro::task<std::pair<coro::poll_status, ssize_t>>
{ {
// do other stuff in larger function // do other stuff in larger function
co_return co_await s.read(socket, buffer); co_return co_await s.read(socket, buffer);
@ -258,10 +359,11 @@ TEST_CASE("io_scheduler standalone read task")
auto func = [&]() -> coro::task<void> { auto func = [&]() -> coro::task<void> {
ssize_t v{0}; ssize_t v{0};
auto bytes_read = auto [status, bytes_read] =
co_await standalone_read(s, trigger_fd, std::span<char>(reinterpret_cast<char*>(&v), sizeof(v))); co_await standalone_read(s, trigger_fd, std::span<char>(reinterpret_cast<char*>(&v), sizeof(v)));
REQUIRE(bytes_read == sizeof(ssize_t));
REQUIRE(status == coro::poll_status::event);
REQUIRE(bytes_read == sizeof(ssize_t));
REQUIRE(v == expected_value); REQUIRE(v == expected_value);
co_return; co_return;
}; };
@ -355,7 +457,7 @@ TEST_CASE("io_scheduler with basic task")
REQUIRE(counter == expected_value); REQUIRE(counter == expected_value);
} }
TEST_CASE("io_scheduler yield for") TEST_CASE("io_scheduler scheduler_after")
{ {
constexpr std::chrono::milliseconds wait_for{50}; constexpr std::chrono::milliseconds wait_for{50};
std::atomic<uint64_t> counter{0}; std::atomic<uint64_t> counter{0};
@ -376,6 +478,33 @@ TEST_CASE("io_scheduler yield for")
REQUIRE(duration >= wait_for); REQUIRE(duration >= wait_for);
} }
TEST_CASE("io_scheduler schedule_at")
{
// Because schedule_at() will take its own time internally the wait_for might be off by a bit.
constexpr std::chrono::milliseconds epsilon{3};
constexpr std::chrono::milliseconds wait_for{50};
std::atomic<uint64_t> counter{0};
coro::io_scheduler s{};
auto func = [&]() -> coro::task<void> {
++counter;
co_return;
};
// now or in the past will be rejected.
REQUIRE_FALSE(s.schedule_at(func(), std::chrono::steady_clock::now()));
REQUIRE_FALSE(s.schedule_at(func(), std::chrono::steady_clock::now() - std::chrono::seconds{1}));
auto start = std::chrono::steady_clock::now();
s.schedule_at(func(), std::chrono::steady_clock::now() + wait_for);
s.shutdown();
auto stop = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(stop - start);
REQUIRE(counter == 1);
REQUIRE(duration >= (wait_for - epsilon));
}
TEST_CASE("io_scheduler trigger growth of internal tasks storage") TEST_CASE("io_scheduler trigger growth of internal tasks storage")
{ {
std::atomic<uint64_t> counter{0}; std::atomic<uint64_t> counter{0};
@ -590,3 +719,65 @@ TEST_CASE("io_scheduler schedule vector<task>")
REQUIRE(s.empty()); REQUIRE(s.empty());
REQUIRE(counter == 4); REQUIRE(counter == 4);
} }
TEST_CASE("io_scheduler yield()")
{
std::atomic<uint64_t> counter{0};
coro::io_scheduler s{};
// This task will check the counter and yield if it isn't 5.
auto make_wait_task = [&]() -> coro::task<void> {
while (counter.load(std::memory_order::relaxed) < 5)
{
std::cerr << "count = " << counter.load(std::memory_order::relaxed) << "\n";
co_await s.yield();
}
co_return;
};
// This task will increment counter by 1 and yield after each increment.
auto make_inc_task = [&]() -> coro::task<void> {
while (counter.load(std::memory_order::relaxed) < 5)
{
std::cerr << "increment!\n";
counter.fetch_add(1, std::memory_order::relaxed);
co_await s.yield();
}
co_return;
};
s.schedule(make_wait_task());
s.schedule(make_inc_task());
s.shutdown();
REQUIRE(counter == 5);
}
TEST_CASE("io_scheduler multiple timed waits")
{
std::atomic<uint64_t> counter{0};
coro::io_scheduler s{};
auto start_point = std::chrono::steady_clock::now();
auto make_task = [&]() -> coro::task<void> {
auto now = std::chrono::steady_clock::now();
auto epoch = std::chrono::duration_cast<std::chrono::milliseconds>(now - start_point);
std::cerr << "task counter = " << counter.load(std::memory_order::relaxed) << " elapsed = " << epoch.count()
<< "\n";
counter.fetch_add(1, std::memory_order::relaxed);
co_return;
};
auto start = std::chrono::steady_clock::now();
s.schedule_after(make_task(), std::chrono::milliseconds{5});
s.schedule_after(make_task(), std::chrono::milliseconds{5});
s.schedule_after(make_task(), std::chrono::milliseconds{20});
s.schedule_after(make_task(), std::chrono::milliseconds{50});
s.schedule_after(make_task(), std::chrono::milliseconds{50});
s.shutdown();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start);
REQUIRE(counter == 5);
REQUIRE(elapsed.count() >= 50);
}