
io_scheduler uses thread pool to schedule work (#42)

* io_scheduler uses thread pool to schedule work

fixes #41

* use task_container in bench tcp server test

* adjust benchmark for github actions CI

* fix io_scheduler tests cross thread memory boundaries

* more memory barriers

* sprinkle some shutdowns in there

* update readme
Josh Baldwin 2021-01-24 19:34:39 -07:00 committed by GitHub
parent 95127e2f6e
commit 80fea9c49a
33 changed files with 1536 additions and 2087 deletions

View file

@ -11,25 +11,26 @@
**libcoro** is meant to provide low level coroutine constructs for building larger applications; the current focus is high performance networking coroutine support.
## Overview
* C++20 coroutines!
* Modern Safe C++20 API
* Higher level coroutine constructs
** coro::task<T>
** coro::generator<T>
** coro::event
** coro::latch
** coro::mutex
** coro::sync_wait(awaitable)
*** coro::when_all(awaitable...)
* Schedulers
** coro::thread_pool for coroutine cooperative multitasking
** coro::io_scheduler for driving i/o events, uses thread_pool
*** epoll driver implemented
*** io_uring driver planned (will be required for file i/o)
* Coroutine Networking
** coro::net::dns_resolver for async dns, leverages libc-ares
** coro::tcp_client and coro::tcp_server
** coro::udp_peer
* C++20 coroutines!
* Modern Safe C++20 API
* Higher level coroutine constructs
- coro::task<T>
- coro::generator<T>
- coro::event
- coro::latch
- coro::mutex
- coro::sync_wait(awaitable)
- coro::when_all_awaitable(awaitable...) -> coro::task<T>...
- coro::when_all(awaitable...) -> T... (Future)
* Schedulers
- coro::thread_pool for coroutine cooperative multitasking
- coro::io_scheduler for driving i/o events, uses thread_pool for coroutine execution
- epoll driver
- io_uring driver (Future, will be required for async file i/o)
* Coroutine Networking
- coro::net::dns_resolver for async dns, leverages libc-ares
- coro::net::tcp_client and coro::net::tcp_server
- coro::net::udp_peer
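For orientation, here is a minimal end-to-end sketch of the constructs listed above. It assumes the aggregate `coro/coro.hpp` header (touched later in this diff); everything else is illustrative rather than taken from this commit.

#include <coro/coro.hpp>
#include <iostream>

int main()
{
    // A trivial coroutine task that computes a value.
    auto make_answer = []() -> coro::task<int> { co_return 42; };

    // sync_wait() blocks this regular (non-coroutine) thread until the task completes.
    std::cout << "answer = " << coro::sync_wait(make_answer()) << "\n";
    return 0;
}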
### A note on co_await
It's important to note with coroutines that, depending on the construct used, _any_ `co_await` has the
@ -104,6 +105,7 @@ This project uses git submodules; to properly check out this project use:
This project depends on the following projects:
* [libc-ares](https://github.com/c-ares/c-ares) For async DNS resolver.
* [catch2](https://github.com/catchorg/Catch2) For testing.
#### Building
mkdir Release && cd Release
@ -155,13 +157,12 @@ target_link_libraries(${PROJECT_NAME} PUBLIC libcoro)
#### Tests
The tests will automatically be run by github actions on creating a pull request. They can also be run locally:
# Invoke via cmake:
# Invoke via cmake with all output from the tests displayed to console:
ctest -VV
# Or invoke directly, can pass the name of tests to execute, the framework used is catch2
# catch2 supports '*' wildcards to run multiple tests or comma delimited ',' test names.
# The below will run all tests with "tcp_server" prefix in their test name.
./Debug/test/libcoro_test "tcp_server*"
# Or invoke directly; you can pass the name of tests to execute. The framework used is catch2.
# Tests are tagged with their group; below is how to run all of the coro::net::tcp_server tests:
./Debug/test/libcoro_test "[tcp_server]"
### Support

View file

@ -49,6 +49,7 @@ set(LIBCORO_SOURCE_FILES
inc/coro/poll.hpp
inc/coro/shutdown.hpp
inc/coro/sync_wait.hpp src/sync_wait.cpp
inc/coro/task_container.hpp src/task_container.cpp
inc/coro/task.hpp
inc/coro/thread_pool.hpp src/thread_pool.cpp
inc/coro/when_all.hpp

View file

@ -11,25 +11,26 @@
**libcoro** is meant to provide low level coroutine constructs for building larger applications; the current focus is high performance networking coroutine support.
## Overview
* C++20 coroutines!
* Modern Safe C++20 API
* Higher level coroutine constructs
** coro::task<T>
** coro::generator<T>
** coro::event
** coro::latch
** coro::mutex
** coro::sync_wait(awaitable)
*** coro::when_all(awaitable...)
* Schedulers
** coro::thread_pool for coroutine cooperative multitasking
** coro::io_scheduler for driving i/o events, uses thread_pool
*** epoll driver implemented
*** io_uring driver planned (will be required for file i/o)
* Coroutine Networking
** coro::net::dns_resolver for async dns, leverages libc-ares
** coro::tcp_client and coro::tcp_server
** coro::udp_peer
* C++20 coroutines!
* Modern Safe C++20 API
* Higher level coroutine constructs
- coro::task<T>
- coro::generator<T>
- coro::event
- coro::latch
- coro::mutex
- coro::sync_wait(awaitable)
- coro::when_all_awaitable(awaitable...) -> coro::task<T>...
- coro::when_all(awaitable...) -> T... (Future)
* Schedulers
- coro::thread_pool for coroutine cooperative multitasking
- coro::io_scheduler for driving i/o events, uses thread_pool for coroutine execution
- epoll driver
- io_uring driver (Future, will be required for async file i/o)
* Coroutine Networking
- coro::net::dns_resolver for async dns, leverages libc-ares
- coro::net::tcp_client and coro::net::tcp_server
- coro::net::udp_peer
### A note on co_await
It's important to note with coroutines that, depending on the construct used, _any_ `co_await` has the
@ -178,6 +179,7 @@ This project uses git submodules; to properly check out this project use:
This project depends on the following projects:
* [libc-ares](https://github.com/c-ares/c-ares) For async DNS resolver.
* [catch2](https://github.com/catchorg/Catch2) For testing.
#### Building
mkdir Release && cd Release
@ -229,13 +231,12 @@ target_link_libraries(${PROJECT_NAME} PUBLIC libcoro)
#### Tests
The tests will automatically be run by github actions on creating a pull request. They can also be run locally:
# Invoke via cmake:
# Invoke via cmake with all output from the tests displayed to console:
ctest -VV
# Or invoke directly, can pass the name of tests to execute, the framework used is catch2
# catch2 supports '*' wildcards to run multiple tests or comma delimited ',' test names.
# The below will run all tests with "tcp_server" prefix in their test name.
./Debug/test/libcoro_test "tcp_server*"
# Or invoke directly; you can pass the name of tests to execute. The framework used is catch2.
# Tests are tagged with their group; below is how to run all of the coro::net::tcp_server tests:
./Debug/test/libcoro_test "[tcp_server]"
### Support

View file

@ -22,5 +22,6 @@
#include "coro/mutex.hpp"
#include "coro/sync_wait.hpp"
#include "coro/task.hpp"
#include "coro/task_container.hpp"
#include "coro/thread_pool.hpp"
#include "coro/when_all.hpp"

View file

@ -5,6 +5,8 @@
namespace coro
{
class thread_pool;
/**
* Event is a manually triggered thread safe signal that can be co_await()'ed by multiple awaiters.
* Each awaiter should co_await the event and upon the event being set each awaiter will have their
@ -44,10 +46,17 @@ public:
auto is_set() const noexcept -> bool { return m_state.load(std::memory_order_acquire) == this; }
/**
* Sets this event and resumes all awaiters.
* Sets this event and resumes all awaiters. Note that all waiters will be resumed onto this
* thread of execution.
*/
auto set() noexcept -> void;
/**
* Sets this event and resumes all awaiters onto the given thread pool. This will distribute
* the waiters across the thread pool's threads.
*/
auto set(coro::thread_pool& tp) noexcept -> void;
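As a usage sketch (the task wiring below is illustrative, not part of this diff): a waiter suspends on the event, and once another task calls the new overload, it is resumed on one of the pool's threads.

coro::thread_pool tp{};
coro::event e{};

auto waiter = [&]() -> coro::task<void> {
    co_await e;  // suspends until the event is set; resumed on one of tp's threads
    co_return;
};

auto setter = [&]() -> coro::task<void> {
    e.set(tp);   // distributes all currently suspended waiters across the thread pool
    co_return;
};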
struct awaiter
{
/**

View file

@ -1,655 +1,133 @@
#pragma once
#include "coro/concepts/awaitable.hpp"
#include "coro/event.hpp"
#include "coro/net/socket.hpp"
#include "coro/poll.hpp"
#include "coro/shutdown.hpp"
#include "coro/task.hpp"
#include "coro/thread_pool.hpp"
#include <atomic>
#include <coroutine>
#include <list>
#include <chrono>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <span>
#include <thread>
#include <variant>
#include <vector>
#include <cstring>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <sys/types.h>
#include <unistd.h>
namespace coro
{
class io_scheduler;
namespace detail
{
class resume_token_base
{
public:
resume_token_base(io_scheduler* s) noexcept;
virtual ~resume_token_base() = default;
resume_token_base(const resume_token_base&) = delete;
resume_token_base(resume_token_base&& other);
auto operator=(const resume_token_base&) -> resume_token_base& = delete;
auto operator=(resume_token_base&& other) -> resume_token_base&;
bool is_set() const noexcept { return m_state.load(std::memory_order::acquire) == this; }
struct awaiter
{
awaiter(const resume_token_base& token) noexcept : m_token(token) {}
auto await_ready() const noexcept -> bool { return m_token.is_set(); }
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool;
auto await_resume() noexcept
{ /* no-op */
}
const resume_token_base& m_token;
std::coroutine_handle<> m_awaiting_coroutine;
awaiter* m_next{nullptr};
};
auto operator co_await() const noexcept -> awaiter { return awaiter{*this}; }
auto reset() noexcept -> void;
protected:
friend struct awaiter;
io_scheduler* m_scheduler{nullptr};
mutable std::atomic<void*> m_state;
};
class poll_info;
} // namespace detail
template<typename return_type>
class resume_token final : public detail::resume_token_base
class io_scheduler : public coro::thread_pool
{
friend io_scheduler;
resume_token() : detail::resume_token_base(nullptr) {}
resume_token(io_scheduler& s) : detail::resume_token_base(&s) {}
friend detail::poll_info;
public:
~resume_token() override = default;
using clock = std::chrono::steady_clock;
using time_point = clock::time_point;
using timed_events = std::multimap<time_point, detail::poll_info*>;
resume_token(const resume_token&) = delete;
resume_token(resume_token&&) = default;
auto operator=(const resume_token&) -> resume_token& = delete;
auto operator=(resume_token&&) -> resume_token& = default;
auto resume(return_type value) noexcept -> void;
auto return_value() const& -> const return_type& { return m_return_value; }
auto return_value() && -> return_type&& { return std::move(m_return_value); }
private:
return_type m_return_value;
};
template<>
class resume_token<void> final : public detail::resume_token_base
{
friend io_scheduler;
resume_token() : detail::resume_token_base(nullptr) {}
resume_token(io_scheduler& s) : detail::resume_token_base(&s) {}
public:
~resume_token() override = default;
resume_token(const resume_token&) = delete;
resume_token(resume_token&&) = default;
auto operator=(const resume_token&) -> resume_token& = delete;
auto operator=(resume_token&&) -> resume_token& = default;
auto resume() noexcept -> void;
};
class io_scheduler
{
public:
using fd_t = int;
private:
using clock = std::chrono::steady_clock;
using time_point = clock::time_point;
using task_variant = std::variant<coro::task<void>, std::coroutine_handle<>>;
using task_queue = std::deque<task_variant>;
using timer_tokens = std::multimap<time_point, resume_token<poll_status>*>;
/// resume_token<T> needs to be able to call internal scheduler::resume()
template<typename return_type>
friend class resume_token;
class task_manager
{
public:
using task_position = std::list<std::size_t>::iterator;
task_manager(const std::size_t reserve_size, const double growth_factor);
/**
* Stores a users task and sets a continuation coroutine to automatically mark the task
* as deleted upon the coroutines completion.
* @param user_task The scheduled user's task to store since it has suspended after its
* first execution.
* @return The task just stored wrapped in the self cleanup task.
*/
auto store(coro::task<void> user_task) -> task<void>&;
/**
* Garbage collects any tasks that are marked as deleted.
* @return The number of tasks that were deleted.
*/
auto gc() -> std::size_t;
/**
* @return The number of tasks that are awaiting deletion.
*/
auto delete_task_size() const -> std::size_t { return m_tasks_to_delete.size(); }
/**
* @return True if there are no tasks awaiting deletion.
*/
auto delete_tasks_empty() const -> bool { return m_tasks_to_delete.empty(); }
/**
* @return The capacity of this task manager before it will need to grow in size.
*/
auto capacity() const -> std::size_t { return m_tasks.size(); }
private:
/**
* Grows each task container by the growth factor.
* @return The position of the free index after growing.
*/
auto grow() -> task_position;
/**
* Encapsulate the users tasks in a cleanup task which marks itself for deletion upon
* completion. Simply co_await the users task until its completed and then mark the given
* position within the task manager as being deletable. The scheduler's next iteration
* in its event loop will then free that position up to be re-used.
*
* This function will also unconditionally catch all unhandled exceptions by the user's
* task to prevent the scheduler from throwing exceptions.
* @param user_task The user's task.
* @param pos The position where the task data will be stored in the task manager.
* @return The user's task wrapped in a self cleanup task.
*/
auto make_cleanup_task(task<void> user_task, task_position pos) -> task<void>;
/// Maintains the lifetime of the tasks until they are completed.
std::vector<task<void>> m_tasks{};
/// The full set of indexes into `m_tasks`.
std::list<std::size_t> m_task_indexes{};
/// The set of tasks that have completed and need to be deleted.
std::vector<task_position> m_tasks_to_delete{};
/// The current free position within the task indexes list. Anything before
/// this point is used, itself and anything after is free.
task_position m_free_pos{};
/// The amount to grow the containers by when all spaces are taken.
double m_growth_factor{};
};
static constexpr const int m_accept_object{0};
static constexpr const void* m_accept_ptr = &m_accept_object;
static constexpr const int m_timer_object{0};
static constexpr const void* m_timer_ptr = &m_timer_object;
/**
* An operation is an awaitable type with a coroutine to resume the task scheduled on one of
* the executor threads.
*/
class operation
{
friend class io_scheduler;
/**
* Only io_schedulers can create operations when a task is being scheduled.
* @param tp The io scheduler that created this operation.
*/
explicit operation(io_scheduler& ios) noexcept : m_io_scheduler(ios) {}
public:
/**
* Operations always pause so the executing thread and be switched.
*/
auto await_ready() noexcept -> bool { return false; }
/**
* Suspending always returns to the caller (using void return of await_suspend()) and
* stores the coroutine internally for the executing thread to resume from.
*/
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
// m_awaiting_coroutine = awaiting_coroutine;
m_io_scheduler.resume(awaiting_coroutine);
}
/**
* no-op as this is the function called first by the io_scheduler's executing thread.
*/
auto await_resume() noexcept -> void {}
private:
/// The io_scheduler that this operation will execute on.
io_scheduler& m_io_scheduler;
};
/**
* Schedules the currently executing task onto this io_scheduler, effectively placing it at
* the end of the FIFO queue.
* `co_await s.yield()`
*/
auto schedule() -> operation { return operation{*this}; }
public:
enum class thread_strategy_t
{
/// Spawns a background thread for the scheduler to run on.
spawn,
/// Adopts this thread as the scheduler thread.
adopt,
/// Requires the user to call process_events() to drive the scheduler
manual
};
struct options
{
/// The number of tasks to reserve space for upon creating the scheduler.
std::size_t reserve_size{8};
/// The growth factor for task space when capacity is full.
double growth_factor{2};
/// The threading strategy.
thread_strategy_t thread_strategy{thread_strategy_t::spawn};
thread_strategy_t thread_strategy{thread_strategy_t::spawn};
std::function<void()> on_io_thread_start_functor{nullptr};
std::function<void()> on_io_thread_stop_functor{nullptr};
thread_pool::options pool{
.thread_count = ((std::thread::hardware_concurrency() > 1) ? (std::thread::hardware_concurrency() - 1) : 1),
.on_thread_start_functor = nullptr,
.on_thread_stop_functor = nullptr};
};
/**
* @param options Various scheduler options to tune how it behaves.
*/
io_scheduler(const options opts = options{8, 2, thread_strategy_t::spawn});
explicit io_scheduler(
options opts = options{
.thread_strategy = thread_strategy_t::spawn,
.on_io_thread_start_functor = nullptr,
.on_io_thread_stop_functor = nullptr,
.pool = {
.thread_count =
((std::thread::hardware_concurrency() > 1) ? (std::thread::hardware_concurrency() - 1) : 1),
.on_thread_start_functor = nullptr,
.on_thread_stop_functor = nullptr}});
io_scheduler(const io_scheduler&) = delete;
io_scheduler(io_scheduler&&) = delete;
auto operator=(const io_scheduler&) -> io_scheduler& = delete;
auto operator=(io_scheduler&&) -> io_scheduler& = delete;
virtual ~io_scheduler();
virtual ~io_scheduler() override;
/**
* Schedules a task to be run as soon as possible. This pushes the task into a FIFO queue.
* @param task The task to schedule as soon as possible.
* @return True if the task has been scheduled, false if scheduling failed. Currently the only
* way for this to fail is if the scheduler is trying to shutdown.
*/
auto schedule(coro::task<void> task) -> bool;
auto schedule(std::vector<task<void>> tasks) -> bool;
template<concepts::awaitable_void... tasks_type>
auto schedule(tasks_type&&... tasks) -> bool
{
if (is_shutdown())
{
return false;
}
m_size.fetch_add(sizeof...(tasks), std::memory_order::relaxed);
{
std::lock_guard<std::mutex> lk{m_accept_mutex};
((m_accept_queue.emplace_back(std::forward<tasks_type>(tasks))), ...);
}
bool expected{false};
if (m_event_set.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
{
uint64_t value{1};
::write(m_accept_fd, &value, sizeof(value));
}
return true;
}
/**
* Schedules a task to be run after waiting for a certain period of time.
* @param task The task to schedule after waiting `after` amount of time.
* @return True if the task has been scheduled, false if scheduling failed. Currently the only
* way for this to fail is if the scheduler is trying to shutdown.
*/
auto schedule_after(coro::task<void> task, std::chrono::milliseconds after) -> bool;
/**
* Schedules a task to be run at a specific time in the future.
* @param task
* @param time
* @return True if the task is scheduled. False if time is in the past or the scheduler is
* trying to shutdown.
*/
auto schedule_at(coro::task<void> task, time_point time) -> bool;
/**
* Polls a specific file descriptor for the given poll operation.
* @param fd The file descriptor to poll.
* @param op The type of poll operation to perform.
* @param timeout The timeout for this poll operation, if timeout <= 0 then poll will block
* indefinitely until the event is triggered.
*/
auto poll(fd_t fd, poll_op op, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
-> coro::task<poll_status>;
auto poll(const net::socket& sock, poll_op op, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
-> coro::task<poll_status>;
/**
* This function will first poll the given `fd` to make sure it can be read from. Once notified
* that the `fd` has data available to read the given `buffer` is filled with up to the buffer's
* size of data. The number of bytes read is returned.
* @param fd The file descriptor to read from.
* @param buffer The buffer to place read bytes into.
* @param timeout The timeout for the read operation, if timeout <= 0 then read will block
* indefinitely until the event is triggered.
* @return The number of bytes read or an error code if negative.
*/
auto read(fd_t fd, std::span<char> buffer, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
-> coro::task<std::pair<poll_status, ssize_t>>;
auto read(
const net::socket& sock,
std::span<char> buffer,
std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
-> coro::task<std::pair<poll_status, ssize_t>>;
/**
* This function will first poll the given `fd` to make sure it can be written to. Once notified
* that the `fd` can be written to the given buffer's contents are written to the `fd`.
* @param fd The file descriptor to write the contents of `buffer` to.
* @param buffer The data to write to `fd`.
* @param timeout The timeout for the write operation, if timeout <= 0 then write will block
* indefinitely until the event is triggered.
* @return The number of bytes written or an error code if negative.
*/
auto write(
fd_t fd, const std::span<const char> buffer, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
-> coro::task<std::pair<poll_status, ssize_t>>;
auto write(
const net::socket& sock,
const std::span<const char> buffer,
std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
-> coro::task<std::pair<poll_status, ssize_t>>;
/**
* Immediately yields the current task and places it at the end of the queue of tasks waiting
* to be processed. This will immediately be picked up again once it naturally goes through the
* FIFO task queue. This function is useful to yielding long processing tasks to let other tasks
* get processing time.
*/
auto yield() -> coro::task<void>;
/**
* Immediately yields the current task and provides a resume token to resume this yielded
* coroutine when the async operation has completed.
*
* Normal usage of this might look like:
* \code
scheduler.yield([](coro::resume_token<void>& t) {
auto on_service_complete = [&]() {
t.resume(); // This resume call will resume the task on the scheduler thread.
};
service.do_work(on_service_complete);
});
* \endcode
* The above example will yield the current task and then through the 3rd party service's
* on complete callback function let the scheduler know that it should resume execution of the task.
*
* @tparam func Functor to invoke with the yielded coroutine handle to be resumed.
* @param f Immediately invoked functor with the yield point coroutine handle to resume with.
* @return A task to co_await until the manual `scheduler::resume(handle)` is called.
*/
template<typename return_type, std::invocable<resume_token<return_type>&> before_functor>
auto yield(before_functor before) -> coro::task<return_type>
{
resume_token<return_type> token{*this};
before(token);
co_await token;
if constexpr (std::is_same_v<return_type, void>)
{
co_return;
}
else
{
co_return token.return_value();
}
}
/**
* User provided resume token to yield the current coroutine until the token's resume is called.
* Its also possible to co_await a resume token inline without calling this yield function.
* @param token Resume token to await the current task on. Use scheduer::make_resume_token<T>() to
* generate a resume token to use on this scheduer.
*/
template<typename return_type>
auto yield(resume_token<return_type>& token) -> coro::task<void>
{
co_await token;
co_return;
}
/**
* Yields the current coroutine for `amount` of time.
* @param amount The amount of time to wait.
*/
auto yield_for(std::chrono::milliseconds amount) -> coro::task<void>;
/**
* Yields the current coroutine until `time`. If time is in the past this function will
* return immediately.
* @param time The time point in the future to yield until.
*/
auto yield_until(time_point time) -> coro::task<void>;
/**
* Makes a resume token that can be used to resume a suspended task from any thread. On resume
* the task will resume execution on this scheduler thread.
* @tparam The return type of resuming the async operation.
* @return Resume token with the given return type.
*/
template<typename return_type = void>
auto make_resume_token() -> resume_token<return_type>
{
return resume_token<return_type>(*this);
}
/**
* If running in thread_strategy_t::manual mode this function must be called at regular
* intervals to process events on the io_scheduler. This function will do nothing in any
* other thread_strategy_t mode.
* @param timeout The timeout to wait for events, use zero (default) to process only events
* that are currently ready.
* @return The number of executing tasks.
*/
auto process_events(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> std::size_t;
/**
* @return The number of active tasks still executing and unprocessed submitted tasks.
*/
auto size() const -> std::size_t { return m_size.load(std::memory_order::relaxed); }
[[nodiscard]] auto schedule_after(std::chrono::milliseconds amount) -> coro::task<void>;
[[nodiscard]] auto schedule_at(time_point time) -> coro::task<void>;
/**
* @return True if there are no tasks executing or waiting to be executed in this scheduler.
*/
auto empty() const -> bool { return size() == 0; }
[[nodiscard]] auto yield_for(std::chrono::milliseconds amount) -> coro::task<void>;
[[nodiscard]] auto yield_until(time_point time) -> coro::task<void>;
/**
* @return The maximum number of tasks this scheduler can process without growing.
*/
auto capacity() const -> std::size_t { return m_task_manager.capacity(); }
[[nodiscard]] auto poll(fd_t fd, coro::poll_op op, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
-> coro::task<poll_status>;
/**
* Is there a thread processing this schedulers events?
* If this is in thread strategy spawn or adopt this will always be true until shutdown.
*/
auto is_running() const noexcept -> bool { return m_running.load(std::memory_order::relaxed); }
[[nodiscard]] auto poll(
const net::socket& sock, coro::poll_op op, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
-> coro::task<poll_status>
{
return poll(sock.native_handle(), op, timeout);
}
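Taken together, a hedged sketch of the new io_scheduler surface (identifiers are local to this example): schedule onto the inherited thread pool, perform a timed suspend, then poll a socket with a timeout.

auto io_task = [](coro::io_scheduler& s, const coro::net::socket& sock) -> coro::task<void> {
    co_await s.schedule();                                // inherited from thread_pool: hop onto a pool thread
    co_await s.yield_for(std::chrono::milliseconds{10});  // timed suspend driven by the timer fd
    auto status = co_await s.poll(sock, coro::poll_op::read, std::chrono::milliseconds{50});
    if (status == coro::poll_status::event)
    {
        // The socket is readable.
    }
    co_return;
};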
/**
* @return True if this scheduler has been requested to shutdown.
*/
auto is_shutdown() const noexcept -> bool { return m_shutdown_requested.load(std::memory_order::relaxed); }
/**
* Requests the scheduler to finish processing all of its current tasks and shutdown.
* New tasks submitted via `scheduler::schedule()` will be rejected after this is called.
* @param wait_for_tasks This call will block until all tasks are complete if shutdown_t::sync
* is passed in, if shutdown_t::async is passed this function will tell
* the scheduler to shutdown but not wait for all tasks to complete, it returns
* immediately.
*/
virtual auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) -> void;
auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) noexcept -> void override;
private:
/// The configuration options.
options m_opts;
/// The event loop epoll file descriptor.
fd_t m_epoll_fd{-1};
/// The event loop accept new tasks and resume tasks file descriptor.
fd_t m_accept_fd{-1};
/// The event loop fd to trigger a shutdown.
fd_t m_shutdown_fd{-1};
/// The event loop timer fd for timed events, e.g. yield_for() or schedule_after().
fd_t m_timer_fd{-1};
/// The map of time point's to resume tokens for tasks that are yielding for a period of time
/// The background io worker threads.
std::thread m_io_thread;
std::mutex m_timed_events_mutex{};
/// The map of time point's to poll infos for tasks that are yielding for a period of time
/// or for tasks that are polling with timeouts.
timer_tokens m_timer_tokens;
timed_events m_timed_events{};
/// The threading strategy this scheduler is using.
thread_strategy_t m_thread_strategy;
/// Is this scheduler currently running? Manual mode might not always be running.
std::atomic<bool> m_running{false};
/// Has the scheduler been requested to shutdown?
std::atomic<bool> m_shutdown_requested{false};
/// If running in threading mode spawn the background thread to process events.
std::thread m_scheduler_thread;
std::atomic<bool> m_io_processing{false};
auto process_events_manual(std::chrono::milliseconds timeout) -> void;
auto process_events_dedicated_thread() -> void;
auto process_events_execute(std::chrono::milliseconds timeout) -> void;
static auto event_to_poll_status(uint32_t events) -> poll_status;
/// FIFO queue for new and resumed tasks to execute.
task_queue m_accept_queue{};
std::mutex m_accept_mutex{};
auto process_event_execute(detail::poll_info* pi, poll_status status) -> void;
auto process_timeout_execute() -> void;
/// Has a thread sent an event? (E.g. avoid a kernel write/read?).
std::atomic<bool> m_event_set{false};
auto add_timer_token(time_point tp, detail::poll_info& pi) -> timed_events::iterator;
auto remove_timer_token(timed_events::iterator pos) -> void;
auto update_timeout(time_point now) -> void;
/// The total number of tasks that are being processed or suspended.
std::atomic<std::size_t> m_size{0};
static constexpr const int m_shutdown_object{0};
static constexpr const void* m_shutdown_ptr = &m_shutdown_object;
/// The maximum number of tasks to process inline before polling for more tasks.
static constexpr const std::size_t task_inline_process_amount{64};
/// Pre-allocated memory area for tasks to process.
std::array<task_variant, task_inline_process_amount> m_processing_tasks;
task_manager m_task_manager;
auto make_scheduler_after_task(coro::task<void> task, std::chrono::milliseconds wait_time) -> coro::task<void>;
template<typename return_type>
auto unsafe_yield(resume_token<return_type>& token) -> coro::task<return_type>
{
co_await token;
if constexpr (std::is_same_v<return_type, void>)
{
co_return;
}
else
{
co_return token.return_value();
}
}
auto add_timer_token(time_point tp, resume_token<poll_status>* token_ptr) -> timer_tokens::iterator;
auto remove_timer_token(timer_tokens::iterator pos) -> void;
auto resume(std::coroutine_handle<> handle) -> void;
static constexpr const int m_timer_object{0};
static constexpr const void* m_timer_ptr = &m_timer_object;
static const constexpr std::chrono::milliseconds m_default_timeout{1000};
static const constexpr std::chrono::milliseconds m_no_timeout{0};
static const constexpr std::size_t m_max_events = 8;
std::array<struct epoll_event, m_max_events> m_events{};
auto process_task_and_start(task<void>& task) -> void;
auto process_task_variant(task_variant& tv) -> void;
auto process_task_queue() -> void;
auto process_events_poll_execute(std::chrono::milliseconds user_timeout) -> void;
auto event_to_poll_status(uint32_t events) -> poll_status;
auto process_events_external_thread(std::chrono::milliseconds user_timeout) -> void;
auto process_events_dedicated_thread() -> void;
auto update_timeout(time_point now) -> void;
};
template<typename return_type>
inline auto resume_token<return_type>::resume(return_type value) noexcept -> void
{
void* old_value = m_state.exchange(this, std::memory_order::acq_rel);
if (old_value != this)
{
m_return_value = std::move(value);
auto* waiters = static_cast<awaiter*>(old_value);
while (waiters != nullptr)
{
// Intentionally not checking if this is running on the scheduler process event thread
// as it can create a stack overflow if it triggers a 'resume chain'. unsafe_yield()
// is guaranteed in this context to never be recursive and thus resuming directly
// on the process event thread should not be able to trigger a stack overflow.
auto* next = waiters->m_next;
// If scheduler is nullptr this is an unsafe_yield()
// If scheduler is present this is a yield()
if (m_scheduler == nullptr) // || m_scheduler->this_thread_is_processing_events())
{
waiters->m_awaiting_coroutine.resume();
}
else
{
m_scheduler->resume(waiters->m_awaiting_coroutine);
}
waiters = next;
}
}
}
inline auto resume_token<void>::resume() noexcept -> void
{
void* old_value = m_state.exchange(this, std::memory_order::acq_rel);
if (old_value != this)
{
auto* waiters = static_cast<awaiter*>(old_value);
while (waiters != nullptr)
{
auto* next = waiters->m_next;
if (m_scheduler == nullptr)
{
waiters->m_awaiting_coroutine.resume();
}
else
{
m_scheduler->resume(waiters->m_awaiting_coroutine);
}
waiters = next;
}
}
}
} // namespace coro

View file

@ -6,9 +6,23 @@
namespace coro
{
/**
* The latch is a thread safe counter used to wait for one or more other tasks to complete; they
* signal completion by calling `count_down()` on the latch, and once the latch counter reaches
* zero the coroutine `co_await`ing the latch resumes execution.
*
* This is useful for spawning many worker tasks to complete either a computationally complex task
* across a thread pool of workers, or waiting for many asynchronous results like http requests
* to complete.
*/
class latch
{
public:
/**
* Creates a latch with the given count of tasks to wait to complete.
* @param count The number of tasks to wait to complete, if this is zero or negative then the
* latch starts 'completed' immediately and execution is resumed with no suspension.
*/
latch(std::ptrdiff_t count) noexcept : m_count(count), m_event(count <= 0) {}
latch(const latch&) = delete;
@ -16,10 +30,19 @@ public:
auto operator=(const latch&) -> latch& = delete;
auto operator=(latch&&) -> latch& = delete;
/**
* @return True if the latch has been counted down to zero.
*/
auto is_ready() const noexcept -> bool { return m_event.is_set(); }
/**
* @return The number of tasks this latch is still waiting to complete.
*/
auto remaining() const noexcept -> std::size_t { return m_count.load(std::memory_order::acquire); }
/**
* @param n The number of tasks to complete towards the latch, defaults to 1.
*/
auto count_down(std::ptrdiff_t n = 1) noexcept -> void
{
if (m_count.fetch_sub(n, std::memory_order::acq_rel) <= n)
@ -31,8 +54,11 @@ public:
auto operator co_await() const noexcept -> event::awaiter { return m_event.operator co_await(); }
private:
/// The number of tasks to wait for completion before triggering the event to resume.
std::atomic<std::ptrdiff_t> m_count;
event m_event;
/// The event to trigger when the latch counter reaches zero, this resume the coroutine that
/// is co_await'ing on the latch.
event m_event;
};
} // namespace coro
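A short usage sketch (the surrounding task wiring is illustrative): three workers count the latch down, and a waiter resumes once the counter reaches zero.

coro::latch l{3};

auto worker = [&]() -> coro::task<void> {
    // ... perform one unit of work ...
    l.count_down();  // signal this worker's completion
    co_return;
};

auto waiter = [&]() -> coro::task<void> {
    co_await l;  // resumes only after count_down() has been called three times
    co_return;
};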

View file

@ -4,6 +4,7 @@
#include "coro/net/hostname.hpp"
#include "coro/net/ip_address.hpp"
#include "coro/task.hpp"
#include "coro/task_container.hpp"
#include <ares.h>
@ -31,7 +32,7 @@ class dns_result
friend dns_resolver;
public:
explicit dns_result(coro::resume_token<void>& token, uint64_t pending_dns_requests);
dns_result(coro::io_scheduler& scheduler, coro::event& resume, uint64_t pending_dns_requests);
~dns_result() = default;
/**
@ -46,7 +47,8 @@ public:
auto ip_addresses() const -> const std::vector<coro::net::ip_address>& { return m_ip_addresses; }
private:
coro::resume_token<void>& m_token;
coro::io_scheduler& m_io_scheduler;
coro::event& m_resume;
uint64_t m_pending_dns_requests{0};
dns_status m_status{dns_status::complete};
std::vector<coro::net::ip_address> m_ip_addresses{};
@ -71,7 +73,7 @@ public:
private:
/// The io scheduler to drive the events for dns lookups.
io_scheduler& m_scheduler;
io_scheduler& m_io_scheduler;
/// The global timeout per dns lookup request.
std::chrono::milliseconds m_timeout{0};
@ -83,6 +85,8 @@ private:
/// are not set up when ares_poll() is called.
std::unordered_set<io_scheduler::fd_t> m_active_sockets{};
task_container m_task_container{};
/// Global count to track if c-ares has been initialized or cleaned up.
static uint64_t m_ares_count;
/// Critical section around the c-ares global init/cleanup to prevent heap corruption.
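For orientation, a hedged usage sketch; `host_by_name()` and the exact construction shape are assumptions inferred from the surrounding declarations rather than shown in this diff.

coro::io_scheduler scheduler{};
coro::net::dns_resolver resolver{scheduler, std::chrono::milliseconds{5000}};

auto lookup = [&]() -> coro::task<void> {
    // host_by_name() is assumed here; it resolves asynchronously via c-ares.
    auto result = co_await resolver.host_by_name(coro::net::hostname{"example.com"});
    for (const auto& ip : result->ip_addresses())
    {
        // ... connect to ip ...
    }
    co_return;
};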

View file

@ -17,11 +17,6 @@
#include <variant>
#include <vector>
namespace coro
{
class io_scheduler;
} // namespace coro
namespace coro::net
{
class tcp_server;
@ -80,7 +75,7 @@ public:
auto poll(coro::poll_op op, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
-> coro::task<poll_status>
{
co_return co_await m_io_scheduler.poll(m_socket, op, timeout);
return m_io_scheduler.poll(m_socket, op, timeout);
}
/**

View file

@ -12,7 +12,7 @@
namespace coro::net
{
class tcp_server : public io_scheduler
class tcp_server
{
public:
struct options
@ -41,7 +41,10 @@ public:
* @return The result of the poll, 'event' means the poll was successful and there is at least 1
* connection ready to be accepted.
*/
auto poll(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<coro::poll_status>;
auto poll(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<coro::poll_status>
{
return m_io_scheduler.poll(m_accept_socket, coro::poll_op::read, timeout);
}
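A hedged accept-loop sketch built on the inlined poll() above; `accept()`'s exact signature is assumed, since it is only partially shown in this diff.

auto acceptor = [](coro::net::tcp_server& server) -> coro::task<void> {
    while (true)
    {
        // Wait until at least one connection is ready to be accepted.
        if (co_await server.poll() != coro::poll_status::event)
        {
            break;  // timeout, error, or shutdown
        }
        auto client = server.accept();  // assumed non-suspending accept
        // ... hand `client` off, e.g. store its handler task in a task_container ...
    }
    co_return;
};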
/**
* Accepts an incoming tcp client connection. On failure the tcp client's socket will be set to

inc/coro/task_container.hpp Normal file (+140 lines)
View file

@ -0,0 +1,140 @@
#pragma once
#include "coro/task.hpp"
#include <atomic>
#include <list>
#include <mutex>
#include <vector>
namespace coro
{
class task_container
{
public:
using task_position = std::list<std::size_t>::iterator;
struct options
{
/// The number of task spots to reserve space for upon creating the container.
std::size_t reserve_size{8};
/// The growth factor for task space in the container when capacity is full.
double growth_factor{2};
};
explicit task_container(const options opts = options{.reserve_size = 8, .growth_factor = 2});
task_container(const task_container&) = delete;
task_container(task_container&&) = delete;
auto operator=(const task_container&) -> task_container& = delete;
auto operator=(task_container&&) -> task_container& = delete;
~task_container();
enum class garbage_collect
{
/// Execute garbage collection.
yes,
/// Do not execute garbage collection.
no
};
/**
* Stores a user's task and sets a continuation coroutine to automatically mark the task
* as deleted upon the coroutine's completion.
* @param user_task The scheduled user's task to store since it has suspended after its
* first execution.
* @param cleanup Should the task container run garbage collection at the beginning of this store
*                call? Calling at regular intervals will reduce memory usage of completed
*                tasks and allow the task container to re-use allocated space.
* @return The task just stored wrapped in the self cleanup task.
*/
auto store(coro::task<void> user_task, garbage_collect cleanup = garbage_collect::yes) -> coro::task<void>&;
/**
* Garbage collects any tasks that are marked as deleted. This frees up space to be re-used by
* the task container for newly stored tasks.
* @return The number of tasks that were deleted.
*/
auto gc() -> std::size_t;
/**
* @return The number of tasks that are awaiting deletion.
*/
auto delete_task_size() const -> std::size_t
{
std::atomic_thread_fence(std::memory_order::acquire);
return m_tasks_to_delete.size();
}
/**
* @return True if there are no tasks awaiting deletion.
*/
auto delete_tasks_empty() const -> bool
{
std::atomic_thread_fence(std::memory_order::acquire);
return m_tasks_to_delete.empty();
}
/**
* @return The number of active tasks in the container.
*/
auto size() const -> std::size_t { return m_size.load(std::memory_order::relaxed); }
/**
* @return True if there are no active tasks in the container.
*/
auto empty() const -> bool { return size() == 0; }
/**
* @return The capacity of this task container before it will need to grow in size.
*/
auto capacity() const -> std::size_t
{
std::atomic_thread_fence(std::memory_order::acquire);
return m_tasks.size();
}
private:
/**
* Grows each task container by the growth factor.
* @return The position of the free index after growing.
*/
auto grow() -> task_position;
/**
* Internal GC call, expects the public function to lock.
*/
auto gc_internal() -> std::size_t;
/**
* Encapsulates the user's task in a cleanup task which marks itself for deletion upon
* completion. It simply co_awaits the user's task until it completes and then marks the given
* position within the task container as deletable. The scheduler's next iteration
* in its event loop will then free that position up to be re-used.
*
* This function will also unconditionally catch all unhandled exceptions by the user's
* task to prevent the scheduler from throwing exceptions.
* @param user_task The user's task.
* @param pos The position where the task data will be stored in the task manager.
* @return The user's task wrapped in a self cleanup task.
*/
auto make_cleanup_task(task<void> user_task, task_position pos) -> coro::task<void>;
/// Mutex for safely mutating the task containers across threads, expected usage is within
/// thread pools for indeterminate lifetime requests.
std::mutex m_mutex{};
/// The number of alive tasks.
std::atomic<std::size_t> m_size{};
/// Maintains the lifetime of the tasks until they are completed.
std::vector<task<void>> m_tasks{};
/// The full set of indexes into `m_tasks`.
std::list<std::size_t> m_task_indexes{};
/// The set of tasks that have completed and need to be deleted.
std::vector<task_position> m_tasks_to_delete{};
/// The current free position within the task indexes list. Anything before
/// this point is used, itself and anything after is free.
task_position m_free_pos{};
/// The amount to grow the containers by when all spaces are taken.
double m_growth_factor{};
};
} // namespace coro
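A hedged usage sketch for fire-and-forget tasks whose lifetimes outlive their creator; the final resume() call is illustrative, and a server would typically schedule the wrapped task instead.

coro::task_container tc{};

auto handle_connection = []() -> coro::task<void> {
    // ... serve one client to completion ...
    co_return;
};

// store() wraps the task in a self-cleanup task and owns it until completion;
// the default garbage_collect::yes first reclaims slots of finished tasks.
auto& wrapped = tc.store(handle_connection());
wrapped.resume();  // illustrative driver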

View file

@ -1,5 +1,6 @@
#pragma once
#include "coro/event.hpp"
#include "coro/shutdown.hpp"
#include "coro/task.hpp"
@ -11,10 +12,13 @@
#include <mutex>
#include <optional>
#include <thread>
#include <variant>
#include <vector>
namespace coro
{
class event;
/**
* Creates a thread pool that executes arbitrary coroutine tasks in a FIFO scheduler policy.
* The thread pool by default will create an execution thread per available core on the system.
@ -41,7 +45,7 @@ public:
public:
/**
* Operations always pause so the executing thread and be switched.
* Operations always pause so the executing thread can be switched.
*/
auto await_ready() noexcept -> bool { return false; }
@ -68,34 +72,43 @@ public:
/// The number of executor threads for this thread pool. Uses the hardware concurrency
/// value by default.
uint32_t thread_count = std::thread::hardware_concurrency();
/// Functor to call on each executor thread upon starting execution.
/// Functor to call on each executor thread upon starting execution. The parameter is the
/// thread's ID assigned to it by the thread pool.
std::function<void(std::size_t)> on_thread_start_functor = nullptr;
/// Functor to call on each executor thread upon stopping execution.
/// Functor to call on each executor thread upon stopping execution. The parameter is the
/// thread's ID assigned to it by the thread pool.
std::function<void(std::size_t)> on_thread_stop_functor = nullptr;
};
/**
* @param opts Thread pool configuration options.
*/
explicit thread_pool(options opts = options{std::thread::hardware_concurrency(), nullptr, nullptr});
explicit thread_pool(
options opts = options{
.thread_count = std::thread::hardware_concurrency(),
.on_thread_start_functor = nullptr,
.on_thread_stop_functor = nullptr});
thread_pool(const thread_pool&) = delete;
thread_pool(thread_pool&&) = delete;
auto operator=(const thread_pool&) -> thread_pool& = delete;
auto operator=(thread_pool&&) -> thread_pool& = delete;
~thread_pool();
virtual ~thread_pool();
/**
* @return The number of executor threads for processing tasks.
*/
auto thread_count() const noexcept -> uint32_t { return m_threads.size(); }
/**
* Schedules the currently executing coroutine to be run on this thread pool. This must be
* called from within the coroutines function body to schedule the coroutine on the thread pool.
* @throw std::runtime_error If the thread pool is `shutdown()`, scheduling new tasks is not permitted.
* @return The operation to switch from the calling scheduling thread to the executor thread
* pool thread. This will return nullopt if the schedule fails, currently the only
* way for this to fail is if `shudown()` has been called.
* pool thread.
*/
[[nodiscard]] auto schedule() noexcept -> std::optional<operation>;
[[nodiscard]] auto schedule() -> operation;
/**
* @throw std::runtime_error If the thread pool is `shutdown()`, scheduling new tasks is not permitted.
@ -106,13 +119,7 @@ public:
template<typename functor, typename... arguments>
[[nodiscard]] auto schedule(functor&& f, arguments... args) -> task<decltype(f(std::forward<arguments>(args)...))>
{
auto scheduled = schedule();
if (!scheduled.has_value())
{
throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
}
co_await scheduled.value();
co_await schedule();
if constexpr (std::is_same_v<void, decltype(f(std::forward<arguments>(args)...))>)
{
@ -125,18 +132,26 @@ public:
}
}
/**
* Immediately yields the current task and places it at the end of the queue of tasks waiting
* to be processed. This will immediately be picked up again once it naturally goes through the
* FIFO task queue. This function is useful for yielding long processing tasks to let other tasks
* get processing time.
*/
[[nodiscard]] auto yield() -> operation { return schedule(); }
/**
* Shuts down the thread pool. This will finish any tasks scheduled prior to calling this
* function but will prevent the thread pool from scheduling any new tasks.
* @param wait_for_tasks Should this function block until all remaining scheduled tasks have
* completed? Pass in sync to wait, or async to not block.
*/
auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) noexcept -> void;
virtual auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) noexcept -> void;
/**
* @return The number of tasks waiting in the task queue + the executing tasks.
*/
auto size() const noexcept -> std::size_t { return m_size.load(std::memory_order::relaxed); }
auto size() const noexcept -> std::size_t { return m_size.load(std::memory_order::acquire); }
/**
* @return True if the task queue is empty and zero tasks are currently executing.
@ -162,9 +177,11 @@ private:
/// The configuration options.
options m_opts;
protected:
/// Has the thread pool been requested to shut down?
std::atomic<bool> m_shutdown_requested{false};
private:
/// The background executor threads.
std::vector<std::jthread> m_threads;
@ -172,15 +189,14 @@ private:
std::mutex m_wait_mutex;
/// Condition variable for each executor thread to wait on when no tasks are available.
std::condition_variable_any m_wait_cv;
/// Mutex to guard the queue of FIFO tasks.
std::mutex m_queue_mutex;
/// FIFO queue of tasks waiting to be executed.
std::deque<operation*> m_queue;
std::deque<std::coroutine_handle<>> m_queue;
protected:
/// The number of tasks in the queue + currently executing.
std::atomic<std::size_t> m_size{0};
private:
/**
* Each background thread runs from this function.
* @param stop_token Token which signals when shutdown() has been called.
@ -189,9 +205,20 @@ private:
auto executor(std::stop_token stop_token, std::size_t idx) -> void;
/**
* @param op Schedules the given operation to be executed upon the first available thread.
* @param handle Schedules the given coroutine to be executed upon the first available thread.
*/
auto schedule_impl(operation* op) noexcept -> void;
auto schedule_impl(std::coroutine_handle<> handle) noexcept -> void;
protected:
/// Required to resume all waiters of the event onto a thread_pool.
friend event;
/**
* Schedules any coroutine that is ready to be resumed.
* @param handle The coroutine handle to schedule.
*/
auto resume(std::coroutine_handle<> handle) noexcept -> void;
auto resume(const std::vector<std::coroutine_handle<>>& handles) noexcept -> void;
};
} // namespace coro
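A minimal usage sketch of the revised interface, assuming `coro::sync_wait` from elsewhere in the library:

coro::thread_pool tp{coro::thread_pool::options{.thread_count = 4}};

auto compute = [&]() -> coro::task<int> {
    co_await tp.schedule();  // now throws std::runtime_error if the pool is shut down
    co_return 6 * 7;         // executes on one of tp's four threads
};

auto result = coro::sync_wait(compute());  // result == 42
tp.shutdown();  // default shutdown_t::sync blocks until remaining tasks finish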

View file

@ -1,4 +1,5 @@
#include "coro/event.hpp"
#include "coro/thread_pool.hpp"
namespace coro
{
@ -23,6 +24,23 @@ auto event::set() noexcept -> void
}
}
auto event::set(coro::thread_pool& tp) noexcept -> void
{
// Exchange the state to this, if the state was previously not this, then traverse the list
// of awaiters and resume their coroutines.
void* old_value = m_state.exchange(this, std::memory_order::acq_rel);
if (old_value != this)
{
auto* waiters = static_cast<awaiter*>(old_value);
while (waiters != nullptr)
{
auto* next = waiters->m_next;
tp.resume(waiters->m_awaiting_coroutine);
waiters = next;
}
}
}
auto event::awaiter::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
{
const void* const set_state = &m_event;

View file

@ -1,171 +1,98 @@
#include "coro/io_scheduler.hpp"
#include <iostream>
#include <atomic>
#include <cstring>
#include <optional>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <sys/types.h>
#include <unistd.h>
using namespace std::chrono_literals;
namespace coro
{
detail::resume_token_base::resume_token_base(io_scheduler* s) noexcept : m_scheduler(s), m_state(nullptr)
namespace detail
{
}
detail::resume_token_base::resume_token_base(resume_token_base&& other)
/**
* Poll Info encapsulates everything about a poll operation for the event as well as its paired
* timeout. This is important since coroutines that are waiting on an event or timeout do not
* immediately execute; they are re-scheduled onto the thread pool, so it's possible its paired
* event or timeout also triggers while the coroutine is still waiting to resume. This means that
* the first one to happen, the event itself or its timeout, needs to disable the other pair item
* prior to resuming the coroutine.
*
* Finally, it's also important to note that the event and its paired timeout could happen during
* the same epoll_wait and possibly trigger the coroutine to start twice. Only one can win, so the
* first one processed sets m_processed to true and any subsequent events in the same epoll batch
* are effectively discarded.
*/
struct poll_info
{
m_scheduler = other.m_scheduler;
m_state = other.m_state.exchange(nullptr);
poll_info() = default;
~poll_info() = default;
other.m_scheduler = nullptr;
}
poll_info(const poll_info&) = delete;
poll_info(poll_info&&) = delete;
auto operator=(const poll_info&) -> poll_info& = delete;
auto operator=(poll_info&&) -> poll_info& = delete;
auto detail::resume_token_base::awaiter::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
{
const void* const set_state = &m_token;
m_awaiting_coroutine = awaiting_coroutine;
// This value will update if other threads write to it via acquire.
void* old_value = m_token.m_state.load(std::memory_order::acquire);
do
struct poll_awaiter
{
// Resume immediately if already in the set state.
if (old_value == set_state)
explicit poll_awaiter(poll_info& pi) noexcept : m_pi(pi) {}
auto await_ready() const noexcept -> bool { return false; }
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
return false;
m_pi.m_awaiting_coroutine = awaiting_coroutine;
std::atomic_thread_fence(std::memory_order::release);
}
auto await_resume() noexcept -> coro::poll_status { return m_pi.m_poll_status; }
m_next = static_cast<awaiter*>(old_value);
} while (!m_token.m_state.compare_exchange_weak(
old_value, this, std::memory_order::release, std::memory_order::acquire));
poll_info& m_pi;
};
return true;
}
auto operator co_await() noexcept -> poll_awaiter { return poll_awaiter{*this}; }
auto detail::resume_token_base::reset() noexcept -> void
{
void* old_value = this;
m_state.compare_exchange_strong(old_value, nullptr, std::memory_order::acquire);
}
/// The file descriptor being polled on. This is needed so that if the timeout occurs first then
/// the event loop can immediately disable the event within epoll.
io_scheduler::fd_t m_fd{-1};
/// The timeout's position in the timeout map. A poll() with no timeout or yield() this is empty.
/// This is needed so that if the event occurs first then the event loop can immediately disable
/// the timeout within epoll.
std::optional<io_scheduler::timed_events::iterator> m_timer_pos{std::nullopt};
/// The awaiting coroutine for this poll info to resume upon event or timeout.
std::coroutine_handle<> m_awaiting_coroutine;
/// The status of the poll operation.
coro::poll_status m_poll_status{coro::poll_status::error};
/// Did the timeout and event trigger at the same time on the same epoll_wait call?
/// Once this is set to true all future events on this poll info are null and void.
bool m_processed{false};
};
auto detail::resume_token_base::operator=(resume_token_base&& other) -> resume_token_base&
{
if (std::addressof(other) != this)
{
m_scheduler = other.m_scheduler;
m_state = other.m_state.exchange(nullptr);
} // namespace detail
other.m_scheduler = nullptr;
}
return *this;
}
io_scheduler::task_manager::task_manager(const std::size_t reserve_size, const double growth_factor)
: m_growth_factor(growth_factor)
{
m_tasks.resize(reserve_size);
for (std::size_t i = 0; i < reserve_size; ++i)
{
m_task_indexes.emplace_back(i);
}
m_free_pos = m_task_indexes.begin();
}
auto io_scheduler::task_manager::store(coro::task<void> user_task) -> task<void>&
{
// Only grow if completely full and attempting to add more.
if (m_free_pos == m_task_indexes.end())
{
m_free_pos = grow();
}
// Store the task inside a cleanup task for self deletion.
auto index = *m_free_pos;
m_tasks[index] = make_cleanup_task(std::move(user_task), m_free_pos);
// Mark the current used slot as used.
std::advance(m_free_pos, 1);
return m_tasks[index];
}
auto io_scheduler::task_manager::gc() -> std::size_t
{
std::size_t deleted{0};
if (!m_tasks_to_delete.empty())
{
for (const auto& pos : m_tasks_to_delete)
{
// This doesn't actually 'delete' the task, it'll get overwritten when a
// new user task claims the free space. It could be useful to actually
// delete the tasks so the coroutine stack frames are destroyed. The advantage
// of letting a new task replace and old one though is that its a 1:1 exchange
// on delete and create, rather than a large pause here to delete all the
// completed tasks.
// Put the deleted position at the end of the free indexes list.
m_task_indexes.splice(m_task_indexes.end(), m_task_indexes, pos);
}
deleted = m_tasks_to_delete.size();
m_tasks_to_delete.clear();
}
return deleted;
}
auto io_scheduler::task_manager::grow() -> task_position
{
// Save an index at the current last item.
auto last_pos = std::prev(m_task_indexes.end());
std::size_t new_size = m_tasks.size() * m_growth_factor;
for (std::size_t i = m_tasks.size(); i < new_size; ++i)
{
m_task_indexes.emplace_back(i);
}
m_tasks.resize(new_size);
// Set the free pos to the item just after the previous last item.
return std::next(last_pos);
}
auto io_scheduler::task_manager::make_cleanup_task(task<void> user_task, task_position pos) -> task<void>
{
try
{
co_await user_task;
}
catch (const std::runtime_error& e)
{
// TODO: what would be a good way to report this to the user...? Catching here is required
// since the co_await will unwrap the unhandled exception on the task. The scheduler thread
// should really not throw unhandled exceptions, otherwise it'll take the application down.
// The user's task should ideally be wrapped in a catch all and handle it themselves.
std::cerr << "scheduler user_task had an unhandled exception e.what()= " << e.what() << "\n";
}
m_tasks_to_delete.push_back(pos);
co_return;
}
io_scheduler::io_scheduler(const options opts)
: m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)),
m_accept_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)),
m_timer_fd(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC)),
m_thread_strategy(opts.thread_strategy),
m_task_manager(opts.reserve_size, opts.growth_factor)
io_scheduler::io_scheduler(options opts)
: thread_pool(std::move(opts.pool)),
m_opts(std::move(opts)),
m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)),
m_shutdown_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)),
m_timer_fd(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC))
{
epoll_event e{};
e.events = EPOLLIN;
e.data.ptr = const_cast<void*>(m_accept_ptr);
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_accept_fd, &e);
e.data.ptr = const_cast<void*>(m_shutdown_ptr);
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_shutdown_fd, &e);
e.data.ptr = const_cast<void*>(m_timer_ptr);
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_timer_fd, &e);
if (m_thread_strategy == thread_strategy_t::spawn)
if (m_opts.thread_strategy == thread_strategy_t::spawn)
{
m_scheduler_thread = std::thread([this] { process_events_dedicated_thread(); });
}
else if (m_thread_strategy == thread_strategy_t::adopt)
{
process_events_dedicated_thread();
m_io_thread = std::thread([this]() { process_events_dedicated_thread(); });
}
// else manual mode, the user must call process_events.
}
@ -173,16 +100,17 @@ io_scheduler::io_scheduler(const options opts)
io_scheduler::~io_scheduler()
{
shutdown();
if (m_io_thread.joinable())
{
m_io_thread.join();
}
if (m_epoll_fd != -1)
{
close(m_epoll_fd);
m_epoll_fd = -1;
}
if (m_accept_fd != -1)
{
close(m_accept_fd);
m_accept_fd = -1;
}
if (m_timer_fd != -1)
{
close(m_timer_fd);
@ -190,196 +118,38 @@ io_scheduler::~io_scheduler()
}
}
auto io_scheduler::schedule(coro::task<void> task) -> bool
auto io_scheduler::process_events(std::chrono::milliseconds timeout) -> std::size_t
{
if (is_shutdown())
{
return false;
}
// This function intentionally does not check to see if it's executing on the thread that is
// processing events. If the given task recursively generates tasks it will result in a
// stack overflow very quickly. Instead it takes the long path of adding it to the FIFO
// queue and processing through the normal pipeline. This simplifies the code and also makes
// the ordering of newly submitted tasks more fair with respect to FIFO.
m_size.fetch_add(1, std::memory_order::relaxed);
{
std::lock_guard<std::mutex> lk{m_accept_mutex};
m_accept_queue.emplace_back(std::move(task));
}
// Send an event if one isn't already set. We use compare_exchange_strong here to avoid
// spurious failures, but if it fails because the flag is actually set we don't want to retry.
bool expected{false};
if (m_event_set.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
{
uint64_t value{1};
::write(m_accept_fd, &value, sizeof(value));
}
return true;
process_events_manual(timeout);
return m_size.load(std::memory_order::relaxed);
}
auto io_scheduler::schedule(std::vector<task<void>> tasks) -> bool
auto io_scheduler::schedule_after(std::chrono::milliseconds amount) -> coro::task<void>
{
if (is_shutdown())
{
return false;
}
m_size.fetch_add(tasks.size(), std::memory_order::relaxed);
{
std::lock_guard<std::mutex> lk{m_accept_mutex};
m_accept_queue.insert(
m_accept_queue.end(), std::make_move_iterator(tasks.begin()), std::make_move_iterator(tasks.end()));
// std::move(tasks.begin(), tasks.end(), std::back_inserter(m_accept_queue));
}
tasks.clear();
bool expected{false};
if (m_event_set.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
{
uint64_t value{1};
::write(m_accept_fd, &value, sizeof(value));
}
return true;
return yield_for(amount);
}
auto io_scheduler::schedule_after(coro::task<void> task, std::chrono::milliseconds after) -> bool
auto io_scheduler::schedule_at(time_point time) -> coro::task<void>
{
if (m_shutdown_requested.load(std::memory_order::relaxed))
{
return false;
}
return schedule(make_scheduler_after_task(std::move(task), after));
}
auto io_scheduler::schedule_at(coro::task<void> task, time_point time) -> bool
{
auto now = clock::now();
// If the requested time is in the past (or now!) bail out!
if (time <= now)
{
return false;
}
auto amount = std::chrono::duration_cast<std::chrono::milliseconds>(time - now);
return schedule_after(std::move(task), amount);
}
auto io_scheduler::poll(fd_t fd, poll_op op, std::chrono::milliseconds timeout) -> coro::task<poll_status>
{
// Setup two events, a timeout event and the actual poll for op event.
// Whichever triggers first will delete the other to guarantee only one wins.
// The resume token will be set by the scheduler to what the event turned out to be.
using namespace std::chrono_literals;
bool timeout_requested = (timeout > 0ms);
resume_token<poll_status> token{};
timer_tokens::iterator timer_pos;
if (timeout_requested)
{
timer_pos = add_timer_token(clock::now() + timeout, &token);
}
epoll_event e{};
e.events = static_cast<uint32_t>(op) | EPOLLONESHOT | EPOLLET | EPOLLRDHUP;
e.data.ptr = &token;
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &e);
auto status = co_await unsafe_yield<poll_status>(token);
switch (status)
{
// The event triggered first, delete the timeout.
case poll_status::event:
if (timeout_requested)
{
remove_timer_token(timer_pos);
}
break;
default:
// Deleting the event is done regardless below in epoll_ctl()
break;
}
epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
co_return status;
}
auto io_scheduler::poll(const net::socket& sock, poll_op op, std::chrono::milliseconds timeout)
-> coro::task<poll_status>
{
return poll(sock.native_handle(), op, timeout);
}
auto io_scheduler::read(fd_t fd, std::span<char> buffer, std::chrono::milliseconds timeout)
-> coro::task<std::pair<poll_status, ssize_t>>
{
auto status = co_await poll(fd, poll_op::read, timeout);
switch (status)
{
case poll_status::event:
co_return {status, ::read(fd, buffer.data(), buffer.size())};
default:
co_return {status, 0};
}
}
auto io_scheduler::read(const net::socket& sock, std::span<char> buffer, std::chrono::milliseconds timeout)
-> coro::task<std::pair<poll_status, ssize_t>>
{
return read(sock.native_handle(), buffer, timeout);
}
auto io_scheduler::write(fd_t fd, const std::span<const char> buffer, std::chrono::milliseconds timeout)
-> coro::task<std::pair<poll_status, ssize_t>>
{
auto status = co_await poll(fd, poll_op::write, timeout);
switch (status)
{
case poll_status::event:
co_return {status, ::write(fd, buffer.data(), buffer.size())};
default:
co_return {status, 0};
}
}
auto io_scheduler::write(const net::socket& sock, const std::span<const char> buffer, std::chrono::milliseconds timeout)
-> coro::task<std::pair<poll_status, ssize_t>>
{
return write(sock.native_handle(), buffer, timeout);
}
auto io_scheduler::yield() -> coro::task<void>
{
co_await schedule();
co_return;
return yield_until(time);
}
auto io_scheduler::yield_for(std::chrono::milliseconds amount) -> coro::task<void>
{
// If the requested amount of time is negative or zero just return.
using namespace std::chrono_literals;
if (amount <= 0ms)
{
co_return;
co_await schedule();
}
else
{
// Yielding does not requiring setting the timer position on the poll info since
// it doesn't have a corresponding 'event' that can trigger, it always waits for
// the timeout to occur before resuming.
resume_token<poll_status> token{};
add_timer_token(clock::now() + amount, &token);
// Wait for the token timer to trigger.
co_await token;
detail::poll_info pi{};
add_timer_token(clock::now() + amount, pi);
co_await pi;
}
co_return;
}
@ -387,147 +157,94 @@ auto io_scheduler::yield_until(time_point time) -> coro::task<void>
{
auto now = clock::now();
// If the requested time is in the past (or now!) just return.
// If the requested time is in the past (or now!) bail out!
if (time <= now)
{
co_return;
}
auto amount = std::chrono::duration_cast<std::chrono::milliseconds>(time - now);
co_await yield_for(amount);
co_return;
}
auto io_scheduler::process_events(std::chrono::milliseconds timeout) -> std::size_t
{
process_events_external_thread(timeout);
return m_size.load(std::memory_order::relaxed);
}
auto io_scheduler::shutdown(shutdown_t wait_for_tasks) -> void
{
if (!m_shutdown_requested.exchange(true, std::memory_order::release))
{
// Signal the event loop to stop asap.
uint64_t value{1};
::write(m_accept_fd, &value, sizeof(value));
if (wait_for_tasks == shutdown_t::sync && m_scheduler_thread.joinable())
{
m_scheduler_thread.join();
}
}
}
auto io_scheduler::make_scheduler_after_task(coro::task<void> task, std::chrono::milliseconds wait_time)
-> coro::task<void>
{
// Wait for the period requested, and then resume their task.
co_await yield_for(wait_time);
co_await task;
co_return;
}
auto io_scheduler::add_timer_token(time_point tp, resume_token<poll_status>* token_ptr) -> timer_tokens::iterator
{
auto pos = m_timer_tokens.emplace(tp, token_ptr);
// If this item was inserted as the smallest time point, update the timeout.
if (pos == m_timer_tokens.begin())
{
update_timeout(clock::now());
}
return pos;
}
auto io_scheduler::remove_timer_token(timer_tokens::iterator pos) -> void
{
auto is_first = (m_timer_tokens.begin() == pos);
m_timer_tokens.erase(pos);
// If this was the first item, update the timeout. It would be acceptable to just let it
// fire anyway, since the event loop will ignore a timeout when nothing has actually timed
// out, but updating to the correct timeout value feels like the right thing to do.
if (is_first)
{
update_timeout(clock::now());
}
}
auto io_scheduler::resume(std::coroutine_handle<> handle) -> void
{
{
std::lock_guard<std::mutex> lk{m_accept_mutex};
m_accept_queue.emplace_back(handle);
}
// Signal to the event loop there is a task to resume if one hasn't already been sent.
bool expected{false};
if (m_event_set.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
{
uint64_t value{1};
::write(m_accept_fd, &value, sizeof(value));
}
}
auto io_scheduler::process_task_and_start(task<void>& task) -> void
{
m_task_manager.store(std::move(task)).resume();
}
auto io_scheduler::process_task_variant(task_variant& tv) -> void
{
if (std::holds_alternative<coro::task<void>>(tv))
{
auto& task = std::get<coro::task<void>>(tv);
// Store the users task and immediately start executing it.
process_task_and_start(task);
co_await schedule();
}
else
{
auto handle = std::get<std::coroutine_handle<>>(tv);
// The cleanup wrapper task will catch all thrown exceptions unconditionally.
handle.resume();
auto amount = std::chrono::duration_cast<std::chrono::milliseconds>(time - now);
detail::poll_info pi{};
add_timer_token(now + amount, pi);
co_await pi;
}
co_return;
}
auto io_scheduler::poll(fd_t fd, coro::poll_op op, std::chrono::milliseconds timeout) -> coro::task<poll_status>
{
// Setup two events, a timeout event and the actual poll for op event.
// Whichever triggers first will delete the other to guarantee only one wins.
// The resume token will be set by the scheduler to what the event turned out to be.
bool timeout_requested = (timeout > 0ms);
detail::poll_info pi{};
pi.m_fd = fd;
if (timeout_requested)
{
pi.m_timer_pos = add_timer_token(clock::now() + timeout, pi);
}
epoll_event e{};
e.events = static_cast<uint32_t>(op) | EPOLLONESHOT | EPOLLRDHUP;
e.data.ptr = &pi;
if (epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &e) == -1)
{
std::cerr << "epoll ctl error on fd " << fd << "\n";
}
// The event loop will 'clean-up' whichever event didn't win since the coroutine is scheduled
// onto the thread pool; it's possible the other type of event could trigger while it's waiting
// to execute again, thus resuming the coroutine twice, which would be quite bad.
co_return co_await pi;
}
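
A hedged usage sketch of the new poll() (not code from this diff): a coroutine waits up to 50ms for a descriptor to become readable and branches on which of the two racing events won. The lambda shape mirrors the tests below; `fd` is assumed to be a valid non-blocking descriptor.

    auto wait_readable = [](coro::io_scheduler& s, coro::io_scheduler::fd_t fd) -> coro::task<void> {
        co_await s.schedule();
        auto status = co_await s.poll(fd, coro::poll_op::read, std::chrono::milliseconds{50});
        if (status == coro::poll_status::event)
        {
            // Readable; the fd was already EPOLL_CTL_DEL'd above, so the next
            // poll() can blindly EPOLL_CTL_ADD again.
        }
        else // coro::poll_status::timeout (or error/closed)
        {
            // The timer won the race; the fd event, if any, was discarded via m_processed.
        }
        co_return;
    };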
auto io_scheduler::shutdown(shutdown_t wait_for_tasks) noexcept -> void
{
thread_pool::shutdown(wait_for_tasks);
// Signal the event loop to stop asap, triggering the event fd is safe.
uint64_t value{1};
::write(m_shutdown_fd, &value, sizeof(value));
}
auto io_scheduler::process_events_manual(std::chrono::milliseconds timeout) -> void
{
bool expected{false};
if (m_io_processing.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
{
process_events_execute(timeout);
m_io_processing.exchange(false, std::memory_order::release);
}
}
auto io_scheduler::process_task_queue() -> void
auto io_scheduler::process_events_dedicated_thread() -> void
{
std::size_t amount{0};
if (m_opts.on_io_thread_start_functor != nullptr)
{
std::lock_guard<std::mutex> lk{m_accept_mutex};
while (!m_accept_queue.empty() && amount < task_inline_process_amount)
{
m_processing_tasks[amount] = std::move(m_accept_queue.front());
m_accept_queue.pop_front();
++amount;
}
m_opts.on_io_thread_start_functor();
}
// The queue is empty, we are done here.
if (amount == 0)
m_io_processing.exchange(true, std::memory_order::release);
// Execute tasks until stopped or there are no more tasks to complete.
while (!m_shutdown_requested.load(std::memory_order::relaxed) || m_size.load(std::memory_order::relaxed) > 0)
{
return;
process_events_execute(m_default_timeout);
}
m_io_processing.exchange(false, std::memory_order::release);
for (std::size_t i = 0; i < amount; ++i)
if (m_opts.on_io_thread_stop_functor != nullptr)
{
process_task_variant(m_processing_tasks[i]);
m_opts.on_io_thread_stop_functor();
}
}
auto io_scheduler::process_events_poll_execute(std::chrono::milliseconds user_timeout) -> void
auto io_scheduler::process_events_execute(std::chrono::milliseconds timeout) -> void
{
// Need to acquire m_accept_queue size to determine if there are any pending tasks.
std::atomic_thread_fence(std::memory_order::acquire);
bool tasks_ready = !m_accept_queue.empty();
auto timeout = (tasks_ready) ? m_no_timeout : user_timeout;
// Poll is run every iteration to make sure 'waiting' events are properly put into
// the FIFO queue for when they are ready.
auto event_count = epoll_wait(m_epoll_fd, m_events.data(), m_max_events, timeout.count());
if (event_count > 0)
{
@ -536,72 +253,23 @@ auto io_scheduler::process_events_poll_execute(std::chrono::milliseconds user_ti
epoll_event& event = m_events[i];
void* handle_ptr = event.data.ptr;
if (handle_ptr == m_accept_ptr)
if (handle_ptr == m_timer_ptr)
{
uint64_t value{0};
::read(m_accept_fd, &value, sizeof(value));
(void)value; // discard, the read merely resets the eventfd counter to zero.
// Let any threads scheduling work know that the event set has been consumed.
// Important to do this after the accept file descriptor has been read.
// This needs to succeed, so we loop on compare_exchange_weak until it does.
bool expected{true};
while (!m_event_set.compare_exchange_weak(
expected, false, std::memory_order::release, std::memory_order::relaxed))
{
}
tasks_ready = true;
// Process all events that have timed out.
process_timeout_execute();
}
else if (handle_ptr == m_timer_ptr)
else if (handle_ptr == m_shutdown_ptr) [[unlikely]]
{
// If the timer fd triggered, loop and call every task that has a wait time <= now.
while (!m_timer_tokens.empty())
{
// Now is continuously calculated since resuming tasks could take a fairly
// significant amount of time and might 'trigger' more timeouts.
auto now = clock::now();
auto first = m_timer_tokens.begin();
auto [tp, token_ptr] = *first;
if (tp <= now)
{
// Important to erase first so if any timers are updated after resume
// this timer won't be taken into account.
m_timer_tokens.erase(first);
// Every event triggered on the timer tokens is *always* a timeout.
token_ptr->resume(poll_status::timeout);
}
else
{
break;
}
}
// Update the time to the next smallest time point, re-take the current now time
// since processing tasks could shift the time.
update_timeout(clock::now());
// Nothing to do, just needed to wake up and smell the flowers
}
else
{
// Individual poll task wake-up, this will queue the coroutines waiting
// on the resume token into the FIFO queue for processing.
auto* token_ptr = static_cast<resume_token<poll_status>*>(handle_ptr);
token_ptr->resume(event_to_poll_status(event.events));
process_event_execute(static_cast<detail::poll_info*>(handle_ptr), event_to_poll_status(event.events));
}
}
}
if (tasks_ready)
{
process_task_queue();
}
if (!m_task_manager.delete_tasks_empty())
{
m_size.fetch_sub(m_task_manager.gc(), std::memory_order::relaxed);
}
}
auto io_scheduler::event_to_poll_status(uint32_t events) -> poll_status
@ -622,34 +290,133 @@ auto io_scheduler::event_to_poll_status(uint32_t events) -> poll_status
throw std::runtime_error{"invalid epoll state"};
}
auto io_scheduler::process_events_external_thread(std::chrono::milliseconds user_timeout) -> void
auto io_scheduler::process_event_execute(detail::poll_info* pi, poll_status status) -> void
{
// Do not allow two threads to process events at the same time.
bool expected{false};
if (m_running.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
if (!pi->m_processed)
{
process_events_poll_execute(user_timeout);
m_running.exchange(false, std::memory_order::release);
std::atomic_thread_fence(std::memory_order::acquire);
// It's possible the event and the timeout occurred in the same epoll call; make sure only
// one is ever processed, the other is discarded.
pi->m_processed = true;
// Given a valid fd always remove it from epoll so the next poll can blindly EPOLL_CTL_ADD.
if (pi->m_fd != -1)
{
epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, pi->m_fd, nullptr);
}
// Since this event triggered, remove its corresponding timeout if it has one.
if (pi->m_timer_pos.has_value())
{
remove_timer_token(pi->m_timer_pos.value());
}
pi->m_poll_status = status;
while (pi->m_awaiting_coroutine == nullptr)
{
std::atomic_thread_fence(std::memory_order::acquire);
}
resume(pi->m_awaiting_coroutine);
}
}
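
The spin-wait above pairs with a release-store of m_awaiting_coroutine on the suspending side. As a generic, self-contained illustration of that publish/observe handshake (not the library's code, which uses raw pointers plus thread fences):

    #include <atomic>
    #include <coroutine>

    struct handshake
    {
        std::atomic<std::coroutine_handle<>> m_awaiting{nullptr};

        // Suspending side: write all other shared state first, publish the handle last.
        auto publish(std::coroutine_handle<> h) -> void
        {
            m_awaiting.store(h, std::memory_order::release);
        }

        // Event-loop side: once a non-null handle is visible, every write made
        // before publish() is visible as well.
        auto wait() -> std::coroutine_handle<>
        {
            std::coroutine_handle<> h{nullptr};
            while ((h = m_awaiting.load(std::memory_order::acquire)) == nullptr) {}
            return h;
        }
    };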
auto io_scheduler::process_events_dedicated_thread() -> void
auto io_scheduler::process_timeout_execute() -> void
{
m_running.exchange(true, std::memory_order::release);
// Execute tasks until stopped or there are more tasks to complete.
while (!m_shutdown_requested.load(std::memory_order::relaxed) || m_size.load(std::memory_order::relaxed) > 0)
std::vector<detail::poll_info*> poll_infos{};
auto now = clock::now();
{
process_events_poll_execute(m_default_timeout);
std::scoped_lock lk{m_timed_events_mutex};
while (!m_timed_events.empty())
{
auto first = m_timed_events.begin();
auto [tp, pi] = *first;
if (tp <= now)
{
m_timed_events.erase(first);
poll_infos.emplace_back(pi);
}
else
{
break;
}
}
}
std::vector<std::coroutine_handle<>> handles{};
handles.reserve(poll_infos.size());
for (auto pi : poll_infos)
{
if (!pi->m_processed)
{
// It's possible the event and the timeout occurred in the same epoll call; make sure only
// one is ever processed, the other is discarded.
pi->m_processed = true;
// Since this timed out, remove its corresponding event if it has one.
if (pi->m_fd != -1)
{
epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, pi->m_fd, nullptr);
}
while (pi->m_awaiting_coroutine == nullptr)
{
std::atomic_thread_fence(std::memory_order::acquire);
// std::cerr << "process_event_execute() has a nullptr event\n";
}
handles.emplace_back(pi->m_awaiting_coroutine);
pi->m_poll_status = coro::poll_status::timeout;
}
}
// Resume all timed out coroutines.
resume(handles);
// Update the time to the next smallest time point, re-take the current now time
// since updating and resuming tasks could shift the time.
update_timeout(clock::now());
}
auto io_scheduler::add_timer_token(time_point tp, detail::poll_info& pi) -> timed_events::iterator
{
std::scoped_lock lk{m_timed_events_mutex};
auto pos = m_timed_events.emplace(tp, &pi);
// If this item was inserted as the smallest time point, update the timeout.
if (pos == m_timed_events.begin())
{
update_timeout(clock::now());
}
return pos;
}
auto io_scheduler::remove_timer_token(timed_events::iterator pos) -> void
{
{
std::scoped_lock lk{m_timed_events_mutex};
auto is_first = (m_timed_events.begin() == pos);
m_timed_events.erase(pos);
// If this was the first item, update the timeout. It would be acceptable to just let it
// fire anyway, since the event loop will ignore a timeout when nothing has actually timed
// out, but updating to the correct timeout value feels like the right thing to do.
if (is_first)
{
update_timeout(clock::now());
}
}
m_running.exchange(false, std::memory_order::release);
}
auto io_scheduler::update_timeout(time_point now) -> void
{
using namespace std::chrono_literals;
if (!m_timer_tokens.empty())
if (!m_timed_events.empty())
{
auto& [tp, task] = *m_timer_tokens.begin();
auto& [tp, pi] = *m_timed_events.begin();
auto amount = tp - now;
@ -658,7 +425,7 @@ auto io_scheduler::update_timeout(time_point now) -> void
auto nanoseconds = std::chrono::duration_cast<std::chrono::nanoseconds>(amount);
// As a safeguard if both values end up as zero (or negative) then trigger the timeout
// immediately as zero disarms timerfd according to the man pages and negative valeues
// immediately as zero disarms timerfd according to the man pages and negative values
// will result in an error return value.
if (seconds <= 0s)
{
@ -676,8 +443,7 @@ auto io_scheduler::update_timeout(time_point now) -> void
if (timerfd_settime(m_timer_fd, 0, &ts, nullptr) == -1)
{
std::string msg = "Failed to set timerfd errorno=[" + std::string{strerror(errno)} + "].";
throw std::runtime_error(msg.data());
std::cerr << "Failed to set timerfd errorno=[" << std::string{strerror(errno)} << "].";
}
}
else
@ -686,7 +452,10 @@ auto io_scheduler::update_timeout(time_point now) -> void
itimerspec ts{};
ts.it_value.tv_sec = 0;
ts.it_value.tv_nsec = 0;
timerfd_settime(m_timer_fd, 0, &ts, nullptr);
if (timerfd_settime(m_timer_fd, 0, &ts, nullptr) == -1)
{
std::cerr << "Failed to set timerfd errorno=[" << std::string{strerror(errno)} << "].";
}
}
}
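
Since the zero-disarms behaviour above is subtle, here is a standalone sketch of the timerfd contract this function relies on (assumes Linux and <sys/timerfd.h>; not code from this commit): an all-zero it_value disarms the timer, so "fire now" has to be expressed as the smallest non-zero value, and negative values make timerfd_settime() fail.

    #include <chrono>
    #include <sys/timerfd.h>

    void arm_or_disarm(int timer_fd, std::chrono::nanoseconds until)
    {
        itimerspec ts{};
        if (until.count() > 0)
        {
            ts.it_value.tv_sec  = until.count() / 1'000'000'000;
            ts.it_value.tv_nsec = until.count() % 1'000'000'000;
        }
        else if (until.count() == 0)
        {
            ts.it_value.tv_nsec = 1; // smallest arming value; {0, 0} would disarm
        }
        // For negative input ts stays zeroed, which disarms the timer rather
        // than passing an invalid (negative) value to the kernel.
        timerfd_settime(timer_fd, 0, &ts, nullptr);
    }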

View file

@ -15,7 +15,7 @@ auto mutex::try_lock() -> bool
auto mutex::unlock() -> void
{
m_state.exchange(false, std::memory_order::release);
// Get the next waiter before releasing the lock.
awaiter* next{nullptr};
{
std::scoped_lock lk{m_waiter_mutex};
@ -26,6 +26,12 @@ auto mutex::unlock() -> void
}
}
// Unlock the mutex
m_state.exchange(false, std::memory_order::release);
// If there was an awaiter, resume it. Here would be a good place to _resume_ the waiter onto
// the thread pool to distribute the work; this current implementation will end up having
// every waiter on the mutex jump onto a single thread.
if (next != nullptr)
{
next->m_awaiting_coroutine.resume();
@ -50,16 +56,17 @@ auto mutex::awaiter::await_suspend(std::coroutine_handle<> awaiting_coroutine) n
return false;
}
// Ok its still held, add ourself to the wiater list.
// Ok its still held, add ourself to the waiter list.
m_mutex.m_waiter_list.emplace_back(this);
}
// The mutex is still locked and we've added this to the waiter list, suspend now.
return true;
}
auto mutex::awaiter::await_resume() noexcept -> scoped_lock
{
return scoped_lock(m_mutex);
return scoped_lock{m_mutex};
}
} // namespace coro
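
Usage sketch (mirroring the mutex tests elsewhere in this commit): lock() returns an awaitable whose await_resume() yields the scoped_lock above, so the critical section is bounded by the lock variable's scope.

    auto make_critical_section_task =
        [](coro::thread_pool& tp, coro::mutex& m, std::vector<uint64_t>& out, uint64_t id) -> coro::task<void> {
        co_await tp.schedule();
        auto lock = co_await m.lock(); // suspends onto the waiter list if the mutex is held
        out.push_back(id);             // exclusive access here
        co_return;                     // ~scoped_lock() unlocks, resuming the next waiter
    };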

View file

@ -35,18 +35,19 @@ auto ares_dns_callback(void* arg, int status, int /*timeouts*/, struct hostent*
if (result.m_pending_dns_requests == 0)
{
result.m_token.resume();
result.m_resume.set(result.m_io_scheduler);
}
}
dns_result::dns_result(coro::resume_token<void>& token, uint64_t pending_dns_requests)
: m_token(token),
dns_result::dns_result(coro::io_scheduler& scheduler, coro::event& resume, uint64_t pending_dns_requests)
: m_io_scheduler(scheduler),
m_resume(resume),
m_pending_dns_requests(pending_dns_requests)
{
}
dns_resolver::dns_resolver(io_scheduler& scheduler, std::chrono::milliseconds timeout)
: m_scheduler(scheduler),
: m_io_scheduler(scheduler),
m_timeout(timeout)
{
{
@ -89,17 +90,19 @@ dns_resolver::~dns_resolver()
auto dns_resolver::host_by_name(const net::hostname& hn) -> coro::task<std::unique_ptr<dns_result>>
{
auto token = m_scheduler.make_resume_token<void>();
auto result_ptr = std::make_unique<dns_result>(token, 2);
coro::event resume_event{};
auto result_ptr = std::make_unique<dns_result>(m_io_scheduler, resume_event, 2);
ares_gethostbyname(m_ares_channel, hn.data().data(), AF_INET, ares_dns_callback, result_ptr.get());
ares_gethostbyname(m_ares_channel, hn.data().data(), AF_INET6, ares_dns_callback, result_ptr.get());
std::vector<coro::task<void>> poll_tasks{};
// Add all required poll calls for ares to kick off the dns requests.
ares_poll();
// Suspend until this specific result is completed by ares.
co_await m_scheduler.yield(token);
co_await resume_event;
co_return result_ptr;
}
@ -138,21 +141,22 @@ auto dns_resolver::ares_poll() -> void
}
}
std::vector<coro::task<void>> poll_tasks{};
for (size_t i = 0; i < new_sockets; ++i)
{
io_scheduler::fd_t fd = static_cast<io_scheduler::fd_t>(ares_sockets[i]);
auto fd = static_cast<io_scheduler::fd_t>(ares_sockets[i]);
// If this socket is not currently actively polling, start polling!
if (m_active_sockets.emplace(fd).second)
{
m_scheduler.schedule(make_poll_task(fd, poll_ops[i]));
m_task_container.store(make_poll_task(fd, poll_ops[i])).resume();
}
}
}
auto dns_resolver::make_poll_task(io_scheduler::fd_t fd, poll_op ops) -> coro::task<void>
{
auto result = co_await m_scheduler.poll(fd, ops, m_timeout);
auto result = co_await m_io_scheduler.poll(fd, ops, m_timeout);
switch (result)
{
case poll_status::event:

View file

@ -1,7 +1,4 @@
#include "coro/net/tcp_client.hpp"
#include "coro/io_scheduler.hpp"
#include <ares.h>
namespace coro::net
{

View file

@ -13,11 +13,6 @@ tcp_server::tcp_server(io_scheduler& scheduler, options opts)
{
}
auto tcp_server::poll(std::chrono::milliseconds timeout) -> coro::task<coro::poll_status>
{
co_return co_await m_io_scheduler.poll(m_accept_socket, coro::poll_op::read, timeout);
}
auto tcp_server::accept() -> coro::net::tcp_client
{
sockaddr_in client{};

src/task_container.cpp Normal file
View file

@ -0,0 +1,123 @@
#include "coro/task_container.hpp"
#include <iostream>
namespace coro
{
task_container::task_container(const options opts) : m_growth_factor(opts.growth_factor)
{
m_tasks.resize(opts.reserve_size);
for (std::size_t i = 0; i < opts.reserve_size; ++i)
{
m_task_indexes.emplace_back(i);
}
m_free_pos = m_task_indexes.begin();
}
task_container::~task_container()
{
// TODO: Not entirely sure how best to do this, as it could hold up the thread that could
// be finishing the remaining tasks.
while (!empty())
{
gc();
}
}
auto task_container::store(coro::task<void> user_task, garbage_collect cleanup) -> coro::task<void>&
{
std::scoped_lock lk{m_mutex};
if (cleanup == garbage_collect::yes)
{
gc_internal();
}
// Only grow if completely full and attempting to add more.
if (m_free_pos == m_task_indexes.end())
{
m_free_pos = grow();
}
// Store the task inside a cleanup task for self deletion.
auto index = *m_free_pos;
m_tasks[index] = make_cleanup_task(std::move(user_task), m_free_pos);
// Mark the current slot as used by advancing the free position.
std::advance(m_free_pos, 1);
return m_tasks[index];
}
auto task_container::gc() -> std::size_t
{
std::scoped_lock lk{m_mutex};
return gc_internal();
}
auto task_container::grow() -> task_position
{
// Save an index at the current last item.
auto last_pos = std::prev(m_task_indexes.end());
std::size_t new_size = m_tasks.size() * m_growth_factor;
for (std::size_t i = m_tasks.size(); i < new_size; ++i)
{
m_task_indexes.emplace_back(i);
}
m_tasks.resize(new_size);
// Set the free pos to the item just after the previous last item.
return std::next(last_pos);
}
auto task_container::gc_internal() -> std::size_t
{
std::size_t deleted{0};
if (!m_tasks_to_delete.empty())
{
for (const auto& pos : m_tasks_to_delete)
{
// This doesn't actually 'delete' the task, it'll get overwritten when a
// new user task claims the free space. It could be useful to actually
// delete the tasks so the coroutine stack frames are destroyed. The advantage
// of letting a new task replace an old one though is that it's a 1:1 exchange
// on delete and create, rather than a large pause here to delete all the
// completed tasks.
// Put the deleted position at the end of the free indexes list.
m_task_indexes.splice(m_task_indexes.end(), m_task_indexes, pos);
}
deleted = m_tasks_to_delete.size();
m_tasks_to_delete.clear();
}
return deleted;
}
auto task_container::make_cleanup_task(task<void> user_task, task_position pos) -> coro::task<void>
{
try
{
co_await user_task;
}
catch (const std::exception& e)
{
// TODO: what would be a good way to report this to the user...? Catching here is required
// since the co_await will unwrap the unhandled exception on the task.
// The user's task should ideally be wrapped in a catch all and handle it themselves, but
// that cannot be guaranteed.
std::cerr << "task_container user_task had an unhandled exception e.what()= " << e.what() << "\n";
}
catch (...)
{
// don't crash if they throw something that isn't derived from std::exception
std::cerr << "task_container user_task had unhandle exception, not derived from std::exception.\n";
}
{
std::scoped_lock lk{m_mutex};
m_tasks_to_delete.push_back(pos);
}
co_return;
}
} // namespace coro
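
A short sketch of the container's calling convention as used by the dns_resolver and tcp_server changes in this commit: store() hands back the cleanup-wrapped task, the caller resumes it, and gc() (or the destructor's loop above) reclaims finished slots.

    coro::task_container tc{};

    auto fire_and_forget = []() -> coro::task<void> {
        // Any throw here is caught and logged by make_cleanup_task above,
        // never propagated to the resumer.
        co_return;
    };

    tc.store(fire_and_forget()).resume();
    while (!tc.empty())
    {
        tc.gc(); // returns the number of slots reclaimed this pass
    }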

View file

@ -9,7 +9,7 @@ thread_pool::operation::operation(thread_pool& tp) noexcept : m_thread_pool(tp)
auto thread_pool::operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
m_awaiting_coroutine = awaiting_coroutine;
m_thread_pool.schedule_impl(this);
m_thread_pool.schedule_impl(m_awaiting_coroutine);
// void return on await_suspend suspends _this_ coroutine, which is now scheduled on the
// thread pool and returns control to the caller. They could be sync_wait'ing or go do
@ -31,15 +31,15 @@ thread_pool::~thread_pool()
shutdown();
}
auto thread_pool::schedule() noexcept -> std::optional<operation>
auto thread_pool::schedule() -> operation
{
if (!m_shutdown_requested.load(std::memory_order::relaxed))
{
m_size.fetch_add(1, std::memory_order_relaxed);
return {operation{*this}};
m_size.fetch_add(1, std::memory_order::relaxed);
return operation{*this};
}
return std::nullopt;
throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
}
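
Note the API change: schedule() previously returned std::optional<operation> and now throws once shutdown has begun, so callers move from .value() to an ordinary co_await, optionally guarded. A hedged sketch of the new calling convention:

    auto guarded = [](coro::thread_pool& tp) -> coro::task<void> {
        try
        {
            co_await tp.schedule(); // throws std::runtime_error after shutdown()
        }
        catch (const std::runtime_error&)
        {
            co_return; // the pool refused the work; never ran on a pool thread
        }
        // Executing on a pool thread from here on.
        co_return;
    };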
auto thread_pool::shutdown(shutdown_t wait_for_tasks) noexcept -> void
@ -71,48 +71,26 @@ auto thread_pool::executor(std::stop_token stop_token, std::size_t idx) -> void
m_opts.on_thread_start_functor(idx);
}
while (true)
while (!stop_token.stop_requested())
{
// Wait until the queue has operations to execute or shutdown has been requested.
while (true)
{
std::unique_lock<std::mutex> lk{m_wait_mutex};
m_wait_cv.wait(lk, stop_token, [this] { return !m_queue.empty(); });
}
// Continue to pull operations from the global queue until it's empty.
while (true)
{
operation* op{nullptr};
{
std::lock_guard<std::mutex> lk{m_queue_mutex};
if (!m_queue.empty())
{
op = m_queue.front();
m_queue.pop_front();
}
else
{
break; // while true, the queue is currently empty
}
}
if (op != nullptr && op->m_awaiting_coroutine != nullptr)
{
if (!op->m_awaiting_coroutine.done())
{
op->m_awaiting_coroutine.resume();
}
m_size.fetch_sub(1, std::memory_order::relaxed);
}
else
if (m_queue.empty())
{
lk.unlock(); // would happen on scope destruction, but being explicit/faster(?)
break;
}
}
if (stop_token.stop_requested())
{
break; // while(true);
auto handle = m_queue.front();
m_queue.pop_front();
lk.unlock(); // Not needed for processing the coroutine.
handle.resume();
m_size.fetch_sub(1, std::memory_order::release);
}
}
@ -122,11 +100,49 @@ auto thread_pool::executor(std::stop_token stop_token, std::size_t idx) -> void
}
}
auto thread_pool::schedule_impl(operation* op) noexcept -> void
auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
{
if (handle == nullptr)
{
std::lock_guard<std::mutex> lk{m_queue_mutex};
m_queue.emplace_back(op);
return;
}
{
std::scoped_lock lk{m_wait_mutex};
m_queue.emplace_back(handle);
}
m_wait_cv.notify_one();
}
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> void
{
if (handle == nullptr)
{
return;
}
m_size.fetch_add(1, std::memory_order::relaxed);
schedule_impl(handle);
}
auto thread_pool::resume(const std::vector<std::coroutine_handle<>>& handles) noexcept -> void
{
m_size.fetch_add(handles.size(), std::memory_order::relaxed);
{
std::scoped_lock lk{m_wait_mutex};
for (const auto& handle : handles)
{
if (handle != nullptr) [[likely]]
{
m_queue.emplace_back(handle);
}
else
{
m_size.fetch_sub(1, std::memory_order::release);
}
}
}
m_wait_cv.notify_one();

View file

@ -27,7 +27,7 @@ static auto print_stats(const std::string& bench_name, uint64_t operations, sc::
std::cout << " ops/sec: " << std::fixed << ops_per_sec << "\n";
}
TEST_CASE("benchmark counter func direct call")
TEST_CASE("benchmark counter func direct call", "[benchmark]")
{
constexpr std::size_t iterations = default_iterations;
std::atomic<uint64_t> counter{0};
@ -47,7 +47,7 @@ TEST_CASE("benchmark counter func direct call")
REQUIRE(counter == iterations);
}
TEST_CASE("benchmark counter func coro::sync_wait(awaitable)")
TEST_CASE("benchmark counter func coro::sync_wait(awaitable)", "[benchmark]")
{
constexpr std::size_t iterations = default_iterations;
uint64_t counter{0};
@ -64,7 +64,7 @@ TEST_CASE("benchmark counter func coro::sync_wait(awaitable)")
REQUIRE(counter == iterations);
}
TEST_CASE("benchmark counter func coro::sync_wait(coro::when_all_awaitable(awaitable)) x10")
TEST_CASE("benchmark counter func coro::sync_wait(coro::when_all_awaitable(awaitable)) x10", "[benchmark]")
{
constexpr std::size_t iterations = default_iterations;
uint64_t counter{0};
@ -84,7 +84,7 @@ TEST_CASE("benchmark counter func coro::sync_wait(coro::when_all_awaitable(await
REQUIRE(counter == iterations);
}
TEST_CASE("benchmark thread_pool{1} counter task")
TEST_CASE("benchmark thread_pool{1} counter task", "[benchmark]")
{
constexpr std::size_t iterations = default_iterations;
@ -92,7 +92,7 @@ TEST_CASE("benchmark thread_pool{1} counter task")
std::atomic<uint64_t> counter{0};
auto make_task = [](coro::thread_pool& tp, std::atomic<uint64_t>& c) -> coro::task<void> {
co_await tp.schedule().value();
co_await tp.schedule();
c.fetch_add(1, std::memory_order::relaxed);
co_return;
};
@ -115,7 +115,7 @@ TEST_CASE("benchmark thread_pool{1} counter task")
REQUIRE(tp.empty());
}
TEST_CASE("benchmark thread_pool{2} counter task")
TEST_CASE("benchmark thread_pool{2} counter task", "[benchmark]")
{
constexpr std::size_t iterations = default_iterations;
@ -123,7 +123,7 @@ TEST_CASE("benchmark thread_pool{2} counter task")
std::atomic<uint64_t> counter{0};
auto make_task = [](coro::thread_pool& tp, std::atomic<uint64_t>& c) -> coro::task<void> {
co_await tp.schedule().value();
co_await tp.schedule();
c.fetch_add(1, std::memory_order::relaxed);
co_return;
};
@ -141,51 +141,25 @@ TEST_CASE("benchmark thread_pool{2} counter task")
tp.shutdown();
print_stats("benchmark thread_pool{n} counter task", iterations, start, sc::now());
print_stats("benchmark thread_pool{2} counter task", iterations, start, sc::now());
REQUIRE(counter == iterations);
REQUIRE(tp.empty());
}
TEST_CASE("benchmark counter task io_scheduler")
{
constexpr std::size_t iterations = default_iterations;
coro::io_scheduler s1{};
std::atomic<uint64_t> counter{0};
auto func = [&]() -> coro::task<void> {
counter.fetch_add(1, std::memory_order::relaxed);
co_return;
};
auto start = sc::now();
for (std::size_t i = 0; i < iterations; ++i)
{
s1.schedule(func());
}
s1.shutdown();
print_stats("benchmark counter task through io_scheduler", iterations, start, sc::now());
REQUIRE(s1.empty());
REQUIRE(counter == iterations);
}
TEST_CASE("benchmark counter task io_scheduler yield -> resume from main")
TEST_CASE("benchmark counter task scheduler{1} yield", "[benchmark]")
{
constexpr std::size_t iterations = default_iterations;
constexpr std::size_t ops = iterations * 2; // the external resume is still a resume op
coro::io_scheduler s{};
std::vector<coro::resume_token<void>> tokens{};
for (std::size_t i = 0; i < iterations; ++i)
{
tokens.emplace_back(s.make_resume_token<void>());
}
coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
std::atomic<uint64_t> counter{0};
std::atomic<uint64_t> counter{0};
std::vector<coro::task<void>> tasks{};
tasks.reserve(iterations);
auto wait_func = [&](std::size_t index) -> coro::task<void> {
co_await s.yield<void>(tokens[index]);
auto make_task = [&]() -> coro::task<void> {
co_await s.schedule();
co_await s.yield();
counter.fetch_add(1, std::memory_order::relaxed);
co_return;
};
@ -194,44 +168,79 @@ TEST_CASE("benchmark counter task io_scheduler yield -> resume from main")
for (std::size_t i = 0; i < iterations; ++i)
{
s.schedule(wait_func(i));
tasks.emplace_back(make_task());
}
for (std::size_t i = 0; i < iterations; ++i)
{
tokens[i].resume();
}
s.shutdown();
coro::sync_wait(coro::when_all_awaitable(tasks));
auto stop = sc::now();
print_stats("benchmark counter task io_scheduler yield -> resume from main", ops, start, stop);
print_stats("benchmark counter task scheduler{1} yield", ops, start, stop);
REQUIRE(s.empty());
REQUIRE(counter == iterations);
}
TEST_CASE("benchmark counter task io_scheduler yield -> resume from coroutine")
TEST_CASE("benchmark counter task scheduler{1} yield_for", "[benchmark]")
{
constexpr std::size_t iterations = default_iterations;
constexpr std::size_t ops = iterations * 2; // each iteration executes 2 coroutines.
constexpr std::size_t ops = iterations * 2; // the external resume is still a resume op
coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
std::atomic<uint64_t> counter{0};
std::vector<coro::task<void>> tasks{};
tasks.reserve(iterations);
auto make_task = [&]() -> coro::task<void> {
co_await s.schedule();
co_await s.yield_for(std::chrono::milliseconds{1});
counter.fetch_add(1, std::memory_order::relaxed);
co_return;
};
auto start = sc::now();
coro::io_scheduler s{};
std::vector<coro::resume_token<void>> tokens{};
for (std::size_t i = 0; i < iterations; ++i)
{
tokens.emplace_back(s.make_resume_token<void>());
tasks.emplace_back(make_task());
}
coro::sync_wait(coro::when_all_awaitable(tasks));
auto stop = sc::now();
print_stats("benchmark counter task scheduler{1} yield", ops, start, stop);
REQUIRE(s.empty());
REQUIRE(counter == iterations);
}
TEST_CASE("benchmark counter task scheduler await event from another coroutine", "[benchmark]")
{
constexpr std::size_t iterations = default_iterations;
constexpr std::size_t ops = iterations * 3; // two tasks + event resume
coro::io_scheduler s{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
std::vector<std::unique_ptr<coro::event>> events{};
events.reserve(iterations);
for (std::size_t i = 0; i < iterations; ++i)
{
events.emplace_back(std::make_unique<coro::event>());
}
std::vector<coro::task<void>> tasks{};
tasks.reserve(iterations * 2); // one for wait, one for resume
std::atomic<uint64_t> counter{0};
auto wait_func = [&](std::size_t index) -> coro::task<void> {
co_await s.yield<void>(tokens[index]);
co_await s.schedule();
co_await* events[index];
counter.fetch_add(1, std::memory_order::relaxed);
co_return;
};
auto resume_func = [&](std::size_t index) -> coro::task<void> {
tokens[index].resume();
co_await s.schedule();
events[index]->set();
co_return;
};
@ -239,123 +248,56 @@ TEST_CASE("benchmark counter task io_scheduler yield -> resume from coroutine")
for (std::size_t i = 0; i < iterations; ++i)
{
s.schedule(wait_func(i));
s.schedule(resume_func(i));
tasks.emplace_back(wait_func(i));
tasks.emplace_back(resume_func(i));
}
s.shutdown();
coro::sync_wait(coro::when_all_awaitable(tasks));
auto stop = sc::now();
print_stats("benchmark counter task io_scheduler yield -> resume from coroutine", ops, start, stop);
print_stats("benchmark counter task scheduler await event from another coroutine", ops, start, stop);
REQUIRE(s.empty());
REQUIRE(counter == iterations);
}
TEST_CASE("benchmark counter task io_scheduler resume from coroutine -> yield")
TEST_CASE("benchmark tcp_server echo server", "[benchmark]")
{
constexpr std::size_t iterations = default_iterations;
constexpr std::size_t ops = iterations * 2; // each iteration executes 2 coroutines.
coro::io_scheduler s{};
std::vector<coro::resume_token<void>> tokens{};
for (std::size_t i = 0; i < iterations; ++i)
{
tokens.emplace_back(s.make_resume_token<void>());
}
std::atomic<uint64_t> counter{0};
auto wait_func = [&](std::size_t index) -> coro::task<void> {
co_await s.yield<void>(tokens[index]);
counter.fetch_add(1, std::memory_order::relaxed);
co_return;
};
auto resume_func = [&](std::size_t index) -> coro::task<void> {
tokens[index].resume();
co_return;
};
auto start = sc::now();
for (std::size_t i = 0; i < iterations; ++i)
{
s.schedule(resume_func(i));
s.schedule(wait_func(i));
}
s.shutdown();
auto stop = sc::now();
print_stats("benchmark counter task io_scheduler resume from coroutine -> yield", ops, start, stop);
REQUIRE(s.empty());
REQUIRE(counter == iterations);
}
TEST_CASE("benchmark counter task io_scheduler yield (all) -> resume (all) from coroutine with reserve")
{
constexpr std::size_t iterations = default_iterations;
constexpr std::size_t ops = iterations * 2; // each iteration executes 2 coroutines.
coro::io_scheduler s{coro::io_scheduler::options{.reserve_size = iterations}};
std::vector<coro::resume_token<void>> tokens{};
for (std::size_t i = 0; i < iterations; ++i)
{
tokens.emplace_back(s.make_resume_token<void>());
}
std::atomic<uint64_t> counter{0};
auto wait_func = [&](std::size_t index) -> coro::task<void> {
co_await s.yield<void>(tokens[index]);
counter.fetch_add(1, std::memory_order::relaxed);
co_return;
};
auto resume_func = [&](std::size_t index) -> coro::task<void> {
tokens[index].resume();
co_return;
};
auto start = sc::now();
for (std::size_t i = 0; i < iterations; ++i)
{
s.schedule(wait_func(i));
}
for (std::size_t i = 0; i < iterations; ++i)
{
s.schedule(resume_func(i));
}
s.shutdown();
auto stop = sc::now();
print_stats("benchmark counter task io_scheduler yield -> resume from coroutine with reserve", ops, start, stop);
REQUIRE(s.empty());
REQUIRE(counter == iterations);
}
TEST_CASE("benchmark tcp_server echo server")
{
/**
* This test *requires* two schedulers since polling on read/write of the sockets involved
* will reset/trample on each other when each side of the client + server go to poll().
*/
const constexpr std::size_t connections = 64;
const constexpr std::size_t connections = 16;
const constexpr std::size_t messages_per_connection = 10'000;
const constexpr std::size_t ops = connections * messages_per_connection;
const std::string msg = "im a data point in a stream of bytes";
coro::io_scheduler server_scheduler{};
coro::io_scheduler client_scheduler{};
const constexpr std::size_t server_count = 1;
const constexpr std::size_t client_count = 1;
std::atomic<bool> listening{false};
const constexpr std::size_t server_thread_count = 1;
const constexpr std::size_t client_thread_count = 1;
auto make_on_connection_task = [&](coro::net::tcp_client client) -> coro::task<void> {
std::atomic<uint64_t> listening{0};
std::atomic<uint64_t> accepted{0};
std::atomic<uint64_t> clients_completed{0};
std::atomic<uint64_t> server_id{0};
struct server
{
uint64_t id;
coro::io_scheduler scheduler{
coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = server_thread_count}}};
coro::task_container task_container{};
uint64_t live_clients{0};
coro::event wait_for_clients{};
};
struct client
{
coro::io_scheduler scheduler{
coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = client_thread_count}}};
std::vector<coro::task<void>> tasks{};
};
auto make_on_connection_task = [&](server& s, coro::net::tcp_client client) -> coro::task<void> {
std::string in(64, '\0');
// Echo the messages until the socket is closed or a 'done' message arrives.
@ -370,7 +312,6 @@ TEST_CASE("benchmark tcp_server echo server")
REQUIRE(rspan.empty());
break;
}
REQUIRE(rstatus == coro::net::recv_status::ok);
in.resize(rspan.size());
@ -380,39 +321,55 @@ TEST_CASE("benchmark tcp_server echo server")
REQUIRE(remaining.empty());
}
s.live_clients--;
if (s.live_clients == 0)
{
s.wait_for_clients.set();
}
co_return;
};
auto make_server_task = [&]() -> coro::task<void> {
coro::net::tcp_server server{server_scheduler};
auto make_server_task = [&](server& s) -> coro::task<void> {
co_await s.scheduler.schedule();
listening = true;
coro::net::tcp_server server{s.scheduler};
uint64_t accepted{0};
while (accepted < connections)
listening++;
while (accepted.load(std::memory_order::acquire) < connections)
{
auto pstatus = co_await server.poll();
REQUIRE(pstatus == coro::poll_status::event);
auto pstatus = co_await server.poll(std::chrono::milliseconds{1});
if (pstatus == coro::poll_status::event)
{
auto c = server.accept();
if (c.socket().is_valid())
{
accepted.fetch_add(1, std::memory_order::release);
auto client = server.accept();
REQUIRE(client.socket().is_valid());
server_scheduler.schedule(make_on_connection_task(std::move(client)));
++accepted;
s.live_clients++;
s.task_container.store(make_on_connection_task(s, std::move(c))).resume();
}
}
}
co_await s.wait_for_clients;
co_return;
};
auto make_client_task = [&]() -> coro::task<void> {
coro::net::tcp_client client{client_scheduler};
std::mutex g_histogram_mutex;
std::map<std::chrono::milliseconds, uint64_t> g_histogram;
auto cstatus = co_await client.connect();
auto make_client_task = [&](client& c) -> coro::task<void> {
co_await c.scheduler.schedule();
std::map<std::chrono::milliseconds, uint64_t> histogram;
coro::net::tcp_client client{c.scheduler};
auto cstatus = co_await client.connect(); // std::chrono::seconds{1});
REQUIRE(cstatus == coro::net::connect_status::connected);
for (size_t i = 1; i <= messages_per_connection; ++i)
{
auto req_start = std::chrono::steady_clock::now();
auto [sstatus, remaining] = client.send(msg);
REQUIRE(sstatus == coro::net::send_status::ok);
REQUIRE(remaining.empty());
@ -426,41 +383,76 @@ TEST_CASE("benchmark tcp_server echo server")
REQUIRE(rspan.size() == msg.size());
response.resize(rspan.size());
REQUIRE(response == msg);
auto req_stop = std::chrono::steady_clock::now();
histogram[std::chrono::duration_cast<std::chrono::milliseconds>(req_stop - req_start)]++;
}
{
std::scoped_lock lk{g_histogram_mutex};
for (auto [ms, count] : histogram)
{
g_histogram[ms] += count;
}
}
clients_completed.fetch_add(1);
co_return;
};
auto start = sc::now();
// Create the server to accept incoming tcp connections.
server_scheduler.schedule(make_server_task());
std::vector<std::thread> server_threads{};
for (size_t i = 0; i < server_count; ++i)
{
server_threads.emplace_back(std::thread{[&]() {
server s{};
s.id = server_id++;
coro::sync_wait(make_server_task(s));
s.scheduler.shutdown();
}});
}
// The server can take a small bit of time to start up; if we don't wait for it to notify then
// the first few connections can easily fail to connect, causing this test to fail.
while (!listening)
while (listening != server_count)
{
std::this_thread::sleep_for(std::chrono::milliseconds{1});
}
// Spawn N client connections.
for (size_t i = 0; i < connections; ++i)
// Spawn N client connections across a set number of clients.
std::vector<std::thread> client_threads{};
std::vector<client> clients{};
for (size_t i = 0; i < client_count; ++i)
{
REQUIRE(client_scheduler.schedule(make_client_task()));
client_threads.emplace_back(std::thread{[&]() {
client c{};
for (size_t i = 0; i < connections / client_count; ++i)
{
c.tasks.emplace_back(make_client_task(c));
}
coro::sync_wait(coro::when_all_awaitable(c.tasks));
c.scheduler.shutdown();
}});
}
// Wait for all the connections to complete their work.
while (!client_scheduler.empty())
for (auto& ct : client_threads)
{
std::this_thread::sleep_for(std::chrono::milliseconds{1});
ct.join();
}
for (auto& st : server_threads)
{
st.join();
}
auto stop = sc::now();
print_stats("benchmark tcp_client and tcp_server", ops, start, stop);
server_scheduler.shutdown();
REQUIRE(server_scheduler.empty());
client_scheduler.shutdown();
REQUIRE(client_scheduler.empty());
for (const auto& [ms, count] : g_histogram)
{
std::cerr << ms.count() << " : " << count << "\n";
}
}

View file

@ -6,14 +6,11 @@
TEST_CASE("dns_resolver basic")
{
coro::io_scheduler scheduler{
coro::io_scheduler::options{.thread_strategy = coro::io_scheduler::thread_strategy_t::spawn}};
coro::io_scheduler scheduler{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
coro::net::dns_resolver dns_resolver{scheduler, std::chrono::milliseconds{5000}};
std::atomic<bool> done{false};
auto make_host_by_name_task = [&](coro::net::hostname hn) -> coro::task<void> {
co_await scheduler.schedule();
auto result_ptr = co_await std::move(dns_resolver.host_by_name(hn));
if (result_ptr->status() == coro::net::dns_status::complete)
@ -24,17 +21,10 @@ TEST_CASE("dns_resolver basic")
}
}
done = true;
co_return;
};
scheduler.schedule(make_host_by_name_task(coro::net::hostname{"www.example.com"}));
while (!done)
{
std::this_thread::sleep_for(std::chrono::milliseconds{10});
}
coro::sync_wait(make_host_by_name_task(coro::net::hostname{"www.example.com"}));
scheduler.shutdown();
REQUIRE(scheduler.empty());

View file

@ -2,53 +2,66 @@
#include <coro/coro.hpp>
TEST_CASE("tcp_server ping server")
#include <iostream>
TEST_CASE("tcp_server ping server", "[tcp_server]")
{
const std::string client_msg{"Hello from client"};
const std::string server_msg{"Reply from server!"};
coro::io_scheduler scheduler{};
coro::io_scheduler scheduler{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
auto make_client_task = [&]() -> coro::task<void> {
co_await scheduler.schedule();
coro::net::tcp_client client{scheduler};
std::cerr << "client connect\n";
auto cstatus = co_await client.connect();
REQUIRE(cstatus == coro::net::connect_status::connected);
// Skip polling for write, should really only poll if the write is partial, shouldn't be
// required for this test.
std::cerr << "client send()\n";
auto [sstatus, remaining] = client.send(client_msg);
REQUIRE(sstatus == coro::net::send_status::ok);
REQUIRE(remaining.empty());
// Poll for the server's response.
std::cerr << "client poll(read)\n";
auto pstatus = co_await client.poll(coro::poll_op::read);
REQUIRE(pstatus == coro::poll_status::event);
std::string buffer(256, '\0');
std::cerr << "client recv()\n";
auto [rstatus, rspan] = client.recv(buffer);
REQUIRE(rstatus == coro::net::recv_status::ok);
REQUIRE(rspan.size() == server_msg.length());
buffer.resize(rspan.size());
REQUIRE(buffer == server_msg);
std::cerr << "client return\n";
co_return;
};
auto make_server_task = [&]() -> coro::task<void> {
co_await scheduler.schedule();
coro::net::tcp_server server{scheduler};
// Poll for client connection.
std::cerr << "server poll(accept)\n";
auto pstatus = co_await server.poll();
REQUIRE(pstatus == coro::poll_status::event);
std::cerr << "server accept()\n";
auto client = server.accept();
REQUIRE(client.socket().is_valid());
// Poll for client request.
std::cerr << "server poll(read)\n";
pstatus = co_await client.poll(coro::poll_op::read);
REQUIRE(pstatus == coro::poll_status::event);
std::string buffer(256, '\0');
std::cerr << "server recv()\n";
auto [rstatus, rspan] = client.recv(buffer);
REQUIRE(rstatus == coro::net::recv_status::ok);
REQUIRE(rspan.size() == client_msg.size());
@ -56,16 +69,14 @@ TEST_CASE("tcp_server ping server")
REQUIRE(buffer == client_msg);
// Respond to client.
std::cerr << "server send()\n";
auto [sstatus, remaining] = client.send(server_msg);
REQUIRE(sstatus == coro::net::send_status::ok);
REQUIRE(remaining.empty());
std::cerr << "server return\n";
co_return;
};
scheduler.schedule(make_server_task());
scheduler.schedule(make_client_task());
while (!scheduler.empty())
{
std::this_thread::sleep_for(std::chrono::milliseconds{1});
}
coro::sync_wait(coro::when_all_awaitable(make_server_task(), make_client_task()));
}

View file

@ -6,9 +6,10 @@ TEST_CASE("udp one way")
{
const std::string msg{"aaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbcccccccccccccccccc"};
coro::io_scheduler scheduler{};
coro::io_scheduler scheduler{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
auto make_send_task = [&]() -> coro::task<void> {
co_await scheduler.schedule();
coro::net::udp_peer peer{scheduler};
coro::net::udp_peer::info peer_info{};
@ -20,6 +21,7 @@ TEST_CASE("udp one way")
};
auto make_recv_task = [&]() -> coro::task<void> {
co_await scheduler.schedule();
coro::net::udp_peer::info self_info{.address = coro::net::ip_address::from_string("0.0.0.0")};
coro::net::udp_peer self{scheduler, self_info};
@ -39,8 +41,7 @@ TEST_CASE("udp one way")
co_return;
};
scheduler.schedule(make_recv_task());
scheduler.schedule(make_send_task());
coro::sync_wait(coro::when_all_awaitable(make_recv_task(), make_send_task()));
}
TEST_CASE("udp echo peers")
@ -48,7 +49,7 @@ TEST_CASE("udp echo peers")
const std::string peer1_msg{"Hello from peer1!"};
const std::string peer2_msg{"Hello from peer2!!"};
coro::io_scheduler scheduler{};
coro::io_scheduler scheduler{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
auto make_peer_task = [&scheduler](
uint16_t my_port,
@ -56,6 +57,7 @@ TEST_CASE("udp echo peers")
bool send_first,
const std::string my_msg,
const std::string peer_msg) -> coro::task<void> {
co_await scheduler.schedule();
coro::net::udp_peer::info my_info{.address = coro::net::ip_address::from_string("0.0.0.0"), .port = my_port};
coro::net::udp_peer::info peer_info{
.address = coro::net::ip_address::from_string("127.0.0.1"), .port = peer_port};
@ -108,6 +110,7 @@ TEST_CASE("udp echo peers")
co_return;
};
scheduler.schedule(make_peer_task(8081, 8080, false, peer2_msg, peer1_msg));
scheduler.schedule(make_peer_task(8080, 8081, true, peer1_msg, peer2_msg));
coro::sync_wait(coro::when_all_awaitable(
make_peer_task(8081, 8080, false, peer2_msg, peer1_msg),
make_peer_task(8080, 8081, true, peer1_msg, peer2_msg)));
}

View file

@ -5,7 +5,7 @@
#include <chrono>
#include <thread>
TEST_CASE("event single awaiter")
TEST_CASE("event single awaiter", "[event]")
{
coro::event e{};
@ -36,7 +36,7 @@ auto consumer(const coro::event& event) -> coro::task<uint64_t>
co_return 42;
}
TEST_CASE("event one watcher")
TEST_CASE("event one watcher", "[event]")
{
coro::event e{};
@ -49,7 +49,7 @@ TEST_CASE("event one watcher")
REQUIRE(value.promise().return_value() == 42);
}
TEST_CASE("event multiple watchers")
TEST_CASE("event multiple watchers", "[event]")
{
coro::event e{};
@ -70,7 +70,7 @@ TEST_CASE("event multiple watchers")
REQUIRE(value3.promise().return_value() == 42);
}
TEST_CASE("event reset")
TEST_CASE("event reset", "[event]")
{
coro::event e{};

View file

@ -2,7 +2,7 @@
#include <coro/coro.hpp>
TEST_CASE("generator single yield")
TEST_CASE("generator single yield", "[generator]")
{
std::string msg{"Hello World Generator!"};
auto func = [&]() -> coro::generator<std::string> { co_yield msg; };
@ -13,7 +13,7 @@ TEST_CASE("generator single yield")
}
}
TEST_CASE("generator infinite incrementing integer yield")
TEST_CASE("generator infinite incrementing integer yield", "[generator]")
{
constexpr const int64_t max = 1024;

File diff suppressed because it is too large

View file

@ -5,7 +5,7 @@
#include <chrono>
#include <thread>
TEST_CASE("latch count=0")
TEST_CASE("latch count=0", "[latch]")
{
coro::latch l{0};
@ -19,7 +19,7 @@ TEST_CASE("latch count=0")
REQUIRE(task.promise().return_value() == 42);
}
TEST_CASE("latch count=1")
TEST_CASE("latch count=1", "[latch]")
{
coro::latch l{1};
@ -37,7 +37,7 @@ TEST_CASE("latch count=1")
REQUIRE(task.promise().return_value() == 1);
}
TEST_CASE("latch count=1 count_down=5")
TEST_CASE("latch count=1 count_down=5", "[latch]")
{
coro::latch l{1};
@ -55,7 +55,7 @@ TEST_CASE("latch count=1 count_down=5")
REQUIRE(task.promise().return_value() == 1);
}
TEST_CASE("latch count=5 count_down=1 x5")
TEST_CASE("latch count=5 count_down=1 x5", "[latch]")
{
coro::latch l{5};
@ -81,7 +81,7 @@ TEST_CASE("latch count=5 count_down=1 x5")
REQUIRE(task.promise().return_value() == 5);
}
TEST_CASE("latch count=5 count_down=5")
TEST_CASE("latch count=5 count_down=5", "[latch]")
{
coro::latch l{5};

View file

@ -5,7 +5,7 @@
#include <chrono>
#include <thread>
TEST_CASE("mutex single waiter not locked")
TEST_CASE("mutex single waiter not locked", "[mutex]")
{
std::vector<uint64_t> output;
@ -29,7 +29,7 @@ TEST_CASE("mutex single waiter not locked")
REQUIRE(output[0] == 1);
}
TEST_CASE("mutex many waiters until event")
TEST_CASE("mutex many waiters until event", "[mutex]")
{
std::atomic<uint64_t> value{0};
std::vector<coro::task<void>> tasks;
@ -40,7 +40,7 @@ TEST_CASE("mutex many waiters until event")
coro::event e; // triggers the blocking thread to release the lock
auto make_task = [&](uint64_t id) -> coro::task<void> {
co_await tp.schedule().value();
co_await tp.schedule();
std::cerr << "id = " << id << " waiting to acquire the lock\n";
auto scoped_lock = co_await m.lock();
std::cerr << "id = " << id << " lock acquired\n";
@ -50,7 +50,7 @@ TEST_CASE("mutex many waiters until event")
};
auto make_block_task = [&]() -> coro::task<void> {
co_await tp.schedule().value();
co_await tp.schedule();
std::cerr << "block task acquiring lock\n";
auto scoped_lock = co_await m.lock();
std::cerr << "block task acquired lock, waiting on event\n";
@ -59,7 +59,7 @@ TEST_CASE("mutex many waiters until event")
};
auto make_set_task = [&]() -> coro::task<void> {
co_await tp.schedule().value();
co_await tp.schedule();
std::cerr << "set task setting event\n";
e.set();
co_return;

View file

@ -2,7 +2,7 @@
#include <coro/coro.hpp>
TEST_CASE("sync_wait simple integer return")
TEST_CASE("sync_wait simple integer return", "[sync_wait]")
{
auto func = []() -> coro::task<int> { co_return 11; };
@ -10,7 +10,7 @@ TEST_CASE("sync_wait simple integer return")
REQUIRE(result == 11);
}
TEST_CASE("sync_wait void")
TEST_CASE("sync_wait void", "[sync_wait]")
{
std::string output;
@ -23,7 +23,7 @@ TEST_CASE("sync_wait void")
REQUIRE(output == "hello from sync_wait<void>\n");
}
TEST_CASE("sync_wait task co_await single")
TEST_CASE("sync_wait task co_await single", "[sync_wait]")
{
auto answer = []() -> coro::task<int> {
std::cerr << "\tThinking deep thoughts...\n";
@ -47,7 +47,7 @@ TEST_CASE("sync_wait task co_await single")
REQUIRE(output == 1337);
}
TEST_CASE("sync_wait task that throws")
TEST_CASE("sync_wait task that throws", "[sync_wait]")
{
auto f = []() -> coro::task<uint64_t> {
throw std::runtime_error("I always throw!");

View file

@ -5,7 +5,7 @@
#include <chrono>
#include <thread>
TEST_CASE("task hello world")
TEST_CASE("task hello world", "[task]")
{
using task_type = coro::task<std::string>;
@ -28,7 +28,7 @@ TEST_CASE("task hello world")
REQUIRE(w.promise().return_value().empty());
}
TEST_CASE("task void")
TEST_CASE("task void", "[task]")
{
using namespace std::chrono_literals;
using task_type = coro::task<>;
@ -42,7 +42,7 @@ TEST_CASE("task void")
REQUIRE(t.is_ready());
}
TEST_CASE("task exception thrown")
TEST_CASE("task exception thrown", "[task]")
{
using task_type = coro::task<std::string>;
@ -71,7 +71,7 @@ TEST_CASE("task exception thrown")
REQUIRE(thrown);
}
TEST_CASE("task in a task")
TEST_CASE("task in a task", "[task]")
{
auto outer_task = []() -> coro::task<> {
auto inner_task = []() -> coro::task<int> {
@ -91,7 +91,7 @@ TEST_CASE("task in a task")
REQUIRE(outer_task.is_ready());
}
TEST_CASE("task in a task in a task")
TEST_CASE("task in a task in a task", "[task]")
{
auto task1 = []() -> coro::task<> {
std::cerr << "task1 start\n";
@ -121,7 +121,7 @@ TEST_CASE("task in a task in a task")
REQUIRE(task1.is_ready());
}
TEST_CASE("task multiple suspends return void")
TEST_CASE("task multiple suspends return void", "[task]")
{
auto task = []() -> coro::task<void> {
co_await std::suspend_always{};
@ -144,7 +144,7 @@ TEST_CASE("task multiple suspends return void")
REQUIRE(task.is_ready());
}
TEST_CASE("task multiple suspends return integer")
TEST_CASE("task multiple suspends return integer", "[task]")
{
auto task = []() -> coro::task<int> {
co_await std::suspend_always{};
@ -167,7 +167,7 @@ TEST_CASE("task multiple suspends return integer")
REQUIRE(task.promise().return_value() == 11);
}
TEST_CASE("task resume from promise to coroutine handles of different types")
TEST_CASE("task resume from promise to coroutine handles of different types", "[task]")
{
auto task1 = [&]() -> coro::task<int> {
std::cerr << "Task ran\n";
@ -199,7 +199,7 @@ TEST_CASE("task resume from promise to coroutine handles of different types")
REQUIRE(coro_handle2.done());
}
TEST_CASE("task throws void")
TEST_CASE("task throws void", "[task]")
{
auto task = []() -> coro::task<void> {
throw std::runtime_error{"I always throw."};
@ -211,7 +211,7 @@ TEST_CASE("task throws void")
REQUIRE_THROWS_AS(task.promise().return_value(), std::runtime_error);
}
TEST_CASE("task throws non-void l-value")
TEST_CASE("task throws non-void l-value", "[task]")
{
auto task = []() -> coro::task<int> {
throw std::runtime_error{"I always throw."};
@ -223,7 +223,7 @@ TEST_CASE("task throws non-void l-value")
REQUIRE_THROWS_AS(task.promise().return_value(), std::runtime_error);
}
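
    The task tests above resume coroutines by hand. A short sketch of that manual lifecycle, using only the calls that appear in these hunks (resume(), is_ready(), promise().return_value()):

        auto task = []() -> coro::task<int> {
            co_await std::suspend_always{};
            co_return 11;
        }();

        // coro::task is lazy: nothing executes until the first resume.
        task.resume(); // runs to the suspend_always
        // task.is_ready() == false
        task.resume(); // runs to completion
        // task.is_ready() == true, task.promise().return_value() == 11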
TEST_CASE("task throws non-void r-value")
TEST_CASE("task throws non-void r-value", "[task]")
{
struct type
{

View file

@ -4,12 +4,12 @@
#include <iostream>
TEST_CASE("thread_pool one worker one task")
TEST_CASE("thread_pool one worker one task", "[thread_pool]")
{
coro::thread_pool tp{coro::thread_pool::options{1}};
auto func = [&tp]() -> coro::task<uint64_t> {
co_await tp.schedule().value(); // Schedule this coroutine on the scheduler.
co_await tp.schedule(); // Schedule this coroutine on the scheduler.
co_return 42;
};
@ -17,12 +17,12 @@ TEST_CASE("thread_pool one worker one task")
REQUIRE(result == 42);
}
TEST_CASE("thread_pool one worker many tasks tuple")
TEST_CASE("thread_pool one worker many tasks tuple", "[thread_pool]")
{
coro::thread_pool tp{coro::thread_pool::options{1}};
auto f = [&tp]() -> coro::task<uint64_t> {
co_await tp.schedule().value(); // Schedule this coroutine on the scheduler.
co_await tp.schedule(); // Schedule this coroutine on the scheduler.
co_return 50;
};
@ -35,12 +35,12 @@ TEST_CASE("thread_pool one worker many tasks tuple")
REQUIRE(counter == 250);
}
TEST_CASE("thread_pool one worker many tasks vector")
TEST_CASE("thread_pool one worker many tasks vector", "[thread_pool]")
{
coro::thread_pool tp{coro::thread_pool::options{1}};
auto f = [&tp]() -> coro::task<uint64_t> {
co_await tp.schedule().value(); // Schedule this coroutine on the scheduler.
co_await tp.schedule(); // Schedule this coroutine on the scheduler.
co_return 50;
};
@ -62,13 +62,13 @@ TEST_CASE("thread_pool one worker many tasks vector")
REQUIRE(counter == 150);
}
TEST_CASE("thread_pool N workers 100k tasks")
TEST_CASE("thread_pool N workers 100k tasks", "[thread_pool]")
{
constexpr const std::size_t iterations = 100'000;
coro::thread_pool tp{};
auto make_task = [](coro::thread_pool& tp) -> coro::task<uint64_t> {
co_await tp.schedule().value();
co_await tp.schedule();
co_return 1;
};
@ -91,15 +91,15 @@ TEST_CASE("thread_pool N workers 100k tasks")
REQUIRE(counter == iterations);
}
TEST_CASE("thread_pool 1 worker task spawns another task")
TEST_CASE("thread_pool 1 worker task spawns another task", "[thread_pool]")
{
coro::thread_pool tp{coro::thread_pool::options{1}};
auto f1 = [](coro::thread_pool& tp) -> coro::task<uint64_t> {
co_await tp.schedule().value();
co_await tp.schedule();
auto f2 = [](coro::thread_pool& tp) -> coro::task<uint64_t> {
co_await tp.schedule().value();
co_await tp.schedule();
co_return 5;
};
@ -109,18 +109,19 @@ TEST_CASE("thread_pool 1 worker task spawns another task")
REQUIRE(coro::sync_wait(f1(tp)) == 6);
}
TEST_CASE("thread_pool shutdown")
TEST_CASE("thread_pool shutdown", "[thread_pool]")
{
coro::thread_pool tp{coro::thread_pool::options{1}};
auto f = [](coro::thread_pool& tp) -> coro::task<bool> {
auto scheduled = tp.schedule();
if (!scheduled.has_value())
try
{
co_await tp.schedule();
}
catch (...)
{
co_return true;
}
co_await scheduled.value();
co_return false;
};
@ -129,7 +130,7 @@ TEST_CASE("thread_pool shutdown")
REQUIRE(coro::sync_wait(f(tp)) == true);
}
TEST_CASE("thread_pool schedule functor")
TEST_CASE("thread_pool schedule functor", "[thread_pool]")
{
coro::thread_pool tp{coro::thread_pool::options{1}};
@ -143,7 +144,7 @@ TEST_CASE("thread_pool schedule functor")
REQUIRE_THROWS(coro::sync_wait(tp.schedule(f)));
}
TEST_CASE("thread_pool schedule functor return_type = void")
TEST_CASE("thread_pool schedule functor return_type = void", "[thread_pool]")
{
coro::thread_pool tp{coro::thread_pool::options{1}};
@ -158,7 +159,7 @@ TEST_CASE("thread_pool schedule functor return_type = void")
REQUIRE_THROWS(coro::sync_wait(tp.schedule(f, std::ref(counter))));
}
TEST_CASE("thread_pool event jump threads")
TEST_CASE("thread_pool event jump threads", "[thread_pool]")
{
// This test verifies that the thread that sets the event ends up executing every waiter on the event
@ -168,7 +169,7 @@ TEST_CASE("thread_pool event jump threads")
coro::event e{};
auto make_tp1_task = [&]() -> coro::task<void> {
co_await tp1.schedule().value();
co_await tp1.schedule();
auto before_thread_id = std::this_thread::get_id();
std::cerr << "before event thread_id = " << before_thread_id << "\n";
co_await e;
@ -181,7 +182,7 @@ TEST_CASE("thread_pool event jump threads")
};
auto make_tp2_task = [&]() -> coro::task<void> {
co_await tp2.schedule().value();
co_await tp2.schedule();
std::this_thread::sleep_for(std::chrono::milliseconds{10});
std::cerr << "setting event\n";
e.set();
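
    Across these hunks tp.schedule() changes from returning an optional that had to be unwrapped with .value() into an awaitable that throws once the pool stops accepting work. A hedged sketch of the new call pattern; the shutdown() call is an assumption inferred from the shutdown test above:

        coro::thread_pool tp{coro::thread_pool::options{1}};

        auto work = [&tp]() -> coro::task<uint64_t> {
            co_await tp.schedule(); // suspend here, resume on a pool thread
            co_return 42;
        };
        auto result = coro::sync_wait(work()); // result == 42

        tp.shutdown(); // assumption: stops the pool and rejects new work

        auto late = [&tp]() -> coro::task<bool> {
            try
            {
                co_await tp.schedule(); // now throws instead of returning an empty optional
            }
            catch (...)
            {
                co_return true;
            }
            co_return false;
        };
        // coro::sync_wait(late()) == true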

View file

@ -2,7 +2,7 @@
#include <coro/coro.hpp>
TEST_CASE("when_all_awaitable single task with tuple container")
TEST_CASE("when_all_awaitable single task with tuple container", "[when_all]")
{
auto make_task = [](uint64_t amount) -> coro::task<uint64_t> { co_return amount; };
@ -15,7 +15,7 @@ TEST_CASE("when_all_awaitable single task with tuple container")
REQUIRE(counter == 100);
}
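
    The when_all_awaitable call and the result handling fall between these hunks; a hedged sketch of the tuple-container form the test name suggests, assuming sync_wait hands back the completed tasks as a tuple:

        auto make_task = [](uint64_t amount) -> coro::task<uint64_t> { co_return amount; };

        // assumption: the variadic overload yields one completed task per argument
        auto output_tasks = coro::sync_wait(coro::when_all_awaitable(make_task(100)));

        uint64_t counter{0};
        std::apply([&](auto&&... tasks) { ((counter += tasks.return_value()), ...); }, output_tasks);
        // counter == 100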
TEST_CASE("when_all_awaitable multiple tasks with tuple container")
TEST_CASE("when_all_awaitable multiple tasks with tuple container", "[when_all]")
{
auto make_task = [](uint64_t amount) -> coro::task<uint64_t> { co_return amount; };
@ -28,7 +28,7 @@ TEST_CASE("when_all_awaitable multiple tasks with tuple container")
REQUIRE(counter == 170);
}
TEST_CASE("when_all_awaitable single task with vector container")
TEST_CASE("when_all_awaitable single task with vector container", "[when_all]")
{
auto make_task = [](uint64_t amount) -> coro::task<uint64_t> { co_return amount; };
@ -47,7 +47,7 @@ TEST_CASE("when_all_awaitable single task with vector container")
REQUIRE(counter == 100);
}
TEST_CASE("when_all_ready multple task withs vector container")
TEST_CASE("when_all_ready multple task withs vector container", "[when_all]")
{
auto make_task = [](uint64_t amount) -> coro::task<uint64_t> { co_return amount; };