
io_scheduler inline support (#79)

* io_scheduler inline support

* add debug info for io_scheduler size issue

* move poll info into its own file

* cleanup for feature

* Fix valgrind-detected use-after-free introduced by inline processing

Running coroutines inline with event processing caused a use-after-free
bug, detected by valgrind in the inline tcp server/client benchmark code.
Inline processing resumes the coroutine _inline_ with the event or the
timeout, so if an event and its timeout occurred in the same epoll_wait()
call, the first one to be processed would resume and destroy the
coroutine's stack frame; handling the second one would then read the
poll_info->m_processed flag from already freed memory.

The solution was to introduce a vector of coroutine handles that is
appended to during each epoll_wait() iteration as events and timeouts are
processed; only after the events and timeouts have been deduplicated are
the coroutine handles resumed.

This new vector elides a malloc in the timeout function, but there is
still a malloc to extract the poll infos from the timeout multimap data
structure. The vector is a class member and is only ever cleared, so a
monster set of timeouts could grow it extremely large, but I think that
is worth the price of not re-allocating it.
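
For illustration, here is a minimal sketch of the dedup-then-resume pattern described above; the type and function names are simplified stand-ins rather than the exact libcoro implementation (the real logic lives in io_scheduler::process_event_execute / process_timeout_execute in the diff below):

#include <coroutine>
#include <vector>

enum class poll_status { event, timeout, error };

struct poll_info
{
    std::coroutine_handle<> m_awaiting_coroutine;
    poll_status             m_poll_status{poll_status::error};
    bool                    m_processed{false};
};

// Member-style vector that is only ever cleared, never shrunk.
std::vector<std::coroutine_handle<>> handles_to_resume;

// Called once per epoll event or expired timeout; only the first of the
// event/timeout pair marks the poll_info and records the handle.
void mark_ready(poll_info* pi, poll_status status)
{
    if (!pi->m_processed)
    {
        pi->m_processed   = true;
        pi->m_poll_status = status;
        handles_to_resume.emplace_back(pi->m_awaiting_coroutine);
    }
    // A second trigger in the same epoll_wait() batch is ignored here, and the
    // poll_info is still alive because nothing has been resumed yet.
}

// Called after every event and timeout in one epoll_wait() batch is marked.
void resume_batch()
{
    for (auto& handle : handles_to_resume)
    {
        handle.resume(); // safe: duplicates were filtered before any resume
    }
    handles_to_resume.clear(); // keep the capacity to avoid re-allocating
}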
This commit is contained in:
Josh Baldwin 2021-04-11 15:07:01 -06:00 committed by GitHub
parent 6220b61c68
commit e9b225e42f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 1240 additions and 671 deletions

View file

@ -2,9 +2,9 @@ cmake_minimum_required(VERSION 3.0)
project(libcoro CXX)
# Set the githooks directory to auto format and update the readme.
message("git config core.hooksPath .githooks")
message("${PROJECT_NAME} ${CMAKE_CURRENT_SOURCE_DIR} -> git config --local core.hooksPath .githooks")
execute_process(
COMMAND git config core.hooksPath .githooks
COMMAND git config --local core.hooksPath .githooks
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
)
@ -25,8 +25,11 @@ add_subdirectory(vendor/c-ares/c-ares)
set(LIBCORO_SOURCE_FILES
inc/coro/concepts/awaitable.hpp
inc/coro/concepts/buffer.hpp
inc/coro/concepts/executor.hpp
inc/coro/concepts/promise.hpp
inc/coro/concepts/range_of.hpp
inc/coro/detail/poll_info.hpp
inc/coro/detail/void_value.hpp
inc/coro/net/connect.hpp src/net/connect.cpp
@ -44,6 +47,7 @@ set(LIBCORO_SOURCE_FILES
inc/coro/coro.hpp
inc/coro/event.hpp src/event.cpp
inc/coro/fd.hpp
inc/coro/generator.hpp
inc/coro/io_scheduler.hpp src/io_scheduler.cpp
inc/coro/latch.hpp
@ -51,11 +55,10 @@ set(LIBCORO_SOURCE_FILES
inc/coro/poll.hpp
inc/coro/ring_buffer.hpp
inc/coro/semaphore.hpp src/semaphore.cpp
inc/coro/shared_mutex.hpp src/shared_mutex.cpp
inc/coro/shutdown.hpp
inc/coro/shared_mutex.hpp
inc/coro/stop_signal.hpp
inc/coro/sync_wait.hpp src/sync_wait.cpp
inc/coro/task_container.hpp src/task_container.cpp
inc/coro/task_container.hpp
inc/coro/task.hpp
inc/coro/thread_pool.hpp src/thread_pool.cpp
inc/coro/when_all.hpp

View file

@ -23,7 +23,7 @@ concept awaiter = requires(type t, std::coroutine_handle<> c)
std::same_as<decltype(t.await_suspend(c)), void> ||
std::same_as<decltype(t.await_suspend(c)), bool> ||
std::same_as<decltype(t.await_suspend(c)), std::coroutine_handle<>>;
{t.await_resume()};
{ t.await_resume() };
};
/**

View file

@ -0,0 +1,29 @@
#pragma once
#include "coro/concepts/awaitable.hpp"
#include <concepts>
#include <coroutine>
// #include <type_traits>
// #include <utility>
namespace coro::concepts
{
template<typename type>
concept executor = requires(type t, std::coroutine_handle<> c)
{
{
t.schedule()
}
->coro::concepts::awaiter;
{
t.yield()
}
->coro::concepts::awaiter;
{
t.resume(c)
}
->std::same_as<void>;
};
} // namespace coro::concepts
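
To make the new requirement concrete, the following is a hedged sketch of a type that would satisfy coro::concepts::executor; it resumes handles inline and is illustrative only, not part of this commit:

#include "coro/concepts/executor.hpp"

#include <coroutine>

struct inline_executor
{
    // Trivial awaiter that never suspends; enough to satisfy concepts::awaiter.
    struct op
    {
        auto await_ready() const noexcept -> bool { return true; }
        auto await_suspend(std::coroutine_handle<>) noexcept -> void {}
        auto await_resume() noexcept -> void {}
    };

    auto schedule() -> op { return op{}; }
    auto yield() -> op { return op{}; }
    auto resume(std::coroutine_handle<> handle) -> void { handle.resume(); }
};

static_assert(coro::concepts::executor<inline_executor>);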

View file

@ -0,0 +1,20 @@
#pragma once
#include <concepts>
#include <ranges>
namespace coro::concepts
{
/**
* Concept to require that the range contains a specific type of value.
*/
template<class T, class V>
concept range_of = std::ranges::range<T>&& std::is_same_v<V, std::ranges::range_value_t<T>>;
/**
* Concept to require that a sized range contains a specific type of value.
*/
template<class T, class V>
concept sized_range_of = std::ranges::sized_range<T>&& std::is_same_v<V, std::ranges::range_value_t<T>>;
} // namespace coro::concepts
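
As a hedged usage sketch (not part of this commit), this is the kind of constraint range_of enables; it mirrors how thread_pool::resume() uses it later in this diff:

#include "coro/concepts/range_of.hpp"

#include <coroutine>
#include <vector>

// Accept any range whose value type is std::coroutine_handle<>.
template<coro::concepts::range_of<std::coroutine_handle<>> range_type>
auto resume_all(const range_type& handles) -> void
{
    for (const auto& handle : handles)
    {
        if (handle != nullptr)
        {
            handle.resume();
        }
    }
}

// std::vector<std::coroutine_handle<>> (or any other matching range) is accepted.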

View file

@ -2,7 +2,9 @@
#include "coro/concepts/awaitable.hpp"
#include "coro/concepts/buffer.hpp"
#include "coro/concepts/executor.hpp"
#include "coro/concepts/promise.hpp"
#include "coro/concepts/range_of.hpp"
#include "coro/net/connect.hpp"
#include "coro/net/dns_resolver.hpp"
@ -25,7 +27,6 @@
#include "coro/ring_buffer.hpp"
#include "coro/semaphore.hpp"
#include "coro/shared_mutex.hpp"
#include "coro/shutdown.hpp"
#include "coro/stop_signal.hpp"
#include "coro/sync_wait.hpp"
#include "coro/task.hpp"

View file

@ -0,0 +1,74 @@
#pragma once
#include "coro/fd.hpp"
#include "coro/poll.hpp"
#include <atomic>
#include <chrono>
#include <coroutine>
#include <map>
#include <optional>
namespace coro::detail
{
/**
* Poll Info encapsulates everything about a poll operation for the event as well as its paired
* timeout. This is important since coroutines that are waiting on an event or timeout do not
* immediately execute, they are re-scheduled onto the thread pool, so it's possible its paired
* event or timeout also triggers while the coroutine is still waiting to resume. This means that
* the first one to happen, the event itself or its timeout, needs to disable the other paired item
* prior to resuming the coroutine.
*
* Finally, it's also important to note that the event and its paired timeout could happen during
* the same epoll_wait and possibly trigger the coroutine to start twice. Only one can win, so the
* first one processed sets m_processed to true and any subsequent events in the same epoll batch
* are effectively discarded.
*/
struct poll_info
{
using clock = std::chrono::steady_clock;
using time_point = clock::time_point;
using timed_events = std::multimap<time_point, detail::poll_info*>;
poll_info() = default;
~poll_info() = default;
poll_info(const poll_info&) = delete;
poll_info(poll_info&&) = delete;
auto operator=(const poll_info&) -> poll_info& = delete;
auto operator=(poll_info&&) -> poll_info& = delete;
struct poll_awaiter
{
explicit poll_awaiter(poll_info& pi) noexcept : m_pi(pi) {}
auto await_ready() const noexcept -> bool { return false; }
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
m_pi.m_awaiting_coroutine = awaiting_coroutine;
std::atomic_thread_fence(std::memory_order::release);
}
auto await_resume() noexcept -> coro::poll_status { return m_pi.m_poll_status; }
poll_info& m_pi;
};
auto operator co_await() noexcept -> poll_awaiter { return poll_awaiter{*this}; }
/// The file descriptor being polled on. This is needed so that if the timeout occurs first then
/// the event loop can immediately disable the event within epoll.
fd_t m_fd{-1};
/// The timeout's position in the timeout map. For a poll() with no timeout, or a yield(), this is empty.
/// This is needed so that if the event occurs first then the event loop can immediately disable
/// the timeout within epoll.
std::optional<timed_events::iterator> m_timer_pos{std::nullopt};
/// The awaiting coroutine for this poll info to resume upon event or timeout.
std::coroutine_handle<> m_awaiting_coroutine;
/// The status of the poll operation.
coro::poll_status m_poll_status{coro::poll_status::error};
/// Did the timeout and event trigger at the same time on the same epoll_wait call?
/// Once this is set to true all future events on this poll info are null and void.
bool m_processed{false};
};
} // namespace coro::detail

View file

@ -1,5 +1,7 @@
#pragma once
#include "coro/concepts/executor.hpp"
#include <atomic>
#include <coroutine>
@ -27,36 +29,6 @@ t2: resume()
class event
{
public:
/**
* Creates an event with the given initial state of being set or not set.
* @param initially_set By default all events start as not set, but if needed this parameter can
* set the event to already be triggered.
*/
explicit event(bool initially_set = false) noexcept;
~event() = default;
event(const event&) = delete;
event(event&&) = delete;
auto operator=(const event&) -> event& = delete;
auto operator=(event&&) -> event& = delete;
/**
* @return True if this event is currently in the set state.
*/
auto is_set() const noexcept -> bool { return m_state.load(std::memory_order_acquire) == this; }
/**
* Sets this event and resumes all awaiters. Note that all waiters will be resumed onto this
* thread of execution.
*/
auto set() noexcept -> void;
/**
* Sets this event and resumes all awaiters onto the given thread pool. This will distribute
* the waiters across the thread pools threads.
*/
auto set(coro::thread_pool& tp) noexcept -> void;
struct awaiter
{
/**
@ -90,6 +62,50 @@ public:
awaiter* m_next{nullptr};
};
/**
* Creates an event with the given initial state of being set or not set.
* @param initially_set By default all events start as not set, but if needed this parameter can
* set the event to already be triggered.
*/
explicit event(bool initially_set = false) noexcept;
~event() = default;
event(const event&) = delete;
event(event&&) = delete;
auto operator=(const event&) -> event& = delete;
auto operator=(event&&) -> event& = delete;
/**
* @return True if this event is currently in the set state.
*/
auto is_set() const noexcept -> bool { return m_state.load(std::memory_order_acquire) == this; }
/**
* Sets this event and resumes all awaiters. Note that all waiters will be resumed onto this
* thread of execution.
*/
auto set() noexcept -> void;
/**
* Sets this event and resumes all awaiters onto the given thread pool. This will distribute
* the waiters across the thread pools threads.
*/
template<concepts::executor executor_type>
auto set(executor_type& e) noexcept -> void
{
void* old_value = m_state.exchange(this, std::memory_order::acq_rel);
if (old_value != this)
{
auto* waiters = static_cast<awaiter*>(old_value);
while (waiters != nullptr)
{
auto* next = waiters->m_next;
e.resume(waiters->m_awaiting_coroutine);
waiters = next;
}
}
}
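A hedged usage sketch of the templated set(): any executor can be handed in, so waiters fan out across that executor's threads. This assumes coro::sync_wait and coro::when_all from the rest of the library and is not part of this commit:

#include <coro/coro.hpp>

int main()
{
    coro::thread_pool tp{coro::thread_pool::options{.thread_count = 2}};
    coro::event e{};

    auto waiter = [&]() -> coro::task<int>
    {
        co_await e;   // suspend until the event is set
        co_return 42; // resumed on one of tp's threads
    };

    auto setter = [&]() -> coro::task<void>
    {
        co_await tp.schedule();
        e.set(tp);    // resume every waiter via tp.resume(handle)
        co_return;
    };

    coro::sync_wait(coro::when_all(waiter(), waiter(), setter()));
}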
/**
* @return An awaiter struct to suspend and resume this coroutine for when the event is set.
*/

inc/coro/fd.hpp Normal file
View file

@ -0,0 +1,7 @@
#pragma once
namespace coro
{
using fd_t = int;
} // namespace coro

View file

@ -1,6 +1,7 @@
#pragma once
#include "coro/event.hpp"
#include "coro/detail/poll_info.hpp"
#include "coro/fd.hpp"
#include "coro/net/socket.hpp"
#include "coro/poll.hpp"
#include "coro/thread_pool.hpp"
@ -8,35 +9,44 @@
#include <chrono>
#include <functional>
#include <map>
#include <optional>
#include <sys/eventfd.h>
#include <thread>
#include <vector>
namespace coro
{
namespace detail
class io_scheduler
{
class poll_info;
} // namespace detail
class io_scheduler : public coro::thread_pool
{
friend detail::poll_info;
using clock = std::chrono::steady_clock;
using time_point = clock::time_point;
using timed_events = std::multimap<time_point, detail::poll_info*>;
using clock = detail::poll_info::clock;
using time_point = detail::poll_info::time_point;
using timed_events = detail::poll_info::timed_events;
public:
using fd_t = int;
class schedule_operation;
friend schedule_operation;
enum class thread_strategy_t
{
/// Spawns a background thread for the scheduler to run on.
/// Spawns a dedicated background thread for the scheduler to run on.
spawn,
/// Requires the user to call process_events() to drive the scheduler
/// Requires the user to call process_events() to drive the scheduler.
manual
};
enum class execution_strategy_t
{
/// Tasks will be FIFO queued to be executed on a thread pool. This is better for tasks that
/// are long lived and will use lots of CPU because long lived tasks will block other i/o
/// operations while they complete. This strategy is generally better for lower latency
/// requirements at the cost of throughput.
process_tasks_on_thread_pool,
/// Tasks will be executed inline on the io scheduler thread. This is better for short tasks
/// that can be quickly processed and not block other i/o operations for very long. This
/// strategy is generally better for higher throughput at the cost of latency.
process_tasks_inline
};
struct options
{
/// Should the io scheduler spawn a dedicated event processor?
@ -50,6 +60,10 @@ public:
.thread_count = ((std::thread::hardware_concurrency() > 1) ? (std::thread::hardware_concurrency() - 1) : 1),
.on_thread_start_functor = nullptr,
.on_thread_stop_functor = nullptr};
/// If inline task processing is enabled then the io worker will resume tasks on its thread
/// rather than scheduling them to be picked up by the thread pool.
const execution_strategy_t execution_strategy{execution_strategy_t::process_tasks_on_thread_pool};
};
explicit io_scheduler(
@ -57,18 +71,19 @@ public:
.thread_strategy = thread_strategy_t::spawn,
.on_io_thread_start_functor = nullptr,
.on_io_thread_stop_functor = nullptr,
.pool = {
.thread_count =
((std::thread::hardware_concurrency() > 1) ? (std::thread::hardware_concurrency() - 1) : 1),
.on_thread_start_functor = nullptr,
.on_thread_stop_functor = nullptr}});
.pool =
{.thread_count =
((std::thread::hardware_concurrency() > 1) ? (std::thread::hardware_concurrency() - 1) : 1),
.on_thread_start_functor = nullptr,
.on_thread_stop_functor = nullptr},
.execution_strategy = execution_strategy_t::process_tasks_on_thread_pool});
io_scheduler(const io_scheduler&) = delete;
io_scheduler(io_scheduler&&) = delete;
auto operator=(const io_scheduler&) -> io_scheduler& = delete;
auto operator=(io_scheduler&&) -> io_scheduler& = delete;
virtual ~io_scheduler() override;
~io_scheduler();
/**
* Given a thread_strategy_t::manual this function should be called at regular intervals to
@ -82,6 +97,61 @@ public:
*/
auto process_events(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> std::size_t;
class schedule_operation
{
friend class io_scheduler;
explicit schedule_operation(io_scheduler& scheduler) noexcept : m_scheduler(scheduler) {}
public:
/**
* Operations always pause so the executing thread can be switched.
*/
auto await_ready() noexcept -> bool { return false; }
/**
* Suspending always returns to the caller (using void return of await_suspend()) and
* stores the coroutine internally for the executing thread to resume from.
*/
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
if (m_scheduler.m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
{
m_scheduler.m_size.fetch_add(1, std::memory_order::release);
{
std::scoped_lock lk{m_scheduler.m_scheduled_tasks_mutex};
m_scheduler.m_scheduled_tasks.emplace_back(awaiting_coroutine);
}
// Trigger the event to wake-up the scheduler if this event isn't currently triggered.
bool expected{false};
if (m_scheduler.m_schedule_fd_triggered.compare_exchange_strong(
expected, true, std::memory_order::release, std::memory_order::relaxed))
{
eventfd_t value{1};
eventfd_write(m_scheduler.m_schedule_fd, value);
}
}
else
{
m_scheduler.m_thread_pool->resume(awaiting_coroutine);
}
}
/**
* no-op as this is the function called first by the thread pool's executing thread.
*/
auto await_resume() noexcept -> void {}
private:
/// The thread pool that this operation will execute on.
io_scheduler& m_scheduler;
};
/**
* Schedules the current task onto this io_scheduler for execution.
*/
auto schedule() -> schedule_operation { return schedule_operation{*this}; }
/**
* Schedules the current task to run after the given amount of time has elapsed.
* @param amount The amount of time to wait before resuming execution of this task.
@ -96,6 +166,11 @@ public:
*/
[[nodiscard]] auto schedule_at(time_point time) -> coro::task<void>;
/**
* Yields the current task to the end of the queue of waiting tasks.
*/
[[nodiscard]] auto yield() -> schedule_operation { return schedule_operation{*this}; };
/**
* Yields the current task for the given amount of time.
* @param amount The amount of time to yield for before resuming execution of this task.
@ -137,14 +212,57 @@ public:
}
/**
* Starts the shutdown of the io scheduler. All currently executing and pending tasks will complete
* prior to shutting down.
* @param wait_for_tasks Given shutdown_t::sync this function will block until all oustanding
* tasks are completed. Given shutdown_t::async this function will trigger
* the shutdown process but return immediately. In this case the io_scheduler's
* destructor will block if any background threads haven't joined.
* Resumes execution of a direct coroutine handle on this io scheduler.
* @param handle The coroutine handle to resume execution.
*/
auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) noexcept -> void override;
auto resume(std::coroutine_handle<> handle) -> void
{
if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
{
{
std::scoped_lock lk{m_scheduled_tasks_mutex};
m_scheduled_tasks.emplace_back(handle);
}
bool expected{false};
if (m_schedule_fd_triggered.compare_exchange_strong(
expected, true, std::memory_order::release, std::memory_order::relaxed))
{
eventfd_t value{1};
eventfd_write(m_schedule_fd, value);
}
}
else
{
m_thread_pool->resume(handle);
}
}
/**
* @return The number of tasks waiting in the task queue + the executing tasks.
*/
auto size() const noexcept -> std::size_t
{
if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
{
return m_size.load(std::memory_order::acquire);
}
else
{
return m_size.load(std::memory_order::acquire) + m_thread_pool->size();
}
}
/**
* @return True if the task queue is empty and zero tasks are currently executing.
*/
auto empty() const noexcept -> bool { return size() == 0; }
/**
* Starts the shutdown of the io scheduler. All currently executing and pending tasks will complete
* prior to shutting down. This call is blocking and will not return until all tasks complete.
*/
auto shutdown() noexcept -> void;
private:
/// The configuration options.
@ -156,27 +274,35 @@ private:
fd_t m_shutdown_fd{-1};
/// The event loop timer fd for timed events, e.g. yield_for() or scheduler_after().
fd_t m_timer_fd{-1};
/// The schedule file descriptor if the scheduler is in inline processing mode.
fd_t m_schedule_fd{-1};
std::atomic<bool> m_schedule_fd_triggered{false};
/// The number of tasks executing or awaiting events in this io scheduler.
std::atomic<std::size_t> m_size{0};
/// The background io worker threads.
std::thread m_io_thread;
/// Thread pool for executing tasks when not in inline mode.
std::unique_ptr<thread_pool> m_thread_pool{nullptr};
std::mutex m_timed_events_mutex{};
/// The map of time point's to poll infos for tasks that are yielding for a period of time
/// or for tasks that are polling with timeouts.
timed_events m_timed_events{};
/// Has the io_scheduler been requested to shut down?
std::atomic<bool> m_shutdown_requested{false};
std::atomic<bool> m_io_processing{false};
auto process_events_manual(std::chrono::milliseconds timeout) -> void;
auto process_events_dedicated_thread() -> void;
auto process_events_execute(std::chrono::milliseconds timeout) -> void;
static auto event_to_poll_status(uint32_t events) -> poll_status;
auto process_event_execute(detail::poll_info* pi, poll_status status) -> void;
auto process_timeout_execute() -> void;
auto add_timer_token(time_point tp, detail::poll_info& pi) -> timed_events::iterator;
auto remove_timer_token(timed_events::iterator pos) -> void;
auto update_timeout(time_point now) -> void;
auto process_scheduled_execute_inline() -> void;
std::mutex m_scheduled_tasks_mutex{};
std::vector<std::coroutine_handle<>> m_scheduled_tasks{};
static constexpr const int m_shutdown_object{0};
static constexpr const void* m_shutdown_ptr = &m_shutdown_object;
@ -184,10 +310,21 @@ private:
static constexpr const int m_timer_object{0};
static constexpr const void* m_timer_ptr = &m_timer_object;
static constexpr const int m_schedule_object{0};
static constexpr const void* m_schedule_ptr = &m_schedule_object;
static const constexpr std::chrono::milliseconds m_default_timeout{1000};
static const constexpr std::chrono::milliseconds m_no_timeout{0};
static const constexpr std::size_t m_max_events = 8;
static const constexpr std::size_t m_max_events = 16;
std::array<struct epoll_event, m_max_events> m_events{};
std::vector<std::coroutine_handle<>> m_handles_to_resume{};
auto process_event_execute(detail::poll_info* pi, poll_status status) -> void;
auto process_timeout_execute() -> void;
auto add_timer_token(time_point tp, detail::poll_info& pi) -> timed_events::iterator;
auto remove_timer_token(timed_events::iterator pos) -> void;
auto update_timeout(time_point now) -> void;
};
} // namespace coro
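
To tie the new pieces together, here is a hedged usage sketch of the inline execution strategy based on the options and member functions declared above; it is illustrative and not part of this commit:

#include <coro/coro.hpp>

#include <chrono>

int main()
{
    // Resume tasks directly on the io thread instead of a separate thread pool.
    coro::io_scheduler scheduler{coro::io_scheduler::options{
        .thread_strategy    = coro::io_scheduler::thread_strategy_t::spawn,
        .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline}};

    auto task = [&]() -> coro::task<void>
    {
        co_await scheduler.schedule();                               // now running on the io thread
        co_await scheduler.yield_for(std::chrono::milliseconds{10}); // timed events share that thread
        co_return;
    };

    coro::sync_wait(task());
    scheduler.shutdown(); // blocking; waits for all in-flight tasks
}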

View file

@ -1,5 +1,6 @@
#pragma once
#include "coro/fd.hpp"
#include "coro/io_scheduler.hpp"
#include "coro/net/hostname.hpp"
#include "coro/net/ip_address.hpp"
@ -83,9 +84,9 @@ private:
/// This is the set of sockets that are currently being actively polled so multiple poll tasks
/// are not setup when ares_poll() is called.
std::unordered_set<io_scheduler::fd_t> m_active_sockets{};
std::unordered_set<fd_t> m_active_sockets{};
task_container m_task_container;
task_container<io_scheduler> m_task_container;
/// Global count to track if c-ares has been initialized or cleaned up.
static uint64_t m_ares_count;
@ -93,7 +94,7 @@ private:
static std::mutex m_ares_mutex;
auto ares_poll() -> void;
auto make_poll_task(io_scheduler::fd_t fd, poll_op ops) -> coro::task<void>;
auto make_poll_task(fd_t fd, poll_op ops) -> coro::task<void>;
};
} // namespace coro::net

View file

@ -1,28 +1,31 @@
#pragma once
#include "coro/concepts/executor.hpp"
#include <atomic>
#include <coroutine>
#include <mutex>
namespace coro
{
template<concepts::executor executor_type>
class shared_mutex;
class thread_pool;
/**
* A scoped RAII lock holder for a coro::shared_mutex. It will call the appropriate unlock() or
* unlock_shared() based on how the coro::shared_mutex was originally acquired, either shared or
* exclusive modes.
*/
template<concepts::executor executor_type>
class shared_scoped_lock
{
public:
shared_scoped_lock(shared_mutex& sm, bool exclusive) : m_shared_mutex(&sm), m_exclusive(exclusive) {}
shared_scoped_lock(shared_mutex<executor_type>& sm, bool exclusive) : m_shared_mutex(&sm), m_exclusive(exclusive) {}
/**
* Unlocks the mutex upon this shared scoped lock destructing.
*/
~shared_scoped_lock();
~shared_scoped_lock() { unlock(); }
shared_scoped_lock(const shared_scoped_lock&) = delete;
shared_scoped_lock(shared_scoped_lock&& other)
@ -45,22 +48,38 @@ public:
/**
* Unlocks the shared mutex prior to this lock going out of scope.
*/
auto unlock() -> void;
auto unlock() -> void
{
if (m_shared_mutex != nullptr)
{
if (m_exclusive)
{
m_shared_mutex->unlock();
}
else
{
m_shared_mutex->unlock_shared();
}
m_shared_mutex = nullptr;
}
}
private:
shared_mutex* m_shared_mutex{nullptr};
bool m_exclusive{false};
shared_mutex<executor_type>* m_shared_mutex{nullptr};
bool m_exclusive{false};
};
template<concepts::executor executor_type>
class shared_mutex
{
public:
/**
* @param tp The thread pool for when multiple shared waiters can be woken up at the same time,
* each shared waiter will be scheduled to immediately run on this thread pool in
* parralell.
* @param e The thread pool for when multiple shared waiters can be woken up at the same time,
* each shared waiter will be scheduled to immediately run on this thread pool in
* parallel.
*/
explicit shared_mutex(coro::thread_pool& tp);
explicit shared_mutex(executor_type& e) : m_executor(e) {}
~shared_mutex() = default;
shared_mutex(const shared_mutex&) = delete;
@ -72,9 +91,66 @@ public:
{
lock_operation(shared_mutex& sm, bool exclusive) : m_shared_mutex(sm), m_exclusive(exclusive) {}
auto await_ready() const noexcept -> bool;
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool;
auto await_resume() noexcept -> shared_scoped_lock { return shared_scoped_lock{m_shared_mutex, m_exclusive}; }
auto await_ready() const noexcept -> bool
{
if (m_exclusive)
{
return m_shared_mutex.try_lock();
}
else
{
return m_shared_mutex.try_lock_shared();
}
}
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
{
std::unique_lock lk{m_shared_mutex.m_mutex};
// Its possible the lock has been released between await_ready() and await_suspend(), double
// check and make sure we are not going to suspend when nobody holds the lock.
if (m_exclusive)
{
if (m_shared_mutex.try_lock_locked(lk))
{
return false;
}
}
else
{
if (m_shared_mutex.try_lock_shared_locked(lk))
{
return false;
}
}
// For sure the lock is currently held in a manner that it cannot be acquired, suspend ourself
// at the end of the waiter list.
if (m_shared_mutex.m_tail_waiter == nullptr)
{
m_shared_mutex.m_head_waiter = this;
m_shared_mutex.m_tail_waiter = this;
}
else
{
m_shared_mutex.m_tail_waiter->m_next = this;
m_shared_mutex.m_tail_waiter = this;
}
// If this is an exclusive lock acquire then mark it as such so that shared locks after this
// exclusive one will also suspend so this exclusive lock doesn't get starved.
if (m_exclusive)
{
++m_shared_mutex.m_exclusive_waiters;
}
m_awaiting_coroutine = awaiting_coroutine;
return true;
}
auto await_resume() noexcept -> shared_scoped_lock<executor_type>
{
return shared_scoped_lock{m_shared_mutex, m_exclusive};
}
private:
friend class shared_mutex;
@ -99,12 +175,27 @@ public:
/**
* @return True if the lock could immediately be acquired in a shared state.
*/
auto try_lock_shared() -> bool;
auto try_lock_shared() -> bool
{
// To acquire the shared lock the state must be one of two states:
// 1) unlocked
// 2) shared locked with zero exclusive waiters
// Zero exclusive waiters prevents exclusive starvation if shared locks are
// always continuously happening.
std::unique_lock lk{m_mutex};
return try_lock_shared_locked(lk);
}
/**
* @return True if the lock could immediately be acquired in an exclusive state.
*/
auto try_lock() -> bool;
auto try_lock() -> bool
{
// To acquire the exclusive lock the state must be unlocked.
std::unique_lock lk{m_mutex};
return try_lock_locked(lk);
}
/**
* Unlocks a single shared state user. *REQUIRES* that the lock was first acquired exactly once
@ -114,7 +205,24 @@ public:
* If the shared user count drops to zero and this lock has an exclusive waiter then the exclusive
* waiter acquires the lock.
*/
auto unlock_shared() -> void;
auto unlock_shared() -> void
{
std::unique_lock lk{m_mutex};
--m_shared_users;
// Only wake waiters from shared state if all shared users have completed.
if (m_shared_users == 0)
{
if (m_head_waiter != nullptr)
{
wake_waiters(lk);
}
else
{
m_state = state::unlocked;
}
}
}
/**
* Unlocks the mutex from its exclusive state. If there is a following exclusive waiter then
@ -122,7 +230,18 @@ public:
* shared waiters acquire the lock in a shared state in parallel and are resumed on the original
* thread pool this shared mutex was created with.
*/
auto unlock() -> void;
auto unlock() -> void
{
std::unique_lock lk{m_mutex};
if (m_head_waiter != nullptr)
{
wake_waiters(lk);
}
else
{
m_state = state::unlocked;
}
}
private:
friend class lock_operation;
@ -134,8 +253,8 @@ private:
locked_exclusive
};
/// This thread pool is for resuming multiple shared waiters.
coro::thread_pool& m_thread_pool;
/// This executor is for resuming multiple shared waiters.
executor_type& m_executor;
std::mutex m_mutex;
@ -150,10 +269,87 @@ private:
lock_operation* m_head_waiter{nullptr};
lock_operation* m_tail_waiter{nullptr};
auto try_lock_shared_locked(std::unique_lock<std::mutex>& lk) -> bool;
auto try_lock_locked(std::unique_lock<std::mutex>& lk) -> bool;
auto try_lock_shared_locked(std::unique_lock<std::mutex>& lk) -> bool
{
if (m_state == state::unlocked)
{
// If the shared mutex is unlocked put it into shared mode and add ourself as using the lock.
m_state = state::locked_shared;
++m_shared_users;
lk.unlock();
return true;
}
else if (m_state == state::locked_shared && m_exclusive_waiters == 0)
{
// If the shared mutex is in a shared locked state and there are no exclusive waiters
// then add ourself as using the lock.
++m_shared_users;
lk.unlock();
return true;
}
auto wake_waiters(std::unique_lock<std::mutex>& lk) -> void;
// If the lock is in shared mode but there are exclusive waiters then we will also wait so
// the writers are not starved.
// If the lock is in exclusive mode already then we need to wait.
return false;
}
auto try_lock_locked(std::unique_lock<std::mutex>& lk) -> bool
{
if (m_state == state::unlocked)
{
m_state = state::locked_exclusive;
lk.unlock();
return true;
}
return false;
}
auto wake_waiters(std::unique_lock<std::mutex>& lk) -> void
{
// First determine what the next lock state will be based on the first waiter.
if (m_head_waiter->m_exclusive)
{
// If its exclusive then only this waiter can be woken up.
m_state = state::locked_exclusive;
lock_operation* to_resume = m_head_waiter;
m_head_waiter = m_head_waiter->m_next;
--m_exclusive_waiters;
if (m_head_waiter == nullptr)
{
m_tail_waiter = nullptr;
}
// Since this is an exclusive lock waiting we can resume it directly.
lk.unlock();
to_resume->m_awaiting_coroutine.resume();
}
else
{
// If its shared then we will scan forward and awake all shared waiters onto the given
// thread pool so they can run in parallel.
m_state = state::locked_shared;
do
{
lock_operation* to_resume = m_head_waiter;
m_head_waiter = m_head_waiter->m_next;
if (m_head_waiter == nullptr)
{
m_tail_waiter = nullptr;
}
++m_shared_users;
m_executor.resume(to_resume->m_awaiting_coroutine);
} while (m_head_waiter != nullptr && !m_head_waiter->m_exclusive);
// Cannot unlock until the entire set of shared waiters has been traversed. I think this
// makes more sense than allocating space for all the shared waiters, unlocking, and then
// resuming in a batch?
lk.unlock();
}
}
};
} // namespace coro
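
A hedged usage sketch of the now-templated shared_mutex; it assumes the lock()/lock_shared() entry points that are not shown in this excerpt and is not part of this commit:

#include <coro/coro.hpp>

#include <cstdint>

int main()
{
    coro::thread_pool tp{coro::thread_pool::options{.thread_count = 2}};
    // Any executor type works now; shared waiters are woken via tp.resume().
    coro::shared_mutex<coro::thread_pool> mutex{tp};
    uint64_t value{0};

    auto writer = [&]() -> coro::task<void>
    {
        co_await tp.schedule();
        auto lock = co_await mutex.lock();        // exclusive; unlocks when lock destructs
        ++value;
        co_return;
    };

    auto reader = [&]() -> coro::task<uint64_t>
    {
        co_await tp.schedule();
        auto lock = co_await mutex.lock_shared(); // shared; readers run in parallel
        co_return value;
    };

    coro::sync_wait(coro::when_all(writer(), reader(), reader()));
}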

View file

@ -1,13 +0,0 @@
#pragma once
namespace coro
{
enum class shutdown_t
{
/// Synchronously wait for all tasks to complete when calling shutdown.
sync,
/// Asynchronously let tasks finish on the background thread on shutdown.
async
};
} // namespace coro

View file

@ -1,16 +1,17 @@
#pragma once
#include "coro/concepts/executor.hpp"
#include "coro/task.hpp"
#include <atomic>
#include <iostream>
#include <list>
#include <mutex>
#include <vector>
namespace coro
{
class thread_pool;
template<concepts::executor executor_type>
class task_container
{
public:
@ -25,16 +26,33 @@ public:
};
/**
* @param tp Tasks started in the container are scheduled onto this thread pool. For tasks created
* @param e Tasks started in the container are scheduled onto this executor. For tasks created
* from a coro::io_scheduler, this would usually be that coro::io_scheduler instance.
* @param opts Task container options.
*/
task_container(thread_pool& tp, const options opts = options{.reserve_size = 8, .growth_factor = 2});
task_container(executor_type& e, const options opts = options{.reserve_size = 8, .growth_factor = 2})
: m_growth_factor(opts.growth_factor),
m_executor(e)
{
m_tasks.resize(opts.reserve_size);
for (std::size_t i = 0; i < opts.reserve_size; ++i)
{
m_task_indexes.emplace_back(i);
}
m_free_pos = m_task_indexes.begin();
}
task_container(const task_container&) = delete;
task_container(task_container&&) = delete;
auto operator=(const task_container&) -> task_container& = delete;
auto operator=(task_container&&) -> task_container& = delete;
~task_container();
~task_container()
{
// This will hang the current thread... but if tasks are not complete that's also pretty bad.
while (!empty())
{
garbage_collect();
}
}
enum class garbage_collect_t
{
@ -51,14 +69,44 @@ public:
* call? Calling at regular intervals will reduce memory usage of completed
* tasks and allow for the task container to re-use allocated space.
*/
auto start(coro::task<void> user_task, garbage_collect_t cleanup = garbage_collect_t::yes) -> void;
auto start(coro::task<void> user_task, garbage_collect_t cleanup = garbage_collect_t::yes) -> void
{
m_size.fetch_add(1, std::memory_order::relaxed);
std::scoped_lock lk{m_mutex};
if (cleanup == garbage_collect_t::yes)
{
gc_internal();
}
// Only grow if completely full and attempting to add more.
if (m_free_pos == m_task_indexes.end())
{
m_free_pos = grow();
}
// Store the task inside a cleanup task for self deletion.
auto index = *m_free_pos;
m_tasks[index] = make_cleanup_task(std::move(user_task), m_free_pos);
// Mark the current used slot as used.
std::advance(m_free_pos, 1);
// Start executing from the cleanup task to schedule the user's task onto the thread pool.
m_tasks[index].resume();
}
/**
* Garbage collects any tasks that are marked as deleted. This frees up space to be re-used by
* the task container for newly stored tasks.
* @return The number of tasks that were deleted.
*/
auto garbage_collect() -> std::size_t;
auto garbage_collect() -> std::size_t
{
std::scoped_lock lk{m_mutex};
return gc_internal();
}
/**
* @return The number of tasks that are awaiting deletion.
@ -104,19 +152,59 @@ public:
* This does not shut down the task container, but can be used when shutting down, or if your
* logic requires all the tasks contained within to complete, it is similar to coro::latch.
*/
auto garbage_collect_and_yield_until_empty() -> coro::task<void>;
auto garbage_collect_and_yield_until_empty() -> coro::task<void>
{
while (!empty())
{
garbage_collect();
co_await m_executor.yield();
}
}
private:
/**
* Grows each task container by the growth factor.
* @return The position of the free index after growing.
*/
auto grow() -> task_position;
auto grow() -> task_position
{
// Save an index at the current last item.
auto last_pos = std::prev(m_task_indexes.end());
std::size_t new_size = m_tasks.size() * m_growth_factor;
for (std::size_t i = m_tasks.size(); i < new_size; ++i)
{
m_task_indexes.emplace_back(i);
}
m_tasks.resize(new_size);
// Set the free pos to the item just after the previous last item.
return std::next(last_pos);
}
/**
* Internal GC call, expects the public function to lock.
*/
auto gc_internal() -> std::size_t;
auto gc_internal() -> std::size_t
{
std::size_t deleted{0};
if (!m_tasks_to_delete.empty())
{
for (const auto& pos : m_tasks_to_delete)
{
// This doesn't actually 'delete' the task, it'll get overwritten when a
// new user task claims the free space. It could be useful to actually
// delete the tasks so the coroutine stack frames are destroyed. The advantage
// of letting a new task replace an old one though is that it's a 1:1 exchange
// on delete and create, rather than a large pause here to delete all the
// completed tasks.
// Put the deleted position at the end of the free indexes list.
m_task_indexes.splice(m_task_indexes.end(), m_task_indexes, pos);
}
deleted = m_tasks_to_delete.size();
m_tasks_to_delete.clear();
}
return deleted;
}
/**
* Encapsulate the users tasks in a cleanup task which marks itself for deletion upon
@ -130,7 +218,37 @@ private:
* @param pos The position where the task data will be stored in the task manager.
* @return The user's task wrapped in a self cleanup task.
*/
auto make_cleanup_task(task<void> user_task, task_position pos) -> coro::task<void>;
auto make_cleanup_task(task<void> user_task, task_position pos) -> coro::task<void>
{
// Immediately move the task onto the executor.
co_await m_executor.schedule();
try
{
// Await the users task to complete.
co_await user_task;
}
catch (const std::exception& e)
{
// TODO: what would be a good way to report this to the user...? Catching here is required
// since the co_await will unwrap the unhandled exception on the task.
// The user's task should ideally be wrapped in a catch all and handle it themselves, but
// that cannot be guaranteed.
std::cerr << "coro::task_container user_task had an unhandled exception e.what()= " << e.what() << "\n";
}
catch (...)
{
// don't crash if they throw something that isn't derived from std::exception
std::cerr << "coro::task_container user_task had an unhandled exception, not derived from std::exception.\n";
}
std::scoped_lock lk{m_mutex};
m_tasks_to_delete.push_back(pos);
// This has to be done within scope lock to make sure this coroutine task completes before the
// task container object destructs -- if it was waiting on .empty() to become true.
m_size.fetch_sub(1, std::memory_order::relaxed);
co_return;
}
/// Mutex for safely mutating the task containers across threads, expected usage is within
/// thread pools for indeterminate lifetime requests.
@ -148,8 +266,8 @@ private:
task_position m_free_pos{};
/// The amount to grow the containers by when all spaces are taken.
double m_growth_factor{};
/// The thread pool to schedule tasks that have just started.
thread_pool& m_thread_pool;
/// The executor to schedule tasks that have just started.
executor_type& m_executor;
};
} // namespace coro
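
A hedged usage sketch of the executor-templated task_container for fire-and-forget tasks, e.g. with a coro::io_scheduler; names and flow are illustrative, not part of this commit:

#include <coro/coro.hpp>

int main()
{
    coro::io_scheduler scheduler{};
    coro::task_container<coro::io_scheduler> tasks{scheduler};

    auto make_task = [&](int id) -> coro::task<void>
    {
        co_await scheduler.schedule();
        // ... handle request `id` here ...
        (void)id;
        co_return;
    };

    for (int i = 0; i < 4; ++i)
    {
        tasks.start(make_task(i)); // container owns and later garbage collects the task
    }

    // Block until every stored task has completed and its slot is reclaimed.
    coro::sync_wait(tasks.garbage_collect_and_yield_until_empty());
}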

View file

@ -1,7 +1,7 @@
#pragma once
#include "coro/concepts/range_of.hpp"
#include "coro/event.hpp"
#include "coro/shutdown.hpp"
#include "coro/task.hpp"
#include <atomic>
@ -11,15 +11,13 @@
#include <functional>
#include <mutex>
#include <optional>
#include <ranges>
#include <thread>
#include <variant>
#include <vector>
namespace coro
{
class event;
class shared_mutex;
/**
* Creates a thread pool that executes arbitrary coroutine tasks in a FIFO scheduler policy.
* The thread pool by default will create an execution thread per available core on the system.
@ -133,6 +131,46 @@ public:
}
}
/**
* Schedules any coroutine handle that is ready to be resumed.
* @param handle The coroutine handle to schedule.
*/
auto resume(std::coroutine_handle<> handle) noexcept -> void;
/**
* Schedules the set of coroutine handles that are ready to be resumed.
* @param handles The coroutine handles to schedule.
*/
template<coro::concepts::range_of<std::coroutine_handle<>> range_type>
auto resume(const range_type& handles) noexcept -> void
{
m_size.fetch_add(std::size(handles), std::memory_order::release);
size_t null_handles{0};
{
std::scoped_lock lk{m_wait_mutex};
for (const auto& handle : handles)
{
if (handle != nullptr) [[likely]]
{
m_queue.emplace_back(handle);
}
else
{
++null_handles;
}
}
}
if (null_handles > 0)
{
m_size.fetch_sub(null_handles, std::memory_order::release);
}
m_wait_cv.notify_one();
}
/**
* Immediately yields the current task and places it at the end of the queue of tasks waiting
* to be processed. This will immediately be picked up again once it naturally goes through the
@ -143,11 +181,10 @@ public:
/**
* Shutsdown the thread pool. This will finish any tasks scheduled prior to calling this
* function but will prevent the thread pool from scheduling any new tasks.
* @param wait_for_tasks Should this function block until all remaining scheduled tasks have
* completed? Pass in sync to wait, or async to not block.
* function but will prevent the thread pool from scheduling any new tasks. This call is
* blocking and will wait until all in-flight tasks are completed before returning.
*/
virtual auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) noexcept -> void;
auto shutdown() noexcept -> void;
/**
* @return The number of tasks waiting in the task queue + the executing tasks.
@ -198,22 +235,10 @@ private:
*/
auto schedule_impl(std::coroutine_handle<> handle) noexcept -> void;
protected:
/// The number of tasks in the queue + currently executing.
std::atomic<std::size_t> m_size{0};
/// Has the thread pool been requested to shut down?
std::atomic<bool> m_shutdown_requested{false};
/// Required to resume all waiters of the event onto a thread_pool.
friend event;
friend shared_mutex;
/**
* Schedules any coroutine that is ready to be resumed.
* @param handle The coroutine handle to schedule.
*/
auto resume(std::coroutine_handle<> handle) noexcept -> void;
auto resume(const std::vector<std::coroutine_handle<>>& handles) noexcept -> void;
};
} // namespace coro
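
A hedged sketch of the new range-based resume(); this is essentially how io_scheduler hands its batched m_handles_to_resume to the pool, and the helper name is hypothetical:

#include <coro/coro.hpp>

#include <coroutine>
#include <vector>

auto resume_batch(coro::thread_pool& tp, std::vector<std::coroutine_handle<>>& handles) -> void
{
    tp.resume(handles); // null handles in the range are skipped and not counted toward the pool's size
    handles.clear();
}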

View file

@ -24,22 +24,22 @@ auto event::set() noexcept -> void
}
}
auto event::set(coro::thread_pool& tp) noexcept -> void
{
// Exchange the state to this, if the state was previously not this, then traverse the list
// of awaiters and resume their coroutines.
void* old_value = m_state.exchange(this, std::memory_order::acq_rel);
if (old_value != this)
{
auto* waiters = static_cast<awaiter*>(old_value);
while (waiters != nullptr)
{
auto* next = waiters->m_next;
tp.resume(waiters->m_awaiting_coroutine);
waiters = next;
}
}
}
// auto event::set(coro::thread_pool& tp) noexcept -> void
// {
// // Exchange the state to this, if the state was previously not this, then traverse the list
// // of awaiters and resume their coroutines.
// void* old_value = m_state.exchange(this, std::memory_order::acq_rel);
// if (old_value != this)
// {
// auto* waiters = static_cast<awaiter*>(old_value);
// while (waiters != nullptr)
// {
// auto* next = waiters->m_next;
// tp.resume(waiters->m_awaiting_coroutine);
// waiters = next;
// }
// }
// }
auto event::awaiter::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
{

View file

@ -14,73 +14,18 @@ using namespace std::chrono_literals;
namespace coro
{
namespace detail
{
/**
* Poll Info encapsulates everything about a poll operation for the event as well as its paired
* timeout. This is important since coroutines that are waiting on an event or timeout do not
* immediately execute, they are re-scheduled onto the thread pool, so its possible its pair
* event or timeout also triggers while the coroutine is still waiting to resume. This means that
* the first one to happen, the event itself or its timeout, needs to disable the other pair item
* prior to resuming the coroutine.
*
* Finally, its also important to note that the event and its paired timeout could happen during
* the same epoll_wait and possibly trigger the coroutine to start twice. Only one can win, so the
* first one processed sets m_processed to true and any subsequent events in the same epoll batch
* are effectively discarded.
*/
struct poll_info
{
poll_info() = default;
~poll_info() = default;
poll_info(const poll_info&) = delete;
poll_info(poll_info&&) = delete;
auto operator=(const poll_info&) -> poll_info& = delete;
auto operator=(poll_info&&) -> poll_info& = delete;
struct poll_awaiter
{
explicit poll_awaiter(poll_info& pi) noexcept : m_pi(pi) {}
auto await_ready() const noexcept -> bool { return false; }
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void
{
m_pi.m_awaiting_coroutine = awaiting_coroutine;
std::atomic_thread_fence(std::memory_order::release);
}
auto await_resume() noexcept -> coro::poll_status { return m_pi.m_poll_status; }
poll_info& m_pi;
};
auto operator co_await() noexcept -> poll_awaiter { return poll_awaiter{*this}; }
/// The file descriptor being polled on. This is needed so that if the timeout occurs first then
/// the event loop can immediately disable the event within epoll.
io_scheduler::fd_t m_fd{-1};
/// The timeout's position in the timeout map. A poll() with no timeout or yield() this is empty.
/// This is needed so that if the event occurs first then the event loop can immediately disable
/// the timeout within epoll.
std::optional<io_scheduler::timed_events::iterator> m_timer_pos{std::nullopt};
/// The awaiting coroutine for this poll info to resume upon event or timeout.
std::coroutine_handle<> m_awaiting_coroutine;
/// The status of the poll operation.
coro::poll_status m_poll_status{coro::poll_status::error};
/// Did the timeout and event trigger at the same time on the same epoll_wait call?
/// Once this is set to true all future events on this poll info are null and void.
bool m_processed{false};
};
} // namespace detail
io_scheduler::io_scheduler(options opts)
: thread_pool(std::move(opts.pool)),
m_opts(std::move(opts)),
: m_opts(std::move(opts)),
m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)),
m_shutdown_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)),
m_timer_fd(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC))
m_timer_fd(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC)),
m_schedule_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK))
{
if (opts.execution_strategy == execution_strategy_t::process_tasks_on_thread_pool)
{
m_thread_pool = std::make_unique<thread_pool>(std::move(m_opts.pool));
}
epoll_event e{};
e.events = EPOLLIN;
@ -90,6 +35,9 @@ io_scheduler::io_scheduler(options opts)
e.data.ptr = const_cast<void*>(m_timer_ptr);
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_timer_fd, &e);
e.data.ptr = const_cast<void*>(m_schedule_ptr);
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_schedule_fd, &e);
if (m_opts.thread_strategy == thread_strategy_t::spawn)
{
m_io_thread = std::thread([this]() { process_events_dedicated_thread(); });
@ -116,12 +64,17 @@ io_scheduler::~io_scheduler()
close(m_timer_fd);
m_timer_fd = -1;
}
if (m_schedule_fd != -1)
{
close(m_schedule_fd);
m_schedule_fd = -1;
}
}
auto io_scheduler::process_events(std::chrono::milliseconds timeout) -> std::size_t
{
process_events_manual(timeout);
return m_size.load(std::memory_order::relaxed);
return size();
}
auto io_scheduler::schedule_after(std::chrono::milliseconds amount) -> coro::task<void>
@ -142,6 +95,11 @@ auto io_scheduler::yield_for(std::chrono::milliseconds amount) -> coro::task<voi
}
else
{
// Yield/timeout tasks are considered live in the scheduler and must be accounted for. Note
// that if the user gives an invalid amount and schedule() is directly called it will account
// for the scheduled task there.
m_size.fetch_add(1, std::memory_order::release);
// Yielding does not require setting the timer position on the poll info since
// it doesn't have a corresponding 'event' that can trigger, it always waits for
// the timeout to occur before resuming.
@ -149,6 +107,8 @@ auto io_scheduler::yield_for(std::chrono::milliseconds amount) -> coro::task<voi
detail::poll_info pi{};
add_timer_token(clock::now() + amount, pi);
co_await pi;
m_size.fetch_sub(1, std::memory_order::release);
}
co_return;
}
@ -164,17 +124,25 @@ auto io_scheduler::yield_until(time_point time) -> coro::task<void>
}
else
{
m_size.fetch_add(1, std::memory_order::release);
auto amount = std::chrono::duration_cast<std::chrono::milliseconds>(time - now);
detail::poll_info pi{};
add_timer_token(now + amount, pi);
co_await pi;
m_size.fetch_sub(1, std::memory_order::release);
}
co_return;
}
auto io_scheduler::poll(fd_t fd, coro::poll_op op, std::chrono::milliseconds timeout) -> coro::task<poll_status>
{
// Every poll() counts as a live task in the scheduler, so increment the size before suspending
// and decrement it again once the event loop resumes this task.
m_size.fetch_add(1, std::memory_order::release);
// Setup two events, a timeout event and the actual poll for op event.
// Whichever triggers first will delete the other to guarantee only one wins.
// The resume token will be set by the scheduler to what the event turned out to be.
@ -200,16 +168,30 @@ auto io_scheduler::poll(fd_t fd, coro::poll_op op, std::chrono::milliseconds tim
// The event loop will 'clean-up' whichever event didn't win since the coroutine is scheduled
// onto the thread pool it's possible the other type of event could trigger while it's waiting
// to execute again, thus restarting the coroutine twice, that would be quite bad.
co_return co_await pi;
auto result = co_await pi;
m_size.fetch_sub(1, std::memory_order::release);
co_return result;
}
auto io_scheduler::shutdown(shutdown_t wait_for_tasks) noexcept -> void
auto io_scheduler::shutdown() noexcept -> void
{
thread_pool::shutdown(wait_for_tasks);
// Only allow shutdown to occur once.
if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
{
if (m_thread_pool != nullptr)
{
m_thread_pool->shutdown();
}
// Signal the event loop to stop asap, triggering the event fd is safe.
uint64_t value{1};
::write(m_shutdown_fd, &value, sizeof(value));
// Signal the event loop to stop asap, triggering the event fd is safe.
uint64_t value{1};
::write(m_shutdown_fd, &value, sizeof(value));
if (m_io_thread.joinable())
{
m_io_thread.join();
}
}
}
auto io_scheduler::process_events_manual(std::chrono::milliseconds timeout) -> void
@ -231,7 +213,7 @@ auto io_scheduler::process_events_dedicated_thread() -> void
m_io_processing.exchange(true, std::memory_order::release);
// Execute tasks until stopped or there are no more tasks to complete.
while (!m_shutdown_requested.load(std::memory_order::relaxed) || m_size.load(std::memory_order::relaxed) > 0)
while (!m_shutdown_requested.load(std::memory_order::acquire) || size() > 0)
{
process_events_execute(m_default_timeout);
}
@ -258,18 +240,45 @@ auto io_scheduler::process_events_execute(std::chrono::milliseconds timeout) ->
// Process all events that have timed out.
process_timeout_execute();
}
else if (handle_ptr == m_schedule_ptr)
{
// Process scheduled coroutines.
process_scheduled_execute_inline();
}
else if (handle_ptr == m_shutdown_ptr) [[unlikely]]
{
// Nothing to do, just needed to wake up and smell the flowers
}
else
{
// Individual poll task wake-up, this will queue the coroutines waiting
// on the resume token into the FIFO queue for processing.
// Individual poll task wake-up.
process_event_execute(static_cast<detail::poll_info*>(handle_ptr), event_to_poll_status(event.events));
}
}
}
// It's important to not resume any handles until the full set is accounted for. If a timeout
// and an event for the same handle happen in the same epoll_wait() call then inline processing
// will destruct the poll_info object before the second event is handled. This is also possible
// with thread pool processing, but probably has an extremely low chance of occurring due to
// the thread switch required. If m_max_events == 1 this would be unnecessary.
if (!m_handles_to_resume.empty())
{
if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline)
{
for (auto& handle : m_handles_to_resume)
{
handle.resume();
}
}
else
{
m_thread_pool->resume(m_handles_to_resume);
}
m_handles_to_resume.clear();
}
}
auto io_scheduler::event_to_poll_status(uint32_t events) -> poll_status
@ -290,6 +299,30 @@ auto io_scheduler::event_to_poll_status(uint32_t events) -> poll_status
throw std::runtime_error{"invalid epoll state"};
}
auto io_scheduler::process_scheduled_execute_inline() -> void
{
std::vector<std::coroutine_handle<>> tasks{};
{
// Acquire the entire list, and then reset it.
std::scoped_lock lk{m_scheduled_tasks_mutex};
tasks.swap(m_scheduled_tasks);
// Clear the schedule eventfd if this is a scheduled task.
eventfd_t value{0};
eventfd_read(m_schedule_fd, &value);
// Clear the in memory flag to reduce eventfd_* calls on scheduling.
m_schedule_fd_triggered.exchange(false, std::memory_order::release);
}
// This set of handles can be safely resumed now since they do not have a corresponding timeout event.
for (auto& task : tasks)
{
task.resume();
}
m_size.fetch_sub(tasks.size(), std::memory_order::release);
}
auto io_scheduler::process_event_execute(detail::poll_info* pi, poll_status status) -> void
{
if (!pi->m_processed)
@ -317,7 +350,8 @@ auto io_scheduler::process_event_execute(detail::poll_info* pi, poll_status stat
{
std::atomic_thread_fence(std::memory_order::acquire);
}
resume(pi->m_awaiting_coroutine);
m_handles_to_resume.emplace_back(pi->m_awaiting_coroutine);
}
}
@ -345,8 +379,6 @@ auto io_scheduler::process_timeout_execute() -> void
}
}
std::vector<std::coroutine_handle<>> handles{};
handles.reserve(poll_infos.size());
for (auto pi : poll_infos)
{
if (!pi->m_processed)
@ -367,14 +399,11 @@ auto io_scheduler::process_timeout_execute() -> void
// std::cerr << "process_event_execute() has a nullptr event\n";
}
handles.emplace_back(pi->m_awaiting_coroutine);
m_handles_to_resume.emplace_back(pi->m_awaiting_coroutine);
pi->m_poll_status = coro::poll_status::timeout;
}
}
// Resume all timed out coroutines.
resume(handles);
// Update the time to the next smallest time point, re-take the current now time
// since updating and resuming tasks could shift the time.
update_timeout(clock::now());

View file

@ -145,7 +145,7 @@ auto dns_resolver::ares_poll() -> void
std::vector<coro::task<void>> poll_tasks{};
for (size_t i = 0; i < new_sockets; ++i)
{
auto fd = static_cast<io_scheduler::fd_t>(ares_sockets[i]);
auto fd = static_cast<fd_t>(ares_sockets[i]);
// If this socket is not currently actively polling, start polling!
if (m_active_sockets.emplace(fd).second)
@ -155,7 +155,7 @@ auto dns_resolver::ares_poll() -> void
}
}
auto dns_resolver::make_poll_task(io_scheduler::fd_t fd, poll_op ops) -> coro::task<void>
auto dns_resolver::make_poll_task(fd_t fd, poll_op ops) -> coro::task<void>
{
auto result = co_await m_io_scheduler.poll(fd, ops, m_timeout);
switch (result)

View file

@ -1,221 +0,0 @@
#include "coro/shared_mutex.hpp"
#include "coro/thread_pool.hpp"
namespace coro
{
shared_scoped_lock::~shared_scoped_lock()
{
unlock();
}
auto shared_scoped_lock::unlock() -> void
{
if (m_shared_mutex != nullptr)
{
if (m_exclusive)
{
m_shared_mutex->unlock();
}
else
{
m_shared_mutex->unlock_shared();
}
m_shared_mutex = nullptr;
}
}
shared_mutex::shared_mutex(coro::thread_pool& tp) : m_thread_pool(tp)
{
}
auto shared_mutex::lock_operation::await_ready() const noexcept -> bool
{
if (m_exclusive)
{
return m_shared_mutex.try_lock();
}
else
{
return m_shared_mutex.try_lock_shared();
}
}
auto shared_mutex::lock_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
{
std::unique_lock lk{m_shared_mutex.m_mutex};
// Its possible the lock has been released between await_ready() and await_suspend(), double
// check and make sure we are not going to suspend when nobody holds the lock.
if (m_exclusive)
{
if (m_shared_mutex.try_lock_locked(lk))
{
return false;
}
}
else
{
if (m_shared_mutex.try_lock_shared_locked(lk))
{
return false;
}
}
// For sure the lock is currently held in a manner that it cannot be acquired, suspend ourself
// at the end of the waiter list.
if (m_shared_mutex.m_tail_waiter == nullptr)
{
m_shared_mutex.m_head_waiter = this;
m_shared_mutex.m_tail_waiter = this;
}
else
{
m_shared_mutex.m_tail_waiter->m_next = this;
m_shared_mutex.m_tail_waiter = this;
}
// If this is an exclusive lock acquire then mark it as so so that shared locks after this
// exclusive one will also suspend so this exclusive lock doens't get starved.
if (m_exclusive)
{
++m_shared_mutex.m_exclusive_waiters;
}
m_awaiting_coroutine = awaiting_coroutine;
return true;
}
auto shared_mutex::try_lock_shared() -> bool
{
// To acquire the shared lock the state must be one of two states:
// 1) unlocked
// 2) shared locked with zero exclusive waiters
// Zero exclusive waiters prevents exclusive starvation if shared locks are
// always continuously happening.
std::unique_lock lk{m_mutex};
return try_lock_shared_locked(lk);
}
auto shared_mutex::try_lock() -> bool
{
// To acquire the exclusive lock the state must be unlocked.
std::unique_lock lk{m_mutex};
return try_lock_locked(lk);
}
auto shared_mutex::unlock_shared() -> void
{
std::unique_lock lk{m_mutex};
--m_shared_users;
// Only wake waiters from shared state if all shared users have completed.
if (m_shared_users == 0)
{
if (m_head_waiter != nullptr)
{
wake_waiters(lk);
}
else
{
m_state = state::unlocked;
}
}
}
auto shared_mutex::unlock() -> void
{
std::unique_lock lk{m_mutex};
if (m_head_waiter != nullptr)
{
wake_waiters(lk);
}
else
{
m_state = state::unlocked;
}
}
auto shared_mutex::try_lock_shared_locked(std::unique_lock<std::mutex>& lk) -> bool
{
if (m_state == state::unlocked)
{
// If the shared mutex is unlocked put it into shared mode and add ourself as using the lock.
m_state = state::locked_shared;
++m_shared_users;
lk.unlock();
return true;
}
else if (m_state == state::locked_shared && m_exclusive_waiters == 0)
{
// If the shared mutex is in a shared locked state and there are no exclusive waiters
// the add ourself as using the lock.
++m_shared_users;
lk.unlock();
return true;
}
// If the lock is in shared mode but there are exclusive waiters then we will also wait so
// the writers are not starved.
// If the lock is in exclusive mode already then we need to wait.
return false;
}
auto shared_mutex::try_lock_locked(std::unique_lock<std::mutex>& lk) -> bool
{
if (m_state == state::unlocked)
{
m_state = state::locked_exclusive;
lk.unlock();
return true;
}
return false;
}
auto shared_mutex::wake_waiters(std::unique_lock<std::mutex>& lk) -> void
{
// First determine what the next lock state will be based on the first waiter.
if (m_head_waiter->m_exclusive)
{
// If its exclusive then only this waiter can be woken up.
m_state = state::locked_exclusive;
lock_operation* to_resume = m_head_waiter;
m_head_waiter = m_head_waiter->m_next;
--m_exclusive_waiters;
if (m_head_waiter == nullptr)
{
m_tail_waiter = nullptr;
}
// Since this is a single exclusive waiter we can resume it directly.
lk.unlock();
to_resume->m_awaiting_coroutine.resume();
}
else
{
// If it's shared then we will scan forward and wake all consecutive shared waiters onto the given
// thread pool so they can run in parallel.
m_state = state::locked_shared;
do
{
lock_operation* to_resume = m_head_waiter;
m_head_waiter = m_head_waiter->m_next;
if (m_head_waiter == nullptr)
{
m_tail_waiter = nullptr;
}
++m_shared_users;
m_thread_pool.resume(to_resume->m_awaiting_coroutine);
} while (m_head_waiter != nullptr && !m_head_waiter->m_exclusive);
// Cannot unlock until the entire set of shared waiters has been traversed. I think this
// makes more sense than allocating space for all the shared waiters, unlocking, and then
// resuming in a batch?
lk.unlock();
}
}
} // namespace coro
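A minimal usage sketch of the awaitable shared_mutex above, assuming the templated coro::shared_mutex<coro::thread_pool> form used by the updated tests; the writer/reader task names are illustrative only:
#include <coro/coro.hpp>
int main()
{
    coro::thread_pool tp{coro::thread_pool::options{.thread_count = 2}};
    coro::shared_mutex<coro::thread_pool> m{tp};
    uint64_t value{0};
    // Illustrative writer: acquires the lock exclusively; the scoped lock releases on destruction.
    auto writer = [&]() -> coro::task<void> {
        co_await tp.schedule();
        auto scoped_lock = co_await m.lock();
        ++value;
        co_return;
    };
    // Illustrative reader: acquires the lock shared; multiple readers may hold it concurrently.
    auto reader = [&]() -> coro::task<void> {
        co_await tp.schedule();
        auto scoped_lock = co_await m.lock_shared();
        (void)value; // read under shared ownership
        co_return;
    };
    coro::sync_wait(coro::when_all(writer(), reader()));
    tp.shutdown();
}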

View file

@ -1,141 +0,0 @@
#include "coro/task_container.hpp"
#include "coro/thread_pool.hpp"
#include <iostream>
namespace coro
{
task_container::task_container(thread_pool& tp, const options opts)
: m_growth_factor(opts.growth_factor),
m_thread_pool(tp)
{
m_tasks.resize(opts.reserve_size);
for (std::size_t i = 0; i < opts.reserve_size; ++i)
{
m_task_indexes.emplace_back(i);
}
m_free_pos = m_task_indexes.begin();
}
task_container::~task_container()
{
// This will hang the current thread... but if tasks are not complete that's also pretty bad.
while (!empty())
{
garbage_collect();
}
}
auto task_container::start(coro::task<void> user_task, garbage_collect_t cleanup) -> void
{
m_size.fetch_add(1, std::memory_order::relaxed);
std::scoped_lock lk{m_mutex};
if (cleanup == garbage_collect_t::yes)
{
gc_internal();
}
// Only grow if completely full and attempting to add more.
if (m_free_pos == m_task_indexes.end())
{
m_free_pos = grow();
}
// Store the task inside a cleanup task for self deletion.
auto index = *m_free_pos;
m_tasks[index] = make_cleanup_task(std::move(user_task), m_free_pos);
// Mark the current slot as used by advancing the free position.
std::advance(m_free_pos, 1);
// Start executing from the cleanup task to schedule the user's task onto the thread pool.
m_tasks[index].resume();
}
auto task_container::garbage_collect() -> std::size_t
{
std::scoped_lock lk{m_mutex};
return gc_internal();
}
auto task_container::garbage_collect_and_yield_until_empty() -> coro::task<void>
{
while (!empty())
{
garbage_collect();
co_await m_thread_pool.yield();
}
}
auto task_container::grow() -> task_position
{
// Save an iterator to the current last item.
auto last_pos = std::prev(m_task_indexes.end());
std::size_t new_size = m_tasks.size() * m_growth_factor;
for (std::size_t i = m_tasks.size(); i < new_size; ++i)
{
m_task_indexes.emplace_back(i);
}
m_tasks.resize(new_size);
// Set the free pos to the item just after the previous last item.
return std::next(last_pos);
}
auto task_container::gc_internal() -> std::size_t
{
std::size_t deleted{0};
if (!m_tasks_to_delete.empty())
{
for (const auto& pos : m_tasks_to_delete)
{
// This doesn't actually 'delete' the task, it'll get overwritten when a
// new user task claims the free space. It could be useful to actually
// delete the tasks so the coroutine stack frames are destroyed. The advantage
// of letting a new task replace an old one, though, is that it's a 1:1 exchange
// on delete and create, rather than a large pause here to delete all the
// completed tasks.
// Put the deleted position at the end of the free indexes list.
m_task_indexes.splice(m_task_indexes.end(), m_task_indexes, pos);
}
deleted = m_tasks_to_delete.size();
m_tasks_to_delete.clear();
}
return deleted;
}
auto task_container::make_cleanup_task(task<void> user_task, task_position pos) -> coro::task<void>
{
// Immediately move the task onto the thread pool.
co_await m_thread_pool.schedule();
try
{
// Await the users task to complete.
co_await user_task;
}
catch (const std::exception& e)
{
// TODO: what would be a good way to report this to the user...? Catching here is required
// since the co_await will unwrap the unhandled exception on the task.
// The user's task should ideally be wrapped in a catch-all and handle exceptions itself, but
// that cannot be guaranteed.
std::cerr << "coro::task_container user_task had an unhandled exception e.what()= " << e.what() << "\n";
}
catch (...)
{
// don't crash if they throw something that isn't derived from std::exception
std::cerr << "coro::task_container user_task had unhandle exception, not derived from std::exception.\n";
}
std::scoped_lock lk{m_mutex};
m_tasks_to_delete.push_back(pos);
// This has to be done within the scoped lock to make sure this coroutine task completes before the
// task container object destructs -- if it was waiting on .empty() to become true.
m_size.fetch_sub(1, std::memory_order::relaxed);
co_return;
}
} // namespace coro
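This .cpp implementation is removed in this commit; task_container is now templated on its executor, per the updated CMake file list and the tcp_server benchmarks. A minimal sketch of the templated form follows; garbage_collect_and_yield_until_empty() is taken from the removed source above and is assumed to survive unchanged in the header-only version:
#include <coro/coro.hpp>
int main()
{
    coro::io_scheduler scheduler{coro::io_scheduler::options{
        .pool = coro::thread_pool::options{.thread_count = 1}}};
    // The container now takes its executor type as a template parameter.
    coro::task_container<coro::io_scheduler> tasks{scheduler};
    auto make_background_task = [&](uint64_t i) -> coro::task<void> {
        co_await scheduler.schedule();
        // ... fire-and-forget work for item i ...
        (void)i;
        co_return;
    };
    for (uint64_t i = 0; i < 4; ++i)
    {
        // start() stores the task in a free slot and resumes it; finished slots are
        // reclaimed by the container's internal garbage collection.
        tasks.start(make_background_task(i));
    }
    // Assumed to still exist in the header-only version (see the removed source above).
    coro::sync_wait(tasks.garbage_collect_and_yield_until_empty());
    scheduler.shutdown();
}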

View file

@ -1,5 +1,7 @@
#include "coro/thread_pool.hpp"
#include <iostream>
namespace coro
{
thread_pool::operation::operation(thread_pool& tp) noexcept : m_thread_pool(tp)
@ -35,30 +37,39 @@ auto thread_pool::schedule() -> operation
{
if (!m_shutdown_requested.load(std::memory_order::relaxed))
{
m_size.fetch_add(1, std::memory_order::relaxed);
m_size.fetch_add(1, std::memory_order::release);
return operation{*this};
}
throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks.");
}
auto thread_pool::shutdown(shutdown_t wait_for_tasks) noexcept -> void
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> void
{
if (!m_shutdown_requested.exchange(true, std::memory_order::release))
if (handle == nullptr)
{
return;
}
m_size.fetch_add(1, std::memory_order::release);
schedule_impl(handle);
}
auto thread_pool::shutdown() noexcept -> void
{
// Only allow shutdown to occur once.
if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false)
{
for (auto& thread : m_threads)
{
thread.request_stop();
}
if (wait_for_tasks == shutdown_t::sync)
for (auto& thread : m_threads)
{
for (auto& thread : m_threads)
if (thread.joinable())
{
if (thread.joinable())
{
thread.join();
}
thread.join();
}
}
}
@ -115,37 +126,4 @@ auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void
m_wait_cv.notify_one();
}
auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> void
{
if (handle == nullptr)
{
return;
}
m_size.fetch_add(1, std::memory_order::relaxed);
schedule_impl(handle);
}
auto thread_pool::resume(const std::vector<std::coroutine_handle<>>& handles) noexcept -> void
{
m_size.fetch_add(handles.size(), std::memory_order::relaxed);
{
std::scoped_lock lk{m_wait_mutex};
for (const auto& handle : handles)
{
if (handle != nullptr) [[likely]]
{
m_queue.emplace_back(handle);
}
else
{
m_size.fetch_sub(1, std::memory_order::release);
}
}
}
m_wait_cv.notify_one();
}
} // namespace coro
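The thread_pool::resume() overloads shown above let an awaitable hand a suspended coroutine directly to the pool, which is how shared_mutex::wake_waiters() resumes shared waiters. A small illustrative awaitable (not part of the library) built only on that call:
#include <coro/coro.hpp>
#include <coroutine>
// Illustrative awaitable: suspends the awaiting coroutine and immediately hands it
// to the thread pool, so execution continues on a pool thread.
struct resume_on_pool
{
    coro::thread_pool& tp;
    auto await_ready() const noexcept -> bool { return false; }
    auto await_suspend(std::coroutine_handle<> handle) noexcept -> void { tp.resume(handle); }
    auto await_resume() const noexcept -> void {}
};
int main()
{
    coro::thread_pool tp{coro::thread_pool::options{.thread_count = 1}};
    auto task = [&]() -> coro::task<void> {
        co_await resume_on_pool{tp}; // continue on a pool thread
        co_return;
    };
    coro::sync_wait(task());
    tp.shutdown(); // shutdown() no longer takes a shutdown_t argument
}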

View file

@ -337,19 +337,19 @@ TEST_CASE("benchmark counter task scheduler await event from another coroutine",
REQUIRE(s.empty());
}
TEST_CASE("benchmark tcp_server echo server", "[benchmark]")
TEST_CASE("benchmark tcp_server echo server thread pool", "[benchmark]")
{
const constexpr std::size_t connections = 16;
const constexpr std::size_t messages_per_connection = 10'000;
const constexpr std::size_t connections = 100;
const constexpr std::size_t messages_per_connection = 1'000;
const constexpr std::size_t ops = connections * messages_per_connection;
const std::string msg = "im a data point in a stream of bytes";
const constexpr std::size_t server_count = 1;
const constexpr std::size_t client_count = 1;
const constexpr std::size_t server_count = 5;
const constexpr std::size_t client_count = 5;
const constexpr std::size_t server_thread_count = 1;
const constexpr std::size_t client_thread_count = 1;
const constexpr std::size_t server_thread_count = 4;
const constexpr std::size_t client_thread_count = 4;
std::atomic<uint64_t> listening{0};
std::atomic<uint64_t> accepted{0};
@ -359,18 +359,20 @@ TEST_CASE("benchmark tcp_server echo server", "[benchmark]")
struct server
{
uint64_t id;
coro::io_scheduler scheduler{
coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = server_thread_count}}};
coro::task_container task_container{scheduler};
uint64_t live_clients{0};
coro::event wait_for_clients{};
uint64_t id;
coro::io_scheduler scheduler{coro::io_scheduler::options{
.pool = coro::thread_pool::options{.thread_count = server_thread_count},
.execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_on_thread_pool}};
coro::task_container<coro::io_scheduler> task_container{scheduler};
uint64_t live_clients{0};
coro::event wait_for_clients{};
};
struct client
{
coro::io_scheduler scheduler{
coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = client_thread_count}}};
coro::io_scheduler scheduler{coro::io_scheduler::options{
.pool = coro::thread_pool::options{.thread_count = client_thread_count},
.execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_on_thread_pool}};
std::vector<coro::task<void>> tasks{};
};
@ -526,10 +528,204 @@ TEST_CASE("benchmark tcp_server echo server", "[benchmark]")
}
auto stop = sc::now();
print_stats("benchmark tcp_client and tcp_server", ops, start, stop);
print_stats("benchmark tcp_client and tcp_server thread_pool", ops, start, stop);
for (const auto& [ms, count] : g_histogram)
{
std::cerr << ms.count() << " : " << count << "\n";
}
}
TEST_CASE("benchmark tcp_server echo server inline", "[benchmark]")
{
const constexpr std::size_t connections = 100;
const constexpr std::size_t messages_per_connection = 1'000;
const constexpr std::size_t ops = connections * messages_per_connection;
const std::string msg = "im a data point in a stream of bytes";
const constexpr std::size_t server_count = 10;
const constexpr std::size_t client_count = 10;
std::atomic<uint64_t> listening{0};
std::atomic<uint64_t> accepted{0};
std::atomic<uint64_t> clients_completed{0};
std::atomic<uint64_t> server_id{0};
using estrat = coro::io_scheduler::execution_strategy_t;
struct server
{
uint64_t id;
coro::io_scheduler scheduler{coro::io_scheduler::options{.execution_strategy = estrat::process_tasks_inline}};
coro::task_container<coro::io_scheduler> task_container{scheduler};
uint64_t live_clients{0};
coro::event wait_for_clients{};
};
struct client
{
coro::io_scheduler scheduler{coro::io_scheduler::options{.execution_strategy = estrat::process_tasks_inline}};
std::vector<coro::task<void>> tasks{};
};
auto make_on_connection_task = [&](server& s, coro::net::tcp_client client) -> coro::task<void> {
std::string in(64, '\0');
// Echo the messages until the socket is closed.
while (true)
{
auto pstatus = co_await client.poll(coro::poll_op::read);
REQUIRE(pstatus == coro::poll_status::event);
auto [rstatus, rspan] = client.recv(in);
if (rstatus == coro::net::recv_status::closed)
{
REQUIRE(rspan.empty());
break;
}
REQUIRE(rstatus == coro::net::recv_status::ok);
in.resize(rspan.size());
auto [sstatus, remaining] = client.send(in);
REQUIRE(sstatus == coro::net::send_status::ok);
REQUIRE(remaining.empty());
}
s.live_clients--;
if (s.live_clients == 0)
{
s.wait_for_clients.set();
}
co_return;
};
auto make_server_task = [&](server& s) -> coro::task<void> {
co_await s.scheduler.schedule();
coro::net::tcp_server server{s.scheduler};
listening++;
while (accepted.load(std::memory_order::acquire) < connections)
{
auto pstatus = co_await server.poll(std::chrono::milliseconds{1});
if (pstatus == coro::poll_status::event)
{
auto c = server.accept();
if (c.socket().is_valid())
{
accepted.fetch_add(1, std::memory_order::release);
s.live_clients++;
s.task_container.start(make_on_connection_task(s, std::move(c)));
}
}
}
co_await s.wait_for_clients;
co_return;
};
std::mutex g_histogram_mutex;
std::map<std::chrono::milliseconds, uint64_t> g_histogram;
auto make_client_task = [&](client& c) -> coro::task<void> {
co_await c.scheduler.schedule();
std::map<std::chrono::milliseconds, uint64_t> histogram;
coro::net::tcp_client client{c.scheduler};
auto cstatus = co_await client.connect(); // std::chrono::seconds{1});
REQUIRE(cstatus == coro::net::connect_status::connected);
for (size_t i = 1; i <= messages_per_connection; ++i)
{
auto req_start = std::chrono::steady_clock::now();
auto [sstatus, remaining] = client.send(msg);
REQUIRE(sstatus == coro::net::send_status::ok);
REQUIRE(remaining.empty());
auto pstatus = co_await client.poll(coro::poll_op::read);
REQUIRE(pstatus == coro::poll_status::event);
std::string response(64, '\0');
auto [rstatus, rspan] = client.recv(response);
REQUIRE(rstatus == coro::net::recv_status::ok);
REQUIRE(rspan.size() == msg.size());
response.resize(rspan.size());
REQUIRE(response == msg);
auto req_stop = std::chrono::steady_clock::now();
histogram[std::chrono::duration_cast<std::chrono::milliseconds>(req_stop - req_start)]++;
}
{
std::scoped_lock lk{g_histogram_mutex};
for (auto [ms, count] : histogram)
{
g_histogram[ms] += count;
}
}
clients_completed.fetch_add(1);
co_return;
};
auto start = sc::now();
// Create the servers to accept incoming tcp connections.
std::vector<std::thread> server_threads{};
for (size_t i = 0; i < server_count; ++i)
{
server_threads.emplace_back(std::thread{[&]() {
server s{};
s.id = server_id++;
coro::sync_wait(make_server_task(s));
s.scheduler.shutdown();
}});
}
// The server can take a small bit of time to start up; if we don't wait for it to notify us then
// the first few connections can easily fail to connect, causing this test to fail.
while (listening != server_count)
{
std::this_thread::sleep_for(std::chrono::milliseconds{1});
}
// Spawn N client connections across a set number of clients.
std::vector<std::thread> client_threads{};
std::vector<client> clients{};
for (size_t i = 0; i < client_count; ++i)
{
client_threads.emplace_back(std::thread{[&]() {
client c{};
for (size_t i = 0; i < connections / client_count; ++i)
{
c.tasks.emplace_back(make_client_task(c));
}
coro::sync_wait(coro::when_all(std::move(c.tasks)));
c.scheduler.shutdown();
}});
}
for (auto& ct : client_threads)
{
ct.join();
}
for (auto& st : server_threads)
{
st.join();
}
auto stop = sc::now();
print_stats("benchmark tcp_client and tcp_server inline", ops, start, stop);
for (const auto& [ms, count] : g_histogram)
{
std::cerr << ms.count() << " : " << count << "\n";
}
}
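The inline benchmark above is identical to the thread-pool variant except for how each io_scheduler is configured. A minimal sketch of selecting between the two execution strategies, using only the option and enum names that appear in the tests:
#include <coro/coro.hpp>
int main()
{
    // Completed events and their coroutines are resumed inline on the scheduler's
    // event-processing thread.
    coro::io_scheduler inline_scheduler{coro::io_scheduler::options{
        .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline}};
    // Completed events hand their coroutines off to the scheduler's internal thread pool.
    coro::io_scheduler pool_scheduler{coro::io_scheduler::options{
        .pool = coro::thread_pool::options{.thread_count = 4},
        .execution_strategy =
            coro::io_scheduler::execution_strategy_t::process_tasks_on_thread_pool}};
    inline_scheduler.shutdown();
    pool_scheduler.shutdown();
}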

View file

@ -4,7 +4,7 @@
#include <chrono>
TEST_CASE("dns_resolver basic")
TEST_CASE("dns_resolver basic", "[dns]")
{
coro::io_scheduler scheduler{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
coro::net::dns_resolver dns_resolver{scheduler, std::chrono::milliseconds{5000}};
@ -26,6 +26,8 @@ TEST_CASE("dns_resolver basic")
coro::sync_wait(make_host_by_name_task(coro::net::hostname{"www.example.com"}));
std::cerr << "io_scheduler.size() before shutdown = " << scheduler.size() << "\n";
scheduler.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << scheduler.size() << "\n";
REQUIRE(scheduler.empty());
}

View file

@ -27,7 +27,9 @@ TEST_CASE("io_scheduler schedule single task", "[io_scheduler]")
auto value = coro::sync_wait(make_task());
REQUIRE(value == 42);
std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
s.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
REQUIRE(s.empty());
}
@ -52,6 +54,11 @@ TEST_CASE("io_scheduler submit mutiple tasks", "[io_scheduler]")
coro::sync_wait(coro::when_all(std::move(tasks)));
REQUIRE(counter == n);
std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
s.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
REQUIRE(s.empty());
}
TEST_CASE("io_scheduler task with multiple events", "[io_scheduler]")
@ -83,7 +90,9 @@ TEST_CASE("io_scheduler task with multiple events", "[io_scheduler]")
REQUIRE(counter == 3);
std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
s.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
REQUIRE(s.empty());
}
@ -108,7 +117,9 @@ TEST_CASE("io_scheduler task with read poll", "[io_scheduler]")
coro::sync_wait(coro::when_all(make_poll_read_task(), make_poll_write_task()));
std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
s.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
REQUIRE(s.empty());
close(trigger_fd);
}
@ -135,7 +146,9 @@ TEST_CASE("io_scheduler task with read poll with timeout", "[io_scheduler]")
coro::sync_wait(coro::when_all(make_poll_read_task(), make_poll_write_task()));
std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
s.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
REQUIRE(s.empty());
close(trigger_fd);
}
@ -155,7 +168,9 @@ TEST_CASE("io_scheduler task with read poll timeout", "[io_scheduler]")
coro::sync_wait(make_task());
std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
s.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
REQUIRE(s.empty());
close(trigger_fd);
}
@ -289,7 +304,9 @@ TEST_CASE("io_scheduler separate thread resume with return", "[io_scheduler]")
coro::sync_wait(make_task());
service.join();
std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
s.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
REQUIRE(s.empty());
}
@ -317,6 +334,11 @@ TEST_CASE("io_scheduler with basic task", "[io_scheduler]")
auto counter = coro::sync_wait(func());
REQUIRE(counter == expected_value);
std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
s.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
REQUIRE(s.empty());
}
TEST_CASE("io_scheduler scheduler_after", "[io_scheduler]")
@ -344,7 +366,9 @@ TEST_CASE("io_scheduler scheduler_after", "[io_scheduler]")
REQUIRE(counter == 1);
REQUIRE(duration < wait_for);
std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
s.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
REQUIRE(s.empty());
}
@ -360,7 +384,9 @@ TEST_CASE("io_scheduler scheduler_after", "[io_scheduler]")
REQUIRE(counter == 2);
REQUIRE(duration >= wait_for);
std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
s.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
REQUIRE(s.empty());
}
}
@ -432,6 +458,11 @@ TEST_CASE("io_scheduler yield", "[io_scheduler]")
};
coro::sync_wait(func());
std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
s.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
REQUIRE(s.empty());
}
TEST_CASE("io_scheduler yield_for", "[io_scheduler]")
@ -449,6 +480,11 @@ TEST_CASE("io_scheduler yield_for", "[io_scheduler]")
auto duration = coro::sync_wait(make_task());
REQUIRE(duration >= wait_for);
std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
s.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
REQUIRE(s.empty());
}
TEST_CASE("io_scheduler yield_until", "[io_scheduler]")
@ -468,6 +504,11 @@ TEST_CASE("io_scheduler yield_until", "[io_scheduler]")
auto duration = coro::sync_wait(make_task());
REQUIRE(duration >= (wait_for - epsilon));
std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
s.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
REQUIRE(s.empty());
}
TEST_CASE("io_scheduler multipler event waiters", "[io_scheduler]")
@ -505,6 +546,11 @@ TEST_CASE("io_scheduler multipler event waiters", "[io_scheduler]")
};
coro::sync_wait(coro::when_all(spawn(), release()));
std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
s.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
REQUIRE(s.empty());
}
TEST_CASE("io_scheduler self generating coroutine (stack overflow check)", "[io_scheduler]")
@ -541,9 +587,14 @@ TEST_CASE("io_scheduler self generating coroutine (stack overflow check)", "[io_
}
REQUIRE(tasks.size() == total - 1);
std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
s.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
REQUIRE(s.empty());
}
TEST_CASE("io_scheduler manual process events", "[io_scheduler]")
TEST_CASE("io_scheduler manual process events thread pool", "[io_scheduler]")
{
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::io_scheduler s{coro::io_scheduler::options{
@ -555,17 +606,23 @@ TEST_CASE("io_scheduler manual process events", "[io_scheduler]")
std::atomic<bool> polling{false};
auto make_poll_read_task = [&]() -> coro::task<void> {
std::cerr << "poll task start s.size() == " << s.size() << "\n";
co_await s.schedule();
polling = true;
polling = true;
std::cerr << "poll task polling s.size() == " << s.size() << "\n";
auto status = co_await s.poll(trigger_fd, coro::poll_op::read);
REQUIRE(status == coro::poll_status::event);
std::cerr << "poll task exiting s.size() == " << s.size() << "\n";
co_return;
};
auto make_poll_write_task = [&]() -> coro::task<void> {
std::cerr << "write task start s.size() == " << s.size() << "\n";
co_await s.schedule();
uint64_t value{42};
std::cerr << "write task writing s.size() == " << s.size() << "\n";
write(trigger_fd, &value, sizeof(value));
std::cerr << "write task exiting s.size() == " << s.size() << "\n";
co_return;
};
@ -580,9 +637,64 @@ TEST_CASE("io_scheduler manual process events", "[io_scheduler]")
write_task.resume();
REQUIRE(s.process_events(100ms) == 1);
while (s.process_events(100ms) > 0)
;
std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
s.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
REQUIRE(s.empty());
close(trigger_fd);
}
TEST_CASE("io_scheduler manual process events inline", "[io_scheduler]")
{
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::io_scheduler s{coro::io_scheduler::options{
.thread_strategy = coro::io_scheduler::thread_strategy_t::manual,
.execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline}};
auto make_poll_read_task = [&]() -> coro::task<void> {
std::cerr << "poll task start s.size() == " << s.size() << "\n";
co_await s.schedule();
std::cerr << "poll task polling s.size() == " << s.size() << "\n";
auto status = co_await s.poll(trigger_fd, coro::poll_op::read);
REQUIRE(status == coro::poll_status::event);
std::cerr << "poll task exiting s.size() == " << s.size() << "\n";
co_return;
};
auto make_poll_write_task = [&]() -> coro::task<void> {
std::cerr << "write task start s.size() == " << s.size() << "\n";
co_await s.schedule();
uint64_t value{42};
std::cerr << "write task writing s.size() == " << s.size() << "\n";
write(trigger_fd, &value, sizeof(value));
std::cerr << "write task exiting s.size() == " << s.size() << "\n";
co_return;
};
auto poll_task = make_poll_read_task();
auto write_task = make_poll_write_task();
// Start the tasks by scheduling them into the io scheduler.
poll_task.resume();
write_task.resume();
// Now process them to completion.
while (true)
{
auto remaining = s.process_events(100ms);
std::cerr << "remaining " << remaining << "\n";
if (remaining == 0)
{
break;
}
};
std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n";
s.shutdown();
std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n";
REQUIRE(s.empty());
close(trigger_fd);
}

View file

@ -10,9 +10,9 @@ TEST_CASE("mutex single waiter not locked exclusive", "[shared_mutex]")
coro::thread_pool tp{coro::thread_pool::options{.thread_count = 1}};
std::vector<uint64_t> output;
coro::shared_mutex m{tp};
coro::shared_mutex<coro::thread_pool> m{tp};
auto make_emplace_task = [&](coro::shared_mutex& m) -> coro::task<void> {
auto make_emplace_task = [&](coro::shared_mutex<coro::thread_pool>& m) -> coro::task<void> {
std::cerr << "Acquiring lock exclusive\n";
{
auto scoped_lock = co_await m.lock();
@ -44,9 +44,9 @@ TEST_CASE("mutex single waiter not locked shared", "[shared_mutex]")
coro::thread_pool tp{coro::thread_pool::options{.thread_count = 1}};
std::vector<uint64_t> values{1, 2, 3};
coro::shared_mutex m{tp};
coro::shared_mutex<coro::thread_pool> m{tp};
auto make_emplace_task = [&](coro::shared_mutex& m) -> coro::task<void> {
auto make_emplace_task = [&](coro::shared_mutex<coro::thread_pool>& m) -> coro::task<void> {
std::cerr << "Acquiring lock shared\n";
{
auto scoped_lock = co_await m.lock_shared();
@ -81,7 +81,7 @@ TEST_CASE("mutex single waiter not locked shared", "[shared_mutex]")
TEST_CASE("mutex many shared and exclusive waiters interleaved", "[shared_mutex]")
{
coro::io_scheduler tp{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 8}}};
coro::shared_mutex m{tp};
coro::shared_mutex<coro::io_scheduler> m{tp};
std::atomic<bool> read_value{false};

View file

@ -125,7 +125,7 @@ TEST_CASE("thread_pool shutdown", "[thread_pool]")
co_return false;
};
tp.shutdown(coro::shutdown_t::async);
tp.shutdown();
REQUIRE(coro::sync_wait(f(tp)) == true);
}