diff --git a/CMakeLists.txt b/CMakeLists.txt index e4202e7..ff4fd39 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,9 +2,9 @@ cmake_minimum_required(VERSION 3.0) project(libcoro CXX) # Set the githooks directory to auto format and update the readme. -message("git config core.hooksPath .githooks") +message("${PROJECT_NAME} ${CMAKE_CURRENT_SOURCE_DIR} -> git config --local core.hooksPath .githooks") execute_process( - COMMAND git config core.hooksPath .githooks + COMMAND git config --local core.hooksPath .githooks WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} ) @@ -25,8 +25,11 @@ add_subdirectory(vendor/c-ares/c-ares) set(LIBCORO_SOURCE_FILES inc/coro/concepts/awaitable.hpp inc/coro/concepts/buffer.hpp + inc/coro/concepts/executor.hpp inc/coro/concepts/promise.hpp + inc/coro/concepts/range_of.hpp + inc/coro/detail/poll_info.hpp inc/coro/detail/void_value.hpp inc/coro/net/connect.hpp src/net/connect.cpp @@ -44,6 +47,7 @@ set(LIBCORO_SOURCE_FILES inc/coro/coro.hpp inc/coro/event.hpp src/event.cpp + inc/coro/fd.hpp inc/coro/generator.hpp inc/coro/io_scheduler.hpp src/io_scheduler.cpp inc/coro/latch.hpp @@ -51,11 +55,10 @@ set(LIBCORO_SOURCE_FILES inc/coro/poll.hpp inc/coro/ring_buffer.hpp inc/coro/semaphore.hpp src/semaphore.cpp - inc/coro/shared_mutex.hpp src/shared_mutex.cpp - inc/coro/shutdown.hpp + inc/coro/shared_mutex.hpp inc/coro/stop_signal.hpp inc/coro/sync_wait.hpp src/sync_wait.cpp - inc/coro/task_container.hpp src/task_container.cpp + inc/coro/task_container.hpp inc/coro/task.hpp inc/coro/thread_pool.hpp src/thread_pool.cpp inc/coro/when_all.hpp diff --git a/inc/coro/concepts/awaitable.hpp b/inc/coro/concepts/awaitable.hpp index 264ade0..d3bb16c 100644 --- a/inc/coro/concepts/awaitable.hpp +++ b/inc/coro/concepts/awaitable.hpp @@ -23,7 +23,7 @@ concept awaiter = requires(type t, std::coroutine_handle<> c) std::same_as || std::same_as || std::same_as>; - {t.await_resume()}; + { t.await_resume() }; }; /** diff --git a/inc/coro/concepts/executor.hpp b/inc/coro/concepts/executor.hpp new file mode 100644 index 0000000..d413291 --- /dev/null +++ b/inc/coro/concepts/executor.hpp @@ -0,0 +1,29 @@ +#pragma once + +#include "coro/concepts/awaitable.hpp" + +#include +#include +// #include +// #include + +namespace coro::concepts +{ +template +concept executor = requires(type t, std::coroutine_handle<> c) +{ + { + t.schedule() + } + ->coro::concepts::awaiter; + { + t.yield() + } + ->coro::concepts::awaiter; + { + t.resume(c) + } + ->std::same_as; +}; + +} // namespace coro::concepts diff --git a/inc/coro/concepts/range_of.hpp b/inc/coro/concepts/range_of.hpp new file mode 100644 index 0000000..de67e94 --- /dev/null +++ b/inc/coro/concepts/range_of.hpp @@ -0,0 +1,20 @@ +#pragma once + +#include +#include + +namespace coro::concepts +{ +/** + * Concept to require that the range contains a specific type of value. + */ +template +concept range_of = std::ranges::range&& std::is_same_v>; + +/** + * Concept to require that a sized range contains a specific type of value. 
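
The new coro::concepts::executor concept introduced above is what lets event, shared_mutex, and task_container accept either a coro::thread_pool or a coro::io_scheduler. Below is a minimal usage sketch (not part of this diff; the helper name hop_and_print and the exact bodies are illustrative only) showing a coroutine written once against the concept and driven by both executor types:

    #include <coro/coro.hpp>
    #include <iostream>
    #include <string_view>

    // Works with any type satisfying coro::concepts::executor (thread_pool, io_scheduler, ...).
    template<coro::concepts::executor executor_type>
    auto hop_and_print(executor_type& e, std::string_view msg) -> coro::task<void>
    {
        co_await e.schedule(); // the concept guarantees schedule() returns an awaiter
        std::cout << msg << "\n";
        co_await e.yield();    // likewise for yield()
        co_return;
    }

    int main()
    {
        coro::thread_pool tp{};
        coro::sync_wait(hop_and_print(tp, "on the thread pool"));
        tp.shutdown();

        coro::io_scheduler s{};
        coro::sync_wait(hop_and_print(s, "on the io scheduler"));
        s.shutdown();
    }
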
+ */ +template +concept sized_range_of = std::ranges::sized_range&& std::is_same_v>; + +} // namespace coro::concepts diff --git a/inc/coro/coro.hpp b/inc/coro/coro.hpp index 67cb155..b8d7b0d 100644 --- a/inc/coro/coro.hpp +++ b/inc/coro/coro.hpp @@ -2,7 +2,9 @@ #include "coro/concepts/awaitable.hpp" #include "coro/concepts/buffer.hpp" +#include "coro/concepts/executor.hpp" #include "coro/concepts/promise.hpp" +#include "coro/concepts/range_of.hpp" #include "coro/net/connect.hpp" #include "coro/net/dns_resolver.hpp" @@ -25,7 +27,6 @@ #include "coro/ring_buffer.hpp" #include "coro/semaphore.hpp" #include "coro/shared_mutex.hpp" -#include "coro/shutdown.hpp" #include "coro/stop_signal.hpp" #include "coro/sync_wait.hpp" #include "coro/task.hpp" diff --git a/inc/coro/detail/poll_info.hpp b/inc/coro/detail/poll_info.hpp new file mode 100644 index 0000000..c2ac69f --- /dev/null +++ b/inc/coro/detail/poll_info.hpp @@ -0,0 +1,74 @@ +#pragma once + +#include "coro/fd.hpp" +#include "coro/poll.hpp" + +#include +#include +#include +#include +#include + +namespace coro::detail +{ +/** + * Poll Info encapsulates everything about a poll operation for the event as well as its paired + * timeout. This is important since coroutines that are waiting on an event or timeout do not + * immediately execute, they are re-scheduled onto the thread pool, so its possible its pair + * event or timeout also triggers while the coroutine is still waiting to resume. This means that + * the first one to happen, the event itself or its timeout, needs to disable the other pair item + * prior to resuming the coroutine. + * + * Finally, its also important to note that the event and its paired timeout could happen during + * the same epoll_wait and possibly trigger the coroutine to start twice. Only one can win, so the + * first one processed sets m_processed to true and any subsequent events in the same epoll batch + * are effectively discarded. + */ +struct poll_info +{ + using clock = std::chrono::steady_clock; + using time_point = clock::time_point; + using timed_events = std::multimap; + + poll_info() = default; + ~poll_info() = default; + + poll_info(const poll_info&) = delete; + poll_info(poll_info&&) = delete; + auto operator=(const poll_info&) -> poll_info& = delete; + auto operator=(poll_info&&) -> poll_info& = delete; + + struct poll_awaiter + { + explicit poll_awaiter(poll_info& pi) noexcept : m_pi(pi) {} + + auto await_ready() const noexcept -> bool { return false; } + auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void + { + m_pi.m_awaiting_coroutine = awaiting_coroutine; + std::atomic_thread_fence(std::memory_order::release); + } + auto await_resume() noexcept -> coro::poll_status { return m_pi.m_poll_status; } + + poll_info& m_pi; + }; + + auto operator co_await() noexcept -> poll_awaiter { return poll_awaiter{*this}; } + + /// The file descriptor being polled on. This is needed so that if the timeout occurs first then + /// the event loop can immediately disable the event within epoll. + fd_t m_fd{-1}; + /// The timeout's position in the timeout map. A poll() with no timeout or yield() this is empty. + /// This is needed so that if the event occurs first then the event loop can immediately disable + /// the timeout within epoll. + std::optional m_timer_pos{std::nullopt}; + /// The awaiting coroutine for this poll info to resume upon event or timeout. + std::coroutine_handle<> m_awaiting_coroutine; + /// The status of the poll operation. 
+ coro::poll_status m_poll_status{coro::poll_status::error}; + /// Did the timeout and event trigger at the same time on the same epoll_wait call? + /// Once this is set to true all future events on this poll info are null and void. + bool m_processed{false}; +}; + +} // namespace coro::detail diff --git a/inc/coro/event.hpp b/inc/coro/event.hpp index b0c9814..8c8c343 100644 --- a/inc/coro/event.hpp +++ b/inc/coro/event.hpp @@ -1,5 +1,7 @@ #pragma once +#include "coro/concepts/executor.hpp" + #include #include @@ -27,36 +29,6 @@ t2: resume() class event { public: - /** - * Creates an event with the given initial state of being set or not set. - * @param initially_set By default all events start as not set, but if needed this parameter can - * set the event to already be triggered. - */ - explicit event(bool initially_set = false) noexcept; - ~event() = default; - - event(const event&) = delete; - event(event&&) = delete; - auto operator=(const event&) -> event& = delete; - auto operator=(event&&) -> event& = delete; - - /** - * @return True if this event is currently in the set state. - */ - auto is_set() const noexcept -> bool { return m_state.load(std::memory_order_acquire) == this; } - - /** - * Sets this event and resumes all awaiters. Note that all waiters will be resumed onto this - * thread of execution. - */ - auto set() noexcept -> void; - - /** - * Sets this event and resumes all awaiters onto the given thread pool. This will distribute - * the waiters across the thread pools threads. - */ - auto set(coro::thread_pool& tp) noexcept -> void; - struct awaiter { /** @@ -90,6 +62,50 @@ public: awaiter* m_next{nullptr}; }; + /** + * Creates an event with the given initial state of being set or not set. + * @param initially_set By default all events start as not set, but if needed this parameter can + * set the event to already be triggered. + */ + explicit event(bool initially_set = false) noexcept; + ~event() = default; + + event(const event&) = delete; + event(event&&) = delete; + auto operator=(const event&) -> event& = delete; + auto operator=(event&&) -> event& = delete; + + /** + * @return True if this event is currently in the set state. + */ + auto is_set() const noexcept -> bool { return m_state.load(std::memory_order_acquire) == this; } + + /** + * Sets this event and resumes all awaiters. Note that all waiters will be resumed onto this + * thread of execution. + */ + auto set() noexcept -> void; + + /** + * Sets this event and resumes all awaiters onto the given thread pool. This will distribute + * the waiters across the thread pools threads. + */ + template + auto set(executor_type& e) noexcept -> void + { + void* old_value = m_state.exchange(this, std::memory_order::acq_rel); + if (old_value != this) + { + auto* waiters = static_cast(old_value); + while (waiters != nullptr) + { + auto* next = waiters->m_next; + e.resume(waiters->m_awaiting_coroutine); + waiters = next; + } + } + } + /** * @return An awaiter struct to suspend and resume this coroutine for when the event is set. 
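
With event::set() now templated on the executor concept rather than taking coro::thread_pool directly, any executor can fan the waiters out. A small sketch of the intended usage follows, assuming the standard libcoro entry points (sync_wait, when_all); the producer/consumer names are illustrative:

    #include <coro/coro.hpp>
    #include <iostream>

    int main()
    {
        coro::thread_pool tp{};
        coro::event e{};

        auto consumer = [&]() -> coro::task<void>
        {
            co_await e; // suspend until the event is set
            std::cout << "event observed\n";
            co_return;
        };

        auto producer = [&]() -> coro::task<void>
        {
            e.set(tp); // resume every waiter onto the thread pool's threads
            co_return;
        };

        coro::sync_wait(coro::when_all(consumer(), producer()));
        tp.shutdown();
    }
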
*/ diff --git a/inc/coro/fd.hpp b/inc/coro/fd.hpp new file mode 100644 index 0000000..aa91712 --- /dev/null +++ b/inc/coro/fd.hpp @@ -0,0 +1,7 @@ +#pragma once + +namespace coro +{ +using fd_t = int; + +} // namespace coro diff --git a/inc/coro/io_scheduler.hpp b/inc/coro/io_scheduler.hpp index f3bc1a4..27352cc 100644 --- a/inc/coro/io_scheduler.hpp +++ b/inc/coro/io_scheduler.hpp @@ -1,6 +1,7 @@ #pragma once -#include "coro/event.hpp" +#include "coro/detail/poll_info.hpp" +#include "coro/fd.hpp" #include "coro/net/socket.hpp" #include "coro/poll.hpp" #include "coro/thread_pool.hpp" @@ -8,35 +9,44 @@ #include #include #include +#include +#include #include #include namespace coro { -namespace detail +class io_scheduler { -class poll_info; -} // namespace detail - -class io_scheduler : public coro::thread_pool -{ - friend detail::poll_info; - - using clock = std::chrono::steady_clock; - using time_point = clock::time_point; - using timed_events = std::multimap; + using clock = detail::poll_info::clock; + using time_point = detail::poll_info::time_point; + using timed_events = detail::poll_info::timed_events; public: - using fd_t = int; + class schedule_operation; + friend schedule_operation; enum class thread_strategy_t { - /// Spawns a background thread for the scheduler to run on. + /// Spawns a dedicated background thread for the scheduler to run on. spawn, - /// Requires the user to call process_events() to drive the scheduler + /// Requires the user to call process_events() to drive the scheduler. manual }; + enum class execution_strategy_t + { + /// Tasks will be FIFO queued to be executed on a thread pool. This is better for tasks that + /// are long lived and will use lots of CPU because long lived tasks will block other i/o + /// operations while they complete. This strategy is generally better for lower latency + /// requirements at the cost of throughput. + process_tasks_on_thread_pool, + /// Tasks will be executed inline on the io scheduler thread. This is better for short tasks + /// that can be quickly processed and not block other i/o operations for very long. This + /// strategy is generally better for higher throughput at the cost of latency. + process_tasks_inline + }; + struct options { /// Should the io scheduler spawn a dedicated event processor? @@ -50,6 +60,10 @@ public: .thread_count = ((std::thread::hardware_concurrency() > 1) ? (std::thread::hardware_concurrency() - 1) : 1), .on_thread_start_functor = nullptr, .on_thread_stop_functor = nullptr}; + + /// If inline task processing is enabled then the io worker will resume tasks on its thread + /// rather than scheduling them to be picked up by the thread pool. + const execution_strategy_t execution_strategy{execution_strategy_t::process_tasks_on_thread_pool}; }; explicit io_scheduler( @@ -57,18 +71,19 @@ public: .thread_strategy = thread_strategy_t::spawn, .on_io_thread_start_functor = nullptr, .on_io_thread_stop_functor = nullptr, - .pool = { - .thread_count = - ((std::thread::hardware_concurrency() > 1) ? (std::thread::hardware_concurrency() - 1) : 1), - .on_thread_start_functor = nullptr, - .on_thread_stop_functor = nullptr}}); + .pool = + {.thread_count = + ((std::thread::hardware_concurrency() > 1) ? 
(std::thread::hardware_concurrency() - 1) : 1), + .on_thread_start_functor = nullptr, + .on_thread_stop_functor = nullptr}, + .execution_strategy = execution_strategy_t::process_tasks_on_thread_pool}); io_scheduler(const io_scheduler&) = delete; io_scheduler(io_scheduler&&) = delete; auto operator=(const io_scheduler&) -> io_scheduler& = delete; auto operator=(io_scheduler&&) -> io_scheduler& = delete; - virtual ~io_scheduler() override; + ~io_scheduler(); /** * Given a thread_strategy_t::manual this function should be called at regular intervals to @@ -82,6 +97,61 @@ public: */ auto process_events(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> std::size_t; + class schedule_operation + { + friend class io_scheduler; + explicit schedule_operation(io_scheduler& scheduler) noexcept : m_scheduler(scheduler) {} + + public: + /** + * Operations always pause so the executing thread can be switched. + */ + auto await_ready() noexcept -> bool { return false; } + + /** + * Suspending always returns to the caller (using void return of await_suspend()) and + * stores the coroutine internally for the executing thread to resume from. + */ + auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void + { + if (m_scheduler.m_opts.execution_strategy == execution_strategy_t::process_tasks_inline) + { + m_scheduler.m_size.fetch_add(1, std::memory_order::release); + { + std::scoped_lock lk{m_scheduler.m_scheduled_tasks_mutex}; + m_scheduler.m_scheduled_tasks.emplace_back(awaiting_coroutine); + } + + // Trigger the event to wake-up the scheduler if this event isn't currently triggered. + bool expected{false}; + if (m_scheduler.m_schedule_fd_triggered.compare_exchange_strong( + expected, true, std::memory_order::release, std::memory_order::relaxed)) + { + eventfd_t value{1}; + eventfd_write(m_scheduler.m_schedule_fd, value); + } + } + else + { + m_scheduler.m_thread_pool->resume(awaiting_coroutine); + } + } + + /** + * no-op as this is the function called first by the thread pool's executing thread. + */ + auto await_resume() noexcept -> void {} + + private: + /// The thread pool that this operation will execute on. + io_scheduler& m_scheduler; + }; + + /** + * Schedules the current task onto this io_scheduler for execution. + */ + auto schedule() -> schedule_operation { return schedule_operation{*this}; } + /** * Schedules the current task to run after the given amount of time has elapsed. * @param amount The amount of time to wait before resuming execution of this task. @@ -96,6 +166,11 @@ public: */ [[nodiscard]] auto schedule_at(time_point time) -> coro::task; + /** + * Yields the current task to the end of the queue of waiting tasks. + */ + [[nodiscard]] auto yield() -> schedule_operation { return schedule_operation{*this}; }; + /** * Yields the current task for the given amount of time. * @param amount The amount of time to yield for before resuming executino of this task. @@ -137,14 +212,57 @@ public: } /** - * Starts the shutdown of the io scheduler. All currently executing and pending tasks will complete - * prior to shutting down. - * @param wait_for_tasks Given shutdown_t::sync this function will block until all oustanding - * tasks are completed. Given shutdown_t::async this function will trigger - * the shutdown process but return immediately. In this case the io_scheduler's - * destructor will block if any background threads haven't joined. + * Resumes execution of a direct coroutine handle on this io scheduler. 
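
The execution_strategy_t option and the schedule_operation above change where a scheduled coroutine resumes: on the worker thread pool, or inline on the io thread via the schedule eventfd. A brief sketch of assumed usage (the designated initializer and the task body are illustrative, not part of this diff):

    #include <coro/coro.hpp>
    #include <chrono>
    #include <iostream>

    int main()
    {
        // Resume scheduled tasks directly on the io thread instead of a thread pool.
        coro::io_scheduler scheduler{coro::io_scheduler::options{
            .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline}};

        auto task = [&]() -> coro::task<void>
        {
            co_await scheduler.schedule();                                // wakes the io thread via the schedule eventfd
            co_await scheduler.yield_for(std::chrono::milliseconds{10}); // timed yield through the timer fd
            std::cout << "resumed inline on the io thread\n";
            co_return;
        };

        coro::sync_wait(task());
        scheduler.shutdown(); // blocking: waits for outstanding work and joins the io thread
    }
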
+ * @param handle The coroutine handle to resume execution. */ - auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) noexcept -> void override; + auto resume(std::coroutine_handle<> handle) -> void + { + if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline) + { + { + std::scoped_lock lk{m_scheduled_tasks_mutex}; + m_scheduled_tasks.emplace_back(handle); + } + + bool expected{false}; + if (m_schedule_fd_triggered.compare_exchange_strong( + expected, true, std::memory_order::release, std::memory_order::relaxed)) + { + eventfd_t value{1}; + eventfd_write(m_schedule_fd, value); + } + } + else + { + m_thread_pool->resume(handle); + } + } + + /** + * @return The number of tasks waiting in the task queue + the executing tasks. + */ + auto size() const noexcept -> std::size_t + { + if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline) + { + return m_size.load(std::memory_order::acquire); + } + else + { + return m_size.load(std::memory_order::acquire) + m_thread_pool->size(); + } + } + + /** + * @return True if the task queue is empty and zero tasks are currently executing. + */ + auto empty() const noexcept -> bool { return size() == 0; } + + /** + * Starts the shutdown of the io scheduler. All currently executing and pending tasks will complete + * prior to shutting down. This call is blocking and will not return until all tasks complete. + */ + auto shutdown() noexcept -> void; private: /// The configuration options. @@ -156,27 +274,35 @@ private: fd_t m_shutdown_fd{-1}; /// The event loop timer fd for timed events, e.g. yield_for() or scheduler_after(). fd_t m_timer_fd{-1}; + /// The schedule file descriptor if the scheduler is in inline processing mode. + fd_t m_schedule_fd{-1}; + std::atomic m_schedule_fd_triggered{false}; + + /// The number of tasks executing or awaiting events in this io scheduler. + std::atomic m_size{0}; /// The background io worker threads. std::thread m_io_thread; + /// Thread pool for executing tasks when not in inline mode. + std::unique_ptr m_thread_pool{nullptr}; std::mutex m_timed_events_mutex{}; /// The map of time point's to poll infos for tasks that are yielding for a period of time /// or for tasks that are polling with timeouts. timed_events m_timed_events{}; + /// Has the io_scheduler been requested to shut down? 
+ std::atomic m_shutdown_requested{false}; + std::atomic m_io_processing{false}; auto process_events_manual(std::chrono::milliseconds timeout) -> void; auto process_events_dedicated_thread() -> void; auto process_events_execute(std::chrono::milliseconds timeout) -> void; static auto event_to_poll_status(uint32_t events) -> poll_status; - auto process_event_execute(detail::poll_info* pi, poll_status status) -> void; - auto process_timeout_execute() -> void; - - auto add_timer_token(time_point tp, detail::poll_info& pi) -> timed_events::iterator; - auto remove_timer_token(timed_events::iterator pos) -> void; - auto update_timeout(time_point now) -> void; + auto process_scheduled_execute_inline() -> void; + std::mutex m_scheduled_tasks_mutex{}; + std::vector> m_scheduled_tasks{}; static constexpr const int m_shutdown_object{0}; static constexpr const void* m_shutdown_ptr = &m_shutdown_object; @@ -184,10 +310,21 @@ private: static constexpr const int m_timer_object{0}; static constexpr const void* m_timer_ptr = &m_timer_object; + static constexpr const int m_schedule_object{0}; + static constexpr const void* m_schedule_ptr = &m_schedule_object; + static const constexpr std::chrono::milliseconds m_default_timeout{1000}; static const constexpr std::chrono::milliseconds m_no_timeout{0}; - static const constexpr std::size_t m_max_events = 8; + static const constexpr std::size_t m_max_events = 16; std::array m_events{}; + std::vector> m_handles_to_resume{}; + + auto process_event_execute(detail::poll_info* pi, poll_status status) -> void; + auto process_timeout_execute() -> void; + + auto add_timer_token(time_point tp, detail::poll_info& pi) -> timed_events::iterator; + auto remove_timer_token(timed_events::iterator pos) -> void; + auto update_timeout(time_point now) -> void; }; } // namespace coro diff --git a/inc/coro/net/dns_resolver.hpp b/inc/coro/net/dns_resolver.hpp index 059c383..942fa7d 100644 --- a/inc/coro/net/dns_resolver.hpp +++ b/inc/coro/net/dns_resolver.hpp @@ -1,5 +1,6 @@ #pragma once +#include "coro/fd.hpp" #include "coro/io_scheduler.hpp" #include "coro/net/hostname.hpp" #include "coro/net/ip_address.hpp" @@ -83,9 +84,9 @@ private: /// This is the set of sockets that are currently being actively polled so multiple poll tasks /// are not setup when ares_poll() is called. - std::unordered_set m_active_sockets{}; + std::unordered_set m_active_sockets{}; - task_container m_task_container; + task_container m_task_container; /// Global count to track if c-ares has been initialized or cleaned up. static uint64_t m_ares_count; @@ -93,7 +94,7 @@ private: static std::mutex m_ares_mutex; auto ares_poll() -> void; - auto make_poll_task(io_scheduler::fd_t fd, poll_op ops) -> coro::task; + auto make_poll_task(fd_t fd, poll_op ops) -> coro::task; }; } // namespace coro::net diff --git a/inc/coro/shared_mutex.hpp b/inc/coro/shared_mutex.hpp index 8de0e96..88f2f0a 100644 --- a/inc/coro/shared_mutex.hpp +++ b/inc/coro/shared_mutex.hpp @@ -1,28 +1,31 @@ #pragma once +#include "coro/concepts/executor.hpp" + #include #include #include namespace coro { +template class shared_mutex; -class thread_pool; /** * A scoped RAII lock holder for a coro::shared_mutex. It will call the appropriate unlock() or * unlock_shared() based on how the coro::shared_mutex was originally acquired, either shared or * exclusive modes. 
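
Since shared_mutex is now a template over the executor concept, call sites name the executor type explicitly. A hedged usage sketch follows, assuming the existing lock()/lock_shared() awaitables (unchanged lines that do not appear in this hunk):

    #include <coro/coro.hpp>
    #include <iostream>

    int main()
    {
        coro::thread_pool tp{};
        coro::shared_mutex<coro::thread_pool> mutex{tp};
        int value{0};

        auto writer = [&]() -> coro::task<void>
        {
            co_await tp.schedule();
            auto lock = co_await mutex.lock(); // exclusive; released when 'lock' destructs
            ++value;
            co_return;
        };

        auto reader = [&]() -> coro::task<void>
        {
            co_await tp.schedule();
            auto lock = co_await mutex.lock_shared(); // shared readers are resumed in parallel on 'tp'
            std::cout << "value = " << value << "\n";
            co_return;
        };

        coro::sync_wait(coro::when_all(writer(), reader(), reader()));
        tp.shutdown();
    }
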
*/ +template class shared_scoped_lock { public: - shared_scoped_lock(shared_mutex& sm, bool exclusive) : m_shared_mutex(&sm), m_exclusive(exclusive) {} + shared_scoped_lock(shared_mutex& sm, bool exclusive) : m_shared_mutex(&sm), m_exclusive(exclusive) {} /** * Unlocks the mutex upon this shared scoped lock destructing. */ - ~shared_scoped_lock(); + ~shared_scoped_lock() { unlock(); } shared_scoped_lock(const shared_scoped_lock&) = delete; shared_scoped_lock(shared_scoped_lock&& other) @@ -45,22 +48,38 @@ public: /** * Unlocks the shared mutex prior to this lock going out of scope. */ - auto unlock() -> void; + auto unlock() -> void + { + if (m_shared_mutex != nullptr) + { + if (m_exclusive) + { + m_shared_mutex->unlock(); + } + else + { + m_shared_mutex->unlock_shared(); + } + + m_shared_mutex = nullptr; + } + } private: - shared_mutex* m_shared_mutex{nullptr}; - bool m_exclusive{false}; + shared_mutex* m_shared_mutex{nullptr}; + bool m_exclusive{false}; }; +template class shared_mutex { public: /** - * @param tp The thread pool for when multiple shared waiters can be woken up at the same time, - * each shared waiter will be scheduled to immediately run on this thread pool in - * parralell. + * @param e The thread pool for when multiple shared waiters can be woken up at the same time, + * each shared waiter will be scheduled to immediately run on this thread pool in + * parallel. */ - explicit shared_mutex(coro::thread_pool& tp); + explicit shared_mutex(executor_type& e) : m_executor(e) {} ~shared_mutex() = default; shared_mutex(const shared_mutex&) = delete; @@ -72,9 +91,66 @@ public: { lock_operation(shared_mutex& sm, bool exclusive) : m_shared_mutex(sm), m_exclusive(exclusive) {} - auto await_ready() const noexcept -> bool; - auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool; - auto await_resume() noexcept -> shared_scoped_lock { return shared_scoped_lock{m_shared_mutex, m_exclusive}; } + auto await_ready() const noexcept -> bool + { + if (m_exclusive) + { + return m_shared_mutex.try_lock(); + } + else + { + return m_shared_mutex.try_lock_shared(); + } + } + + auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool + { + std::unique_lock lk{m_shared_mutex.m_mutex}; + // Its possible the lock has been released between await_ready() and await_suspend(), double + // check and make sure we are not going to suspend when nobody holds the lock. + if (m_exclusive) + { + if (m_shared_mutex.try_lock_locked(lk)) + { + return false; + } + } + else + { + if (m_shared_mutex.try_lock_shared_locked(lk)) + { + return false; + } + } + + // For sure the lock is currently held in a manner that it cannot be acquired, suspend ourself + // at the end of the waiter list. + + if (m_shared_mutex.m_tail_waiter == nullptr) + { + m_shared_mutex.m_head_waiter = this; + m_shared_mutex.m_tail_waiter = this; + } + else + { + m_shared_mutex.m_tail_waiter->m_next = this; + m_shared_mutex.m_tail_waiter = this; + } + + // If this is an exclusive lock acquire then mark it as so so that shared locks after this + // exclusive one will also suspend so this exclusive lock doens't get starved. 
+ if (m_exclusive) + { + ++m_shared_mutex.m_exclusive_waiters; + } + + m_awaiting_coroutine = awaiting_coroutine; + return true; + } + auto await_resume() noexcept -> shared_scoped_lock + { + return shared_scoped_lock{m_shared_mutex, m_exclusive}; + } private: friend class shared_mutex; @@ -99,12 +175,27 @@ public: /** * @return True if the lock could immediately be acquired in a shared state. */ - auto try_lock_shared() -> bool; + auto try_lock_shared() -> bool + { + // To acquire the shared lock the state must be one of two states: + // 1) unlocked + // 2) shared locked with zero exclusive waiters + // Zero exclusive waiters prevents exclusive starvation if shared locks are + // always continuously happening. + + std::unique_lock lk{m_mutex}; + return try_lock_shared_locked(lk); + } /** * @return True if the lock could immediately be acquired in an exclusive state. */ - auto try_lock() -> bool; + auto try_lock() -> bool + { + // To acquire the exclusive lock the state must be unlocked. + std::unique_lock lk{m_mutex}; + return try_lock_locked(lk); + } /** * Unlocks a single shared state user. *REQUIRES* that the lock was first acquired exactly once @@ -114,7 +205,24 @@ public: * If the shared user count drops to zero and this lock has an exclusive waiter then the exclusive * waiter acquires the lock. */ - auto unlock_shared() -> void; + auto unlock_shared() -> void + { + std::unique_lock lk{m_mutex}; + --m_shared_users; + + // Only wake waiters from shared state if all shared users have completed. + if (m_shared_users == 0) + { + if (m_head_waiter != nullptr) + { + wake_waiters(lk); + } + else + { + m_state = state::unlocked; + } + } + } /** * Unlocks the mutex from its exclusive state. If there is a following exclusive watier then @@ -122,7 +230,18 @@ public: * shared waiters acquire the lock in a shared state in parallel and are resumed on the original * thread pool this shared mutex was created with. */ - auto unlock() -> void; + auto unlock() -> void + { + std::unique_lock lk{m_mutex}; + if (m_head_waiter != nullptr) + { + wake_waiters(lk); + } + else + { + m_state = state::unlocked; + } + } private: friend class lock_operation; @@ -134,8 +253,8 @@ private: locked_exclusive }; - /// This thread pool is for resuming multiple shared waiters. - coro::thread_pool& m_thread_pool; + /// This executor is for resuming multiple shared waiters. + executor_type& m_executor; std::mutex m_mutex; @@ -150,10 +269,87 @@ private: lock_operation* m_head_waiter{nullptr}; lock_operation* m_tail_waiter{nullptr}; - auto try_lock_shared_locked(std::unique_lock& lk) -> bool; - auto try_lock_locked(std::unique_lock& lk) -> bool; + auto try_lock_shared_locked(std::unique_lock& lk) -> bool + { + if (m_state == state::unlocked) + { + // If the shared mutex is unlocked put it into shared mode and add ourself as using the lock. + m_state = state::locked_shared; + ++m_shared_users; + lk.unlock(); + return true; + } + else if (m_state == state::locked_shared && m_exclusive_waiters == 0) + { + // If the shared mutex is in a shared locked state and there are no exclusive waiters + // the add ourself as using the lock. + ++m_shared_users; + lk.unlock(); + return true; + } - auto wake_waiters(std::unique_lock& lk) -> void; + // If the lock is in shared mode but there are exclusive waiters then we will also wait so + // the writers are not starved. + + // If the lock is in exclusive mode already then we need to wait. 
+ + return false; + } + + auto try_lock_locked(std::unique_lock& lk) -> bool + { + if (m_state == state::unlocked) + { + m_state = state::locked_exclusive; + lk.unlock(); + return true; + } + return false; + } + + auto wake_waiters(std::unique_lock& lk) -> void + { + // First determine what the next lock state will be based on the first waiter. + if (m_head_waiter->m_exclusive) + { + // If its exclusive then only this waiter can be woken up. + m_state = state::locked_exclusive; + lock_operation* to_resume = m_head_waiter; + m_head_waiter = m_head_waiter->m_next; + --m_exclusive_waiters; + if (m_head_waiter == nullptr) + { + m_tail_waiter = nullptr; + } + + // Since this is an exclusive lock waiting we can resume it directly. + lk.unlock(); + to_resume->m_awaiting_coroutine.resume(); + } + else + { + // If its shared then we will scan forward and awake all shared waiters onto the given + // thread pool so they can run in parallel. + m_state = state::locked_shared; + do + { + lock_operation* to_resume = m_head_waiter; + m_head_waiter = m_head_waiter->m_next; + if (m_head_waiter == nullptr) + { + m_tail_waiter = nullptr; + } + ++m_shared_users; + + m_executor.resume(to_resume->m_awaiting_coroutine); + } while (m_head_waiter != nullptr && !m_head_waiter->m_exclusive); + + // Cannot unlock until the entire set of shared waiters has been traversed. I think this + // makes more sense than allocating space for all the shared waiters, unlocking, and then + // resuming in a batch? + lk.unlock(); + } + } }; } // namespace coro diff --git a/inc/coro/shutdown.hpp b/inc/coro/shutdown.hpp deleted file mode 100644 index 399f592..0000000 --- a/inc/coro/shutdown.hpp +++ /dev/null @@ -1,13 +0,0 @@ -#pragma once - -namespace coro -{ -enum class shutdown_t -{ - /// Synchronously wait for all tasks to complete when calling shutdown. - sync, - /// Asynchronously let tasks finish on the background thread on shutdown. - async -}; - -} // namespace coro diff --git a/inc/coro/task_container.hpp b/inc/coro/task_container.hpp index a5db75e..6718d26 100644 --- a/inc/coro/task_container.hpp +++ b/inc/coro/task_container.hpp @@ -1,16 +1,17 @@ #pragma once +#include "coro/concepts/executor.hpp" #include "coro/task.hpp" #include +#include #include #include #include namespace coro { -class thread_pool; - +template class task_container { public: @@ -25,16 +26,33 @@ public: }; /** - * @param tp Tasks started in the container are scheduled onto this thread pool. For tasks created + * @param e Tasks started in the container are scheduled onto this executor. For tasks created * from a coro::io_scheduler, this would usually be that coro::io_scheduler instance. * @param opts Task container options. */ - task_container(thread_pool& tp, const options opts = options{.reserve_size = 8, .growth_factor = 2}); + task_container(executor_type& e, const options opts = options{.reserve_size = 8, .growth_factor = 2}) + : m_growth_factor(opts.growth_factor), + m_executor(e) + { + m_tasks.resize(opts.reserve_size); + for (std::size_t i = 0; i < opts.reserve_size; ++i) + { + m_task_indexes.emplace_back(i); + } + m_free_pos = m_task_indexes.begin(); + } task_container(const task_container&) = delete; task_container(task_container&&) = delete; auto operator=(const task_container&) -> task_container& = delete; auto operator=(task_container&&) -> task_container& = delete; - ~task_container(); + ~task_container() + { + // This will hang the current thread.. but if tasks are not complete thats also pretty bad. 
+ while (!empty()) + { + garbage_collect(); + } + } enum class garbage_collect_t { @@ -51,14 +69,44 @@ public: * call? Calling at regular intervals will reduce memory usage of completed * tasks and allow for the task container to re-use allocated space. */ - auto start(coro::task user_task, garbage_collect_t cleanup = garbage_collect_t::yes) -> void; + auto start(coro::task user_task, garbage_collect_t cleanup = garbage_collect_t::yes) -> void + { + m_size.fetch_add(1, std::memory_order::relaxed); + + std::scoped_lock lk{m_mutex}; + + if (cleanup == garbage_collect_t::yes) + { + gc_internal(); + } + + // Only grow if completely full and attempting to add more. + if (m_free_pos == m_task_indexes.end()) + { + m_free_pos = grow(); + } + + // Store the task inside a cleanup task for self deletion. + auto index = *m_free_pos; + m_tasks[index] = make_cleanup_task(std::move(user_task), m_free_pos); + + // Mark the current used slot as used. + std::advance(m_free_pos, 1); + + // Start executing from the cleanup task to schedule the user's task onto the thread pool. + m_tasks[index].resume(); + } /** * Garbage collects any tasks that are marked as deleted. This frees up space to be re-used by * the task container for newly stored tasks. * @return The number of tasks that were deleted. */ - auto garbage_collect() -> std::size_t; + auto garbage_collect() -> std::size_t + { + std::scoped_lock lk{m_mutex}; + return gc_internal(); + } /** * @return The number of tasks that are awaiting deletion. @@ -104,19 +152,59 @@ public: * This does not shut down the task container, but can be used when shutting down, or if your * logic requires all the tasks contained within to complete, it is similar to coro::latch. */ - auto garbage_collect_and_yield_until_empty() -> coro::task; + auto garbage_collect_and_yield_until_empty() -> coro::task + { + while (!empty()) + { + garbage_collect(); + co_await m_executor.yield(); + } + } private: /** * Grows each task container by the growth factor. * @return The position of the free index after growing. */ - auto grow() -> task_position; + auto grow() -> task_position + { + // Save an index at the current last item. + auto last_pos = std::prev(m_task_indexes.end()); + std::size_t new_size = m_tasks.size() * m_growth_factor; + for (std::size_t i = m_tasks.size(); i < new_size; ++i) + { + m_task_indexes.emplace_back(i); + } + m_tasks.resize(new_size); + // Set the free pos to the item just after the previous last item. + return std::next(last_pos); + } /** * Interal GC call, expects the public function to lock. */ - auto gc_internal() -> std::size_t; + auto gc_internal() -> std::size_t + { + std::size_t deleted{0}; + if (!m_tasks_to_delete.empty()) + { + for (const auto& pos : m_tasks_to_delete) + { + // This doesn't actually 'delete' the task, it'll get overwritten when a + // new user task claims the free space. It could be useful to actually + // delete the tasks so the coroutine stack frames are destroyed. The advantage + // of letting a new task replace and old one though is that its a 1:1 exchange + // on delete and create, rather than a large pause here to delete all the + // completed tasks. + + // Put the deleted position at the end of the free indexes list. 
+ m_task_indexes.splice(m_task_indexes.end(), m_task_indexes, pos); + } + deleted = m_tasks_to_delete.size(); + m_tasks_to_delete.clear(); + } + return deleted; + } /** * Encapsulate the users tasks in a cleanup task which marks itself for deletion upon @@ -130,7 +218,37 @@ private: * @param pos The position where the task data will be stored in the task manager. * @return The user's task wrapped in a self cleanup task. */ - auto make_cleanup_task(task user_task, task_position pos) -> coro::task; + auto make_cleanup_task(task user_task, task_position pos) -> coro::task + { + // Immediately move the task onto the executor. + co_await m_executor.schedule(); + + try + { + // Await the users task to complete. + co_await user_task; + } + catch (const std::exception& e) + { + // TODO: what would be a good way to report this to the user...? Catching here is required + // since the co_await will unwrap the unhandled exception on the task. + // The user's task should ideally be wrapped in a catch all and handle it themselves, but + // that cannot be guaranteed. + std::cerr << "coro::task_container user_task had an unhandled exception e.what()= " << e.what() << "\n"; + } + catch (...) + { + // don't crash if they throw something that isn't derived from std::exception + std::cerr << "coro::task_container user_task had unhandle exception, not derived from std::exception.\n"; + } + + std::scoped_lock lk{m_mutex}; + m_tasks_to_delete.push_back(pos); + // This has to be done within scope lock to make sure this coroutine task completes before the + // task container object destructs -- if it was waiting on .empty() to become true. + m_size.fetch_sub(1, std::memory_order::relaxed); + co_return; + } /// Mutex for safely mutating the task containers across threads, expected usage is within /// thread pools for indeterminate lifetime requests. @@ -148,8 +266,8 @@ private: task_position m_free_pos{}; /// The amount to grow the containers by when all spaces are taken. double m_growth_factor{}; - /// The thread pool to schedule tasks that have just started. - thread_pool& m_thread_pool; + /// The executor to schedule tasks that have just started. + executor_type& m_executor; }; } // namespace coro diff --git a/inc/coro/thread_pool.hpp b/inc/coro/thread_pool.hpp index 8ee9b81..6c407d2 100644 --- a/inc/coro/thread_pool.hpp +++ b/inc/coro/thread_pool.hpp @@ -1,7 +1,7 @@ #pragma once +#include "coro/concepts/range_of.hpp" #include "coro/event.hpp" -#include "coro/shutdown.hpp" #include "coro/task.hpp" #include @@ -11,15 +11,13 @@ #include #include #include +#include #include #include #include namespace coro { -class event; -class shared_mutex; - /** * Creates a thread pool that executes arbitrary coroutine tasks in a FIFO scheduler policy. * The thread pool by default will create an execution thread per available core on the system. @@ -133,6 +131,46 @@ public: } } + /** + * Schedules any coroutine handle that is ready to be resumed. + * @param handle The coroutine handle to schedule. + */ + auto resume(std::coroutine_handle<> handle) noexcept -> void; + + /** + * Schedules the set of coroutine handles that are ready to be resumed. + * @param handles The coroutine handles to schedule. 
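
The now header-only task_container shown above takes its executor as a template parameter, and the cleanup task schedules straight onto it. A rough usage sketch (assumed, not from this diff; the background_work lambda is illustrative) for fire-and-forget tasks on an io_scheduler:

    #include <coro/coro.hpp>
    #include <iostream>

    int main()
    {
        coro::io_scheduler scheduler{};
        coro::task_container<coro::io_scheduler> container{scheduler};

        auto background_work = [](int id) -> coro::task<void>
        {
            // make_cleanup_task() already co_awaits scheduler.schedule() before this body runs.
            std::cout << "background task " << id << " finished\n";
            co_return;
        };

        for (int i = 0; i < 4; ++i)
        {
            container.start(background_work(i));
        }

        // Wait until every contained task has completed and been garbage collected.
        coro::sync_wait(container.garbage_collect_and_yield_until_empty());
        scheduler.shutdown();
    }
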
+ */ + template> range_type> + auto resume(const range_type& handles) noexcept -> void + { + m_size.fetch_add(std::size(handles), std::memory_order::release); + + size_t null_handles{0}; + + { + std::scoped_lock lk{m_wait_mutex}; + for (const auto& handle : handles) + { + if (handle != nullptr) [[likely]] + { + m_queue.emplace_back(handle); + } + else + { + ++null_handles; + } + } + } + + if (null_handles > 0) + { + m_size.fetch_sub(null_handles, std::memory_order::release); + } + + m_wait_cv.notify_one(); + } + /** * Immediately yields the current task and places it at the end of the queue of tasks waiting * to be processed. This will immediately be picked up again once it naturally goes through the @@ -143,11 +181,10 @@ public: /** * Shutsdown the thread pool. This will finish any tasks scheduled prior to calling this - * function but will prevent the thread pool from scheduling any new tasks. - * @param wait_for_tasks Should this function block until all remaining scheduled tasks have - * completed? Pass in sync to wait, or async to not block. + * function but will prevent the thread pool from scheduling any new tasks. This call is + * blocking and will wait until all inflight tasks are completed before returnin. */ - virtual auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) noexcept -> void; + auto shutdown() noexcept -> void; /** * @return The number of tasks waiting in the task queue + the executing tasks. @@ -198,22 +235,10 @@ private: */ auto schedule_impl(std::coroutine_handle<> handle) noexcept -> void; -protected: /// The number of tasks in the queue + currently executing. std::atomic m_size{0}; /// Has the thread pool been requested to shut down? std::atomic m_shutdown_requested{false}; - - /// Required to resume all waiters of the event onto a thread_pool. - friend event; - friend shared_mutex; - - /** - * Schedules any coroutine that is ready to be resumed. - * @param handle The coroutine handle to schedule. - */ - auto resume(std::coroutine_handle<> handle) noexcept -> void; - auto resume(const std::vector>& handles) noexcept -> void; }; } // namespace coro diff --git a/src/event.cpp b/src/event.cpp index 2b724c7..f5fd6c8 100644 --- a/src/event.cpp +++ b/src/event.cpp @@ -24,22 +24,22 @@ auto event::set() noexcept -> void } } -auto event::set(coro::thread_pool& tp) noexcept -> void -{ - // Exchange the state to this, if the state was previously not this, then traverse the list - // of awaiters and resume their coroutines. - void* old_value = m_state.exchange(this, std::memory_order::acq_rel); - if (old_value != this) - { - auto* waiters = static_cast(old_value); - while (waiters != nullptr) - { - auto* next = waiters->m_next; - tp.resume(waiters->m_awaiting_coroutine); - waiters = next; - } - } -} +// auto event::set(coro::thread_pool& tp) noexcept -> void +// { +// // Exchange the state to this, if the state was previously not this, then traverse the list +// // of awaiters and resume their coroutines. 
+// void* old_value = m_state.exchange(this, std::memory_order::acq_rel); +// if (old_value != this) +// { +// auto* waiters = static_cast(old_value); +// while (waiters != nullptr) +// { +// auto* next = waiters->m_next; +// tp.resume(waiters->m_awaiting_coroutine); +// waiters = next; +// } +// } +// } auto event::awaiter::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool { diff --git a/src/io_scheduler.cpp b/src/io_scheduler.cpp index 1aa9b48..67532f0 100644 --- a/src/io_scheduler.cpp +++ b/src/io_scheduler.cpp @@ -14,73 +14,18 @@ using namespace std::chrono_literals; namespace coro { -namespace detail -{ -/** - * Poll Info encapsulates everything about a poll operation for the event as well as its paired - * timeout. This is important since coroutines that are waiting on an event or timeout do not - * immediately execute, they are re-scheduled onto the thread pool, so its possible its pair - * event or timeout also triggers while the coroutine is still waiting to resume. This means that - * the first one to happen, the event itself or its timeout, needs to disable the other pair item - * prior to resuming the coroutine. - * - * Finally, its also important to note that the event and its paired timeout could happen during - * the same epoll_wait and possibly trigger the coroutine to start twice. Only one can win, so the - * first one processed sets m_processed to true and any subsequent events in the same epoll batch - * are effectively discarded. - */ -struct poll_info -{ - poll_info() = default; - ~poll_info() = default; - - poll_info(const poll_info&) = delete; - poll_info(poll_info&&) = delete; - auto operator=(const poll_info&) -> poll_info& = delete; - auto operator=(poll_info&&) -> poll_info& = delete; - - struct poll_awaiter - { - explicit poll_awaiter(poll_info& pi) noexcept : m_pi(pi) {} - - auto await_ready() const noexcept -> bool { return false; } - auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> void - { - m_pi.m_awaiting_coroutine = awaiting_coroutine; - std::atomic_thread_fence(std::memory_order::release); - } - auto await_resume() noexcept -> coro::poll_status { return m_pi.m_poll_status; } - - poll_info& m_pi; - }; - - auto operator co_await() noexcept -> poll_awaiter { return poll_awaiter{*this}; } - - /// The file descriptor being polled on. This is needed so that if the timeout occurs first then - /// the event loop can immediately disable the event within epoll. - io_scheduler::fd_t m_fd{-1}; - /// The timeout's position in the timeout map. A poll() with no timeout or yield() this is empty. - /// This is needed so that if the event occurs first then the event loop can immediately disable - /// the timeout within epoll. - std::optional m_timer_pos{std::nullopt}; - /// The awaiting coroutine for this poll info to resume upon event or timeout. - std::coroutine_handle<> m_awaiting_coroutine; - /// The status of the poll operation. - coro::poll_status m_poll_status{coro::poll_status::error}; - /// Did the timeout and event trigger at the same time on the same epoll_wait call? - /// Once this is set to true all future events on this poll info are null and void. 
- bool m_processed{false}; -}; - -} // namespace detail - io_scheduler::io_scheduler(options opts) - : thread_pool(std::move(opts.pool)), - m_opts(std::move(opts)), + : m_opts(std::move(opts)), m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)), m_shutdown_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)), - m_timer_fd(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC)) + m_timer_fd(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC)), + m_schedule_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)) { + if (opts.execution_strategy == execution_strategy_t::process_tasks_on_thread_pool) + { + m_thread_pool = std::make_unique(std::move(m_opts.pool)); + } + epoll_event e{}; e.events = EPOLLIN; @@ -90,6 +35,9 @@ io_scheduler::io_scheduler(options opts) e.data.ptr = const_cast(m_timer_ptr); epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_timer_fd, &e); + e.data.ptr = const_cast(m_schedule_ptr); + epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_schedule_fd, &e); + if (m_opts.thread_strategy == thread_strategy_t::spawn) { m_io_thread = std::thread([this]() { process_events_dedicated_thread(); }); @@ -116,12 +64,17 @@ io_scheduler::~io_scheduler() close(m_timer_fd); m_timer_fd = -1; } + if (m_schedule_fd != -1) + { + close(m_schedule_fd); + m_schedule_fd = -1; + } } auto io_scheduler::process_events(std::chrono::milliseconds timeout) -> std::size_t { process_events_manual(timeout); - return m_size.load(std::memory_order::relaxed); + return size(); } auto io_scheduler::schedule_after(std::chrono::milliseconds amount) -> coro::task @@ -142,6 +95,11 @@ auto io_scheduler::yield_for(std::chrono::milliseconds amount) -> coro::task coro::task coro::task } else { + m_size.fetch_add(1, std::memory_order::release); + auto amount = std::chrono::duration_cast(time - now); detail::poll_info pi{}; add_timer_token(now + amount, pi); co_await pi; + + m_size.fetch_sub(1, std::memory_order::release); } co_return; } auto io_scheduler::poll(fd_t fd, coro::poll_op op, std::chrono::milliseconds timeout) -> coro::task { + // Because the size will drop when this coroutine suspends every poll needs to undo the subtraction + // on the number of active tasks in the scheduler. When this task is resumed by the event loop. + m_size.fetch_add(1, std::memory_order::release); + // Setup two events, a timeout event and the actual poll for op event. // Whichever triggers first will delete the other to guarantee only one wins. // The resume token will be set by the scheduler to what the event turned out to be. @@ -200,16 +168,30 @@ auto io_scheduler::poll(fd_t fd, coro::poll_op op, std::chrono::milliseconds tim // The event loop will 'clean-up' whichever event didn't win since the coroutine is scheduled // onto the thread poll its possible the other type of event could trigger while its waiting // to execute again, thus restarting the coroutine twice, that would be quite bad. - co_return co_await pi; + auto result = co_await pi; + m_size.fetch_sub(1, std::memory_order::release); + co_return result; } -auto io_scheduler::shutdown(shutdown_t wait_for_tasks) noexcept -> void +auto io_scheduler::shutdown() noexcept -> void { - thread_pool::shutdown(wait_for_tasks); + // Only allow shutdown to occur once. + if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false) + { + if (m_thread_pool != nullptr) + { + m_thread_pool->shutdown(); + } - // Signal the event loop to stop asap, triggering the event fd is safe. 
- uint64_t value{1}; - ::write(m_shutdown_fd, &value, sizeof(value)); + // Signal the event loop to stop asap, triggering the event fd is safe. + uint64_t value{1}; + ::write(m_shutdown_fd, &value, sizeof(value)); + + if (m_io_thread.joinable()) + { + m_io_thread.join(); + } + } } auto io_scheduler::process_events_manual(std::chrono::milliseconds timeout) -> void @@ -231,7 +213,7 @@ auto io_scheduler::process_events_dedicated_thread() -> void m_io_processing.exchange(true, std::memory_order::release); // Execute tasks until stopped or there are no more tasks to complete. - while (!m_shutdown_requested.load(std::memory_order::relaxed) || m_size.load(std::memory_order::relaxed) > 0) + while (!m_shutdown_requested.load(std::memory_order::acquire) || size() > 0) { process_events_execute(m_default_timeout); } @@ -258,18 +240,45 @@ auto io_scheduler::process_events_execute(std::chrono::milliseconds timeout) -> // Process all events that have timed out. process_timeout_execute(); } + else if (handle_ptr == m_schedule_ptr) + { + // Process scheduled coroutines. + process_scheduled_execute_inline(); + } else if (handle_ptr == m_shutdown_ptr) [[unlikely]] { // Nothing to do , just needed to wake-up and smell the flowers } else { - // Individual poll task wake-up, this will queue the coroutines waiting - // on the resume token into the FIFO queue for processing. + // Individual poll task wake-up. process_event_execute(static_cast(handle_ptr), event_to_poll_status(event.events)); } } } + + // Its important to not resume any handles until the full set is accounted for. If a timeout + // and an event for the same handle happen in the same epoll_wait() call then inline processing + // will destruct the poll_info object before the second event is handled. This is also possible + // with thread pool processing, but probably has an extremely low chance of occuring due to + // the thread switch required. If m_max_events == 1 this would be unnecessary. + + if (!m_handles_to_resume.empty()) + { + if (m_opts.execution_strategy == execution_strategy_t::process_tasks_inline) + { + for (auto& handle : m_handles_to_resume) + { + handle.resume(); + } + } + else + { + m_thread_pool->resume(m_handles_to_resume); + } + + m_handles_to_resume.clear(); + } } auto io_scheduler::event_to_poll_status(uint32_t events) -> poll_status @@ -290,6 +299,30 @@ auto io_scheduler::event_to_poll_status(uint32_t events) -> poll_status throw std::runtime_error{"invalid epoll state"}; } +auto io_scheduler::process_scheduled_execute_inline() -> void +{ + std::vector> tasks{}; + { + // Acquire the entire list, and then reset it. + std::scoped_lock lk{m_scheduled_tasks_mutex}; + tasks.swap(m_scheduled_tasks); + + // Clear the schedule eventfd if this is a scheduled task. + eventfd_t value{0}; + eventfd_read(m_shutdown_fd, &value); + + // Clear the in memory flag to reduce eventfd_* calls on scheduling. + m_schedule_fd_triggered.exchange(false, std::memory_order::release); + } + + // This set of handles can be safely resumed now since they do not have a corresponding timeout event. 
+ for (auto& task : tasks) + { + task.resume(); + } + m_size.fetch_sub(tasks.size(), std::memory_order::release); +} + auto io_scheduler::process_event_execute(detail::poll_info* pi, poll_status status) -> void { if (!pi->m_processed) @@ -317,7 +350,8 @@ auto io_scheduler::process_event_execute(detail::poll_info* pi, poll_status stat { std::atomic_thread_fence(std::memory_order::acquire); } - resume(pi->m_awaiting_coroutine); + + m_handles_to_resume.emplace_back(pi->m_awaiting_coroutine); } } @@ -345,8 +379,6 @@ auto io_scheduler::process_timeout_execute() -> void } } - std::vector> handles{}; - handles.reserve(poll_infos.size()); for (auto pi : poll_infos) { if (!pi->m_processed) @@ -367,14 +399,11 @@ auto io_scheduler::process_timeout_execute() -> void // std::cerr << "process_event_execute() has a nullptr event\n"; } - handles.emplace_back(pi->m_awaiting_coroutine); + m_handles_to_resume.emplace_back(pi->m_awaiting_coroutine); pi->m_poll_status = coro::poll_status::timeout; } } - // Resume all timed out coroutines. - resume(handles); - // Update the time to the next smallest time point, re-take the current now time // since updating and resuming tasks could shift the time. update_timeout(clock::now()); diff --git a/src/net/dns_resolver.cpp b/src/net/dns_resolver.cpp index 9b61184..6651fe9 100644 --- a/src/net/dns_resolver.cpp +++ b/src/net/dns_resolver.cpp @@ -145,7 +145,7 @@ auto dns_resolver::ares_poll() -> void std::vector> poll_tasks{}; for (size_t i = 0; i < new_sockets; ++i) { - auto fd = static_cast(ares_sockets[i]); + auto fd = static_cast(ares_sockets[i]); // If this socket is not currently actively polling, start polling! if (m_active_sockets.emplace(fd).second) @@ -155,7 +155,7 @@ auto dns_resolver::ares_poll() -> void } } -auto dns_resolver::make_poll_task(io_scheduler::fd_t fd, poll_op ops) -> coro::task +auto dns_resolver::make_poll_task(fd_t fd, poll_op ops) -> coro::task { auto result = co_await m_io_scheduler.poll(fd, ops, m_timeout); switch (result) diff --git a/src/shared_mutex.cpp b/src/shared_mutex.cpp deleted file mode 100644 index 453d7cd..0000000 --- a/src/shared_mutex.cpp +++ /dev/null @@ -1,221 +0,0 @@ -#include "coro/shared_mutex.hpp" -#include "coro/thread_pool.hpp" - -namespace coro -{ -shared_scoped_lock::~shared_scoped_lock() -{ - unlock(); -} - -auto shared_scoped_lock::unlock() -> void -{ - if (m_shared_mutex != nullptr) - { - if (m_exclusive) - { - m_shared_mutex->unlock(); - } - else - { - m_shared_mutex->unlock_shared(); - } - - m_shared_mutex = nullptr; - } -} - -shared_mutex::shared_mutex(coro::thread_pool& tp) : m_thread_pool(tp) -{ -} - -auto shared_mutex::lock_operation::await_ready() const noexcept -> bool -{ - if (m_exclusive) - { - return m_shared_mutex.try_lock(); - } - else - { - return m_shared_mutex.try_lock_shared(); - } -} - -auto shared_mutex::lock_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool -{ - std::unique_lock lk{m_shared_mutex.m_mutex}; - // Its possible the lock has been released between await_ready() and await_suspend(), double - // check and make sure we are not going to suspend when nobody holds the lock. - if (m_exclusive) - { - if (m_shared_mutex.try_lock_locked(lk)) - { - return false; - } - } - else - { - if (m_shared_mutex.try_lock_shared_locked(lk)) - { - return false; - } - } - - // For sure the lock is currently held in a manner that it cannot be acquired, suspend ourself - // at the end of the waiter list. 
- - if (m_shared_mutex.m_tail_waiter == nullptr) - { - m_shared_mutex.m_head_waiter = this; - m_shared_mutex.m_tail_waiter = this; - } - else - { - m_shared_mutex.m_tail_waiter->m_next = this; - m_shared_mutex.m_tail_waiter = this; - } - - // If this is an exclusive lock acquire then mark it as so so that shared locks after this - // exclusive one will also suspend so this exclusive lock doens't get starved. - if (m_exclusive) - { - ++m_shared_mutex.m_exclusive_waiters; - } - - m_awaiting_coroutine = awaiting_coroutine; - return true; -} - -auto shared_mutex::try_lock_shared() -> bool -{ - // To acquire the shared lock the state must be one of two states: - // 1) unlocked - // 2) shared locked with zero exclusive waiters - // Zero exclusive waiters prevents exclusive starvation if shared locks are - // always continuously happening. - - std::unique_lock lk{m_mutex}; - return try_lock_shared_locked(lk); -} - -auto shared_mutex::try_lock() -> bool -{ - // To acquire the exclusive lock the state must be unlocked. - std::unique_lock lk{m_mutex}; - return try_lock_locked(lk); -} - -auto shared_mutex::unlock_shared() -> void -{ - std::unique_lock lk{m_mutex}; - --m_shared_users; - - // Only wake waiters from shared state if all shared users have completed. - if (m_shared_users == 0) - { - if (m_head_waiter != nullptr) - { - wake_waiters(lk); - } - else - { - m_state = state::unlocked; - } - } -} - -auto shared_mutex::unlock() -> void -{ - std::unique_lock lk{m_mutex}; - if (m_head_waiter != nullptr) - { - wake_waiters(lk); - } - else - { - m_state = state::unlocked; - } -} -auto shared_mutex::try_lock_shared_locked(std::unique_lock& lk) -> bool -{ - if (m_state == state::unlocked) - { - // If the shared mutex is unlocked put it into shared mode and add ourself as using the lock. - m_state = state::locked_shared; - ++m_shared_users; - lk.unlock(); - return true; - } - else if (m_state == state::locked_shared && m_exclusive_waiters == 0) - { - // If the shared mutex is in a shared locked state and there are no exclusive waiters - // the add ourself as using the lock. - ++m_shared_users; - lk.unlock(); - return true; - } - - // If the lock is in shared mode but there are exclusive waiters then we will also wait so - // the writers are not starved. - - // If the lock is in exclusive mode already then we need to wait. - - return false; -} - -auto shared_mutex::try_lock_locked(std::unique_lock& lk) -> bool -{ - if (m_state == state::unlocked) - { - m_state = state::locked_exclusive; - lk.unlock(); - return true; - } - return false; -} - -auto shared_mutex::wake_waiters(std::unique_lock& lk) -> void -{ - // First determine what the next lock state will be based on the first waiter. - if (m_head_waiter->m_exclusive) - { - // If its exclusive then only this waiter can be woken up. - m_state = state::locked_exclusive; - lock_operation* to_resume = m_head_waiter; - m_head_waiter = m_head_waiter->m_next; - --m_exclusive_waiters; - if (m_head_waiter == nullptr) - { - m_tail_waiter = nullptr; - } - - // Since this is an exclusive lock waiting we can resume it directly. - lk.unlock(); - to_resume->m_awaiting_coroutine.resume(); - } - else - { - // If its shared then we will scan forward and awake all shared waiters onto the given - // thread pool so they can run in parallel. 
- m_state = state::locked_shared; - do - { - lock_operation* to_resume = m_head_waiter; - m_head_waiter = m_head_waiter->m_next; - if (m_head_waiter == nullptr) - { - m_tail_waiter = nullptr; - } - ++m_shared_users; - - m_thread_pool.resume(to_resume->m_awaiting_coroutine); - } while (m_head_waiter != nullptr && !m_head_waiter->m_exclusive); - - // Cannot unlock until the entire set of shared waiters has been traversed. I think this - // makes more sense than allocating space for all the shared waiters, unlocking, and then - // resuming in a batch? - lk.unlock(); - } -} - -} // namespace coro diff --git a/src/task_container.cpp b/src/task_container.cpp deleted file mode 100644 index 3473bbb..0000000 --- a/src/task_container.cpp +++ /dev/null @@ -1,141 +0,0 @@ -#include "coro/task_container.hpp" -#include "coro/thread_pool.hpp" - -#include - -namespace coro -{ -task_container::task_container(thread_pool& tp, const options opts) - : m_growth_factor(opts.growth_factor), - m_thread_pool(tp) -{ - m_tasks.resize(opts.reserve_size); - for (std::size_t i = 0; i < opts.reserve_size; ++i) - { - m_task_indexes.emplace_back(i); - } - m_free_pos = m_task_indexes.begin(); -} - -task_container::~task_container() -{ - // This will hang the current thread.. but if tasks are not complete thats also pretty bad. - while (!empty()) - { - garbage_collect(); - } -} - -auto task_container::start(coro::task user_task, garbage_collect_t cleanup) -> void -{ - m_size.fetch_add(1, std::memory_order::relaxed); - - std::scoped_lock lk{m_mutex}; - - if (cleanup == garbage_collect_t::yes) - { - gc_internal(); - } - - // Only grow if completely full and attempting to add more. - if (m_free_pos == m_task_indexes.end()) - { - m_free_pos = grow(); - } - - // Store the task inside a cleanup task for self deletion. - auto index = *m_free_pos; - m_tasks[index] = make_cleanup_task(std::move(user_task), m_free_pos); - - // Mark the current used slot as used. - std::advance(m_free_pos, 1); - - // Start executing from the cleanup task to schedule the user's task onto the thread pool. - m_tasks[index].resume(); -} - -auto task_container::garbage_collect() -> std::size_t -{ - std::scoped_lock lk{m_mutex}; - return gc_internal(); -} - -auto task_container::garbage_collect_and_yield_until_empty() -> coro::task -{ - while (!empty()) - { - garbage_collect(); - co_await m_thread_pool.yield(); - } -} - -auto task_container::grow() -> task_position -{ - // Save an index at the current last item. - auto last_pos = std::prev(m_task_indexes.end()); - std::size_t new_size = m_tasks.size() * m_growth_factor; - for (std::size_t i = m_tasks.size(); i < new_size; ++i) - { - m_task_indexes.emplace_back(i); - } - m_tasks.resize(new_size); - // Set the free pos to the item just after the previous last item. - return std::next(last_pos); -} - -auto task_container::gc_internal() -> std::size_t -{ - std::size_t deleted{0}; - if (!m_tasks_to_delete.empty()) - { - for (const auto& pos : m_tasks_to_delete) - { - // This doesn't actually 'delete' the task, it'll get overwritten when a - // new user task claims the free space. It could be useful to actually - // delete the tasks so the coroutine stack frames are destroyed. The advantage - // of letting a new task replace and old one though is that its a 1:1 exchange - // on delete and create, rather than a large pause here to delete all the - // completed tasks. - - // Put the deleted position at the end of the free indexes list. 
- m_task_indexes.splice(m_task_indexes.end(), m_task_indexes, pos); - } - deleted = m_tasks_to_delete.size(); - m_tasks_to_delete.clear(); - } - return deleted; -} - -auto task_container::make_cleanup_task(task user_task, task_position pos) -> coro::task -{ - // Immediately move the task onto the thread pool. - co_await m_thread_pool.schedule(); - - try - { - // Await the users task to complete. - co_await user_task; - } - catch (const std::exception& e) - { - // TODO: what would be a good way to report this to the user...? Catching here is required - // since the co_await will unwrap the unhandled exception on the task. - // The user's task should ideally be wrapped in a catch all and handle it themselves, but - // that cannot be guaranteed. - std::cerr << "coro::task_container user_task had an unhandled exception e.what()= " << e.what() << "\n"; - } - catch (...) - { - // don't crash if they throw something that isn't derived from std::exception - std::cerr << "coro::task_container user_task had unhandle exception, not derived from std::exception.\n"; - } - - std::scoped_lock lk{m_mutex}; - m_tasks_to_delete.push_back(pos); - // This has to be done within scope lock to make sure this coroutine task completes before the - // task container object destructs -- if it was waiting on .empty() to become true. - m_size.fetch_sub(1, std::memory_order::relaxed); - co_return; -} - -} // namespace coro diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index c620ffa..d171f1c 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -1,5 +1,7 @@ #include "coro/thread_pool.hpp" +#include + namespace coro { thread_pool::operation::operation(thread_pool& tp) noexcept : m_thread_pool(tp) @@ -35,30 +37,39 @@ auto thread_pool::schedule() -> operation { if (!m_shutdown_requested.load(std::memory_order::relaxed)) { - m_size.fetch_add(1, std::memory_order::relaxed); + m_size.fetch_add(1, std::memory_order::release); return operation{*this}; } throw std::runtime_error("coro::thread_pool is shutting down, unable to schedule new tasks."); } -auto thread_pool::shutdown(shutdown_t wait_for_tasks) noexcept -> void +auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> void { - if (!m_shutdown_requested.exchange(true, std::memory_order::release)) + if (handle == nullptr) + { + return; + } + + m_size.fetch_add(1, std::memory_order::release); + schedule_impl(handle); +} + +auto thread_pool::shutdown() noexcept -> void +{ + // Only allow shutdown to occur once. 
+ if (m_shutdown_requested.exchange(true, std::memory_order::acq_rel) == false) { for (auto& thread : m_threads) { thread.request_stop(); } - if (wait_for_tasks == shutdown_t::sync) + for (auto& thread : m_threads) { - for (auto& thread : m_threads) + if (thread.joinable()) { - if (thread.joinable()) - { - thread.join(); - } + thread.join(); } } } @@ -115,37 +126,4 @@ auto thread_pool::schedule_impl(std::coroutine_handle<> handle) noexcept -> void m_wait_cv.notify_one(); } -auto thread_pool::resume(std::coroutine_handle<> handle) noexcept -> void -{ - if (handle == nullptr) - { - return; - } - - m_size.fetch_add(1, std::memory_order::relaxed); - schedule_impl(handle); -} - -auto thread_pool::resume(const std::vector>& handles) noexcept -> void -{ - m_size.fetch_add(handles.size(), std::memory_order::relaxed); - - { - std::scoped_lock lk{m_wait_mutex}; - for (const auto& handle : handles) - { - if (handle != nullptr) [[likely]] - { - m_queue.emplace_back(handle); - } - else - { - m_size.fetch_sub(1, std::memory_order::release); - } - } - } - - m_wait_cv.notify_one(); -} - } // namespace coro diff --git a/test/bench.cpp b/test/bench.cpp index d396a81..5517a66 100644 --- a/test/bench.cpp +++ b/test/bench.cpp @@ -337,19 +337,19 @@ TEST_CASE("benchmark counter task scheduler await event from another coroutine", REQUIRE(s.empty()); } -TEST_CASE("benchmark tcp_server echo server", "[benchmark]") +TEST_CASE("benchmark tcp_server echo server thread pool", "[benchmark]") { - const constexpr std::size_t connections = 16; - const constexpr std::size_t messages_per_connection = 10'000; + const constexpr std::size_t connections = 100; + const constexpr std::size_t messages_per_connection = 1'000; const constexpr std::size_t ops = connections * messages_per_connection; const std::string msg = "im a data point in a stream of bytes"; - const constexpr std::size_t server_count = 1; - const constexpr std::size_t client_count = 1; + const constexpr std::size_t server_count = 5; + const constexpr std::size_t client_count = 5; - const constexpr std::size_t server_thread_count = 1; - const constexpr std::size_t client_thread_count = 1; + const constexpr std::size_t server_thread_count = 4; + const constexpr std::size_t client_thread_count = 4; std::atomic listening{0}; std::atomic accepted{0}; @@ -359,18 +359,20 @@ TEST_CASE("benchmark tcp_server echo server", "[benchmark]") struct server { - uint64_t id; - coro::io_scheduler scheduler{ - coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = server_thread_count}}}; - coro::task_container task_container{scheduler}; - uint64_t live_clients{0}; - coro::event wait_for_clients{}; + uint64_t id; + coro::io_scheduler scheduler{coro::io_scheduler::options{ + .pool = coro::thread_pool::options{.thread_count = server_thread_count}, + .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_on_thread_pool}}; + coro::task_container task_container{scheduler}; + uint64_t live_clients{0}; + coro::event wait_for_clients{}; }; struct client { - coro::io_scheduler scheduler{ - coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = client_thread_count}}}; + coro::io_scheduler scheduler{coro::io_scheduler::options{ + .pool = coro::thread_pool::options{.thread_count = client_thread_count}, + .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_on_thread_pool}}; std::vector> tasks{}; }; @@ -526,10 +528,204 @@ TEST_CASE("benchmark tcp_server echo server", "[benchmark]") } auto stop = 
sc::now(); - print_stats("benchmark tcp_client and tcp_server", ops, start, stop); + print_stats("benchmark tcp_client and tcp_server thread_pool", ops, start, stop); for (const auto& [ms, count] : g_histogram) { std::cerr << ms.count() << " : " << count << "\n"; } } + +TEST_CASE("benchmark tcp_server echo server inline", "[benchmark]") +{ + const constexpr std::size_t connections = 100; + const constexpr std::size_t messages_per_connection = 1'000; + const constexpr std::size_t ops = connections * messages_per_connection; + + const std::string msg = "im a data point in a stream of bytes"; + + const constexpr std::size_t server_count = 10; + const constexpr std::size_t client_count = 10; + + std::atomic listening{0}; + std::atomic accepted{0}; + std::atomic clients_completed{0}; + + std::atomic server_id{0}; + + using estrat = coro::io_scheduler::execution_strategy_t; + + struct server + { + uint64_t id; + coro::io_scheduler scheduler{coro::io_scheduler::options{.execution_strategy = estrat::process_tasks_inline}}; + coro::task_container task_container{scheduler}; + uint64_t live_clients{0}; + coro::event wait_for_clients{}; + }; + + struct client + { + coro::io_scheduler scheduler{coro::io_scheduler::options{.execution_strategy = estrat::process_tasks_inline}}; + std::vector> tasks{}; + }; + + auto make_on_connection_task = [&](server& s, coro::net::tcp_client client) -> coro::task { + std::string in(64, '\0'); + + // Echo the messages until the socket is closed. + while (true) + { + auto pstatus = co_await client.poll(coro::poll_op::read); + REQUIRE(pstatus == coro::poll_status::event); + + auto [rstatus, rspan] = client.recv(in); + if (rstatus == coro::net::recv_status::closed) + { + REQUIRE(rspan.empty()); + break; + } + REQUIRE(rstatus == coro::net::recv_status::ok); + + in.resize(rspan.size()); + + auto [sstatus, remaining] = client.send(in); + REQUIRE(sstatus == coro::net::send_status::ok); + REQUIRE(remaining.empty()); + } + + s.live_clients--; + if (s.live_clients == 0) + { + s.wait_for_clients.set(); + } + co_return; + }; + + auto make_server_task = [&](server& s) -> coro::task { + co_await s.scheduler.schedule(); + + coro::net::tcp_server server{s.scheduler}; + + listening++; + + while (accepted.load(std::memory_order::acquire) < connections) + { + auto pstatus = co_await server.poll(std::chrono::milliseconds{1}); + if (pstatus == coro::poll_status::event) + { + auto c = server.accept(); + if (c.socket().is_valid()) + { + accepted.fetch_add(1, std::memory_order::release); + + s.live_clients++; + s.task_container.start(make_on_connection_task(s, std::move(c))); + } + } + } + + co_await s.wait_for_clients; + co_return; + }; + + std::mutex g_histogram_mutex; + std::map g_histogram; + + auto make_client_task = [&](client& c) -> coro::task { + co_await c.scheduler.schedule(); + std::map histogram; + coro::net::tcp_client client{c.scheduler}; + + auto cstatus = co_await client.connect(); // std::chrono::seconds{1}); + REQUIRE(cstatus == coro::net::connect_status::connected); + + for (size_t i = 1; i <= messages_per_connection; ++i) + { + auto req_start = std::chrono::steady_clock::now(); + auto [sstatus, remaining] = client.send(msg); + REQUIRE(sstatus == coro::net::send_status::ok); + REQUIRE(remaining.empty()); + + auto pstatus = co_await client.poll(coro::poll_op::read); + REQUIRE(pstatus == coro::poll_status::event); + + std::string response(64, '\0'); + auto [rstatus, rspan] = client.recv(response); + REQUIRE(rstatus == coro::net::recv_status::ok); + REQUIRE(rspan.size() == 
msg.size()); + response.resize(rspan.size()); + REQUIRE(response == msg); + + auto req_stop = std::chrono::steady_clock::now(); + histogram[std::chrono::duration_cast(req_stop - req_start)]++; + } + + { + std::scoped_lock lk{g_histogram_mutex}; + for (auto [ms, count] : histogram) + { + g_histogram[ms] += count; + } + } + + clients_completed.fetch_add(1); + + co_return; + }; + + auto start = sc::now(); + + // Create the server to accept incoming tcp connections. + std::vector server_threads{}; + for (size_t i = 0; i < server_count; ++i) + { + server_threads.emplace_back(std::thread{[&]() { + server s{}; + s.id = server_id++; + coro::sync_wait(make_server_task(s)); + s.scheduler.shutdown(); + }}); + } + + // The server can take a small bit of time to start up, if we don't wait for it to notify then + // the first few connections can easily fail to connect causing this test to fail. + while (listening != server_count) + { + std::this_thread::sleep_for(std::chrono::milliseconds{1}); + } + + // Spawn N client connections across a set number of clients. + std::vector client_threads{}; + std::vector clients{}; + for (size_t i = 0; i < client_count; ++i) + { + client_threads.emplace_back(std::thread{[&]() { + client c{}; + for (size_t i = 0; i < connections / client_count; ++i) + { + c.tasks.emplace_back(make_client_task(c)); + } + coro::sync_wait(coro::when_all(std::move(c.tasks))); + c.scheduler.shutdown(); + }}); + } + + for (auto& ct : client_threads) + { + ct.join(); + } + + for (auto& st : server_threads) + { + st.join(); + } + + auto stop = sc::now(); + print_stats("benchmark tcp_client and tcp_server inline", ops, start, stop); + + for (const auto& [ms, count] : g_histogram) + { + std::cerr << ms.count() << " : " << count << "\n"; + } +} \ No newline at end of file diff --git a/test/net/test_dns_resolver.cpp b/test/net/test_dns_resolver.cpp index 0ad8997..600f5c5 100644 --- a/test/net/test_dns_resolver.cpp +++ b/test/net/test_dns_resolver.cpp @@ -4,7 +4,7 @@ #include -TEST_CASE("dns_resolver basic") +TEST_CASE("dns_resolver basic", "[dns]") { coro::io_scheduler scheduler{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; coro::net::dns_resolver dns_resolver{scheduler, std::chrono::milliseconds{5000}}; @@ -26,6 +26,8 @@ TEST_CASE("dns_resolver basic") coro::sync_wait(make_host_by_name_task(coro::net::hostname{"www.example.com"})); + std::cerr << "io_scheduler.size() before shutdown = " << scheduler.size() << "\n"; scheduler.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << scheduler.size() << "\n"; REQUIRE(scheduler.empty()); } \ No newline at end of file diff --git a/test/test_io_scheduler.cpp b/test/test_io_scheduler.cpp index e156a79..f3ef002 100644 --- a/test/test_io_scheduler.cpp +++ b/test/test_io_scheduler.cpp @@ -27,7 +27,9 @@ TEST_CASE("io_scheduler schedule single task", "[io_scheduler]") auto value = coro::sync_wait(make_task()); REQUIRE(value == 42); + std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; s.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; REQUIRE(s.empty()); } @@ -52,6 +54,11 @@ TEST_CASE("io_scheduler submit mutiple tasks", "[io_scheduler]") coro::sync_wait(coro::when_all(std::move(tasks))); REQUIRE(counter == n); + + std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; + s.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; + REQUIRE(s.empty()); } TEST_CASE("io_scheduler task with multiple 
events", "[io_scheduler]") @@ -83,7 +90,9 @@ TEST_CASE("io_scheduler task with multiple events", "[io_scheduler]") REQUIRE(counter == 3); + std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; s.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; REQUIRE(s.empty()); } @@ -108,7 +117,9 @@ TEST_CASE("io_scheduler task with read poll", "[io_scheduler]") coro::sync_wait(coro::when_all(make_poll_read_task(), make_poll_write_task())); + std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; s.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; REQUIRE(s.empty()); close(trigger_fd); } @@ -135,7 +146,9 @@ TEST_CASE("io_scheduler task with read poll with timeout", "[io_scheduler]") coro::sync_wait(coro::when_all(make_poll_read_task(), make_poll_write_task())); + std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; s.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; REQUIRE(s.empty()); close(trigger_fd); } @@ -155,7 +168,9 @@ TEST_CASE("io_scheduler task with read poll timeout", "[io_scheduler]") coro::sync_wait(make_task()); + std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; s.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; REQUIRE(s.empty()); close(trigger_fd); } @@ -289,7 +304,9 @@ TEST_CASE("io_scheduler separate thread resume with return", "[io_scheduler]") coro::sync_wait(make_task()); service.join(); + std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; s.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; REQUIRE(s.empty()); } @@ -317,6 +334,11 @@ TEST_CASE("io_scheduler with basic task", "[io_scheduler]") auto counter = coro::sync_wait(func()); REQUIRE(counter == expected_value); + + std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; + s.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; + REQUIRE(s.empty()); } TEST_CASE("io_scheduler scheduler_after", "[io_scheduler]") @@ -344,7 +366,9 @@ TEST_CASE("io_scheduler scheduler_after", "[io_scheduler]") REQUIRE(counter == 1); REQUIRE(duration < wait_for); + std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; s.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; REQUIRE(s.empty()); } @@ -360,7 +384,9 @@ TEST_CASE("io_scheduler scheduler_after", "[io_scheduler]") REQUIRE(counter == 2); REQUIRE(duration >= wait_for); + std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; s.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; REQUIRE(s.empty()); } } @@ -432,6 +458,11 @@ TEST_CASE("io_scheduler yield", "[io_scheduler]") }; coro::sync_wait(func()); + + std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; + s.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; + REQUIRE(s.empty()); } TEST_CASE("io_scheduler yield_for", "[io_scheduler]") @@ -449,6 +480,11 @@ TEST_CASE("io_scheduler yield_for", "[io_scheduler]") auto duration = coro::sync_wait(make_task()); REQUIRE(duration >= wait_for); + + std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; + s.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; + REQUIRE(s.empty()); } TEST_CASE("io_scheduler yield_until", 
"[io_scheduler]") @@ -468,6 +504,11 @@ TEST_CASE("io_scheduler yield_until", "[io_scheduler]") auto duration = coro::sync_wait(make_task()); REQUIRE(duration >= (wait_for - epsilon)); + + std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; + s.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; + REQUIRE(s.empty()); } TEST_CASE("io_scheduler multipler event waiters", "[io_scheduler]") @@ -505,6 +546,11 @@ TEST_CASE("io_scheduler multipler event waiters", "[io_scheduler]") }; coro::sync_wait(coro::when_all(spawn(), release())); + + std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; + s.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; + REQUIRE(s.empty()); } TEST_CASE("io_scheduler self generating coroutine (stack overflow check)", "[io_scheduler]") @@ -541,9 +587,14 @@ TEST_CASE("io_scheduler self generating coroutine (stack overflow check)", "[io_ } REQUIRE(tasks.size() == total - 1); + + std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; + s.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; + REQUIRE(s.empty()); } -TEST_CASE("io_scheduler manual process events", "[io_scheduler]") +TEST_CASE("io_scheduler manual process events thread pool", "[io_scheduler]") { auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); coro::io_scheduler s{coro::io_scheduler::options{ @@ -555,17 +606,23 @@ TEST_CASE("io_scheduler manual process events", "[io_scheduler]") std::atomic polling{false}; auto make_poll_read_task = [&]() -> coro::task { + std::cerr << "poll task start s.size() == " << s.size() << "\n"; co_await s.schedule(); - polling = true; + polling = true; + std::cerr << "poll task polling s.size() == " << s.size() << "\n"; auto status = co_await s.poll(trigger_fd, coro::poll_op::read); REQUIRE(status == coro::poll_status::event); + std::cerr << "poll task exiting s.size() == " << s.size() << "\n"; co_return; }; auto make_poll_write_task = [&]() -> coro::task { + std::cerr << "write task start s.size() == " << s.size() << "\n"; co_await s.schedule(); uint64_t value{42}; + std::cerr << "write task writing s.size() == " << s.size() << "\n"; write(trigger_fd, &value, sizeof(value)); + std::cerr << "write task exiting s.size() == " << s.size() << "\n"; co_return; }; @@ -580,9 +637,64 @@ TEST_CASE("io_scheduler manual process events", "[io_scheduler]") write_task.resume(); - REQUIRE(s.process_events(100ms) == 1); + while (s.process_events(100ms) > 0) + ; + std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; s.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; + REQUIRE(s.empty()); + close(trigger_fd); +} + +TEST_CASE("io_scheduler manual process events inline", "[io_scheduler]") +{ + auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + coro::io_scheduler s{coro::io_scheduler::options{ + .thread_strategy = coro::io_scheduler::thread_strategy_t::manual, + .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline}}; + + auto make_poll_read_task = [&]() -> coro::task { + std::cerr << "poll task start s.size() == " << s.size() << "\n"; + co_await s.schedule(); + std::cerr << "poll task polling s.size() == " << s.size() << "\n"; + auto status = co_await s.poll(trigger_fd, coro::poll_op::read); + REQUIRE(status == coro::poll_status::event); + std::cerr << "poll task exiting s.size() == " << s.size() << "\n"; + co_return; + }; + + 
auto make_poll_write_task = [&]() -> coro::task { + std::cerr << "write task start s.size() == " << s.size() << "\n"; + co_await s.schedule(); + uint64_t value{42}; + std::cerr << "write task writing s.size() == " << s.size() << "\n"; + write(trigger_fd, &value, sizeof(value)); + std::cerr << "write task exiting s.size() == " << s.size() << "\n"; + co_return; + }; + + auto poll_task = make_poll_read_task(); + auto write_task = make_poll_write_task(); + + // Start the tasks by scheduling them into the io scheduler. + poll_task.resume(); + write_task.resume(); + + // Now process them to completion. + while (true) + { + auto remaining = s.process_events(100ms); + std::cerr << "remaining " << remaining << "\n"; + if (remaining == 0) + { + break; + } + }; + + std::cerr << "io_scheduler.size() before shutdown = " << s.size() << "\n"; + s.shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << s.size() << "\n"; REQUIRE(s.empty()); close(trigger_fd); } diff --git a/test/test_shared_mutex.cpp b/test/test_shared_mutex.cpp index b3b2772..e087eca 100644 --- a/test/test_shared_mutex.cpp +++ b/test/test_shared_mutex.cpp @@ -10,9 +10,9 @@ TEST_CASE("mutex single waiter not locked exclusive", "[shared_mutex]") coro::thread_pool tp{coro::thread_pool::options{.thread_count = 1}}; std::vector output; - coro::shared_mutex m{tp}; + coro::shared_mutex m{tp}; - auto make_emplace_task = [&](coro::shared_mutex& m) -> coro::task { + auto make_emplace_task = [&](coro::shared_mutex& m) -> coro::task { std::cerr << "Acquiring lock exclusive\n"; { auto scoped_lock = co_await m.lock(); @@ -44,9 +44,9 @@ TEST_CASE("mutex single waiter not locked shared", "[shared_mutex]") coro::thread_pool tp{coro::thread_pool::options{.thread_count = 1}}; std::vector values{1, 2, 3}; - coro::shared_mutex m{tp}; + coro::shared_mutex m{tp}; - auto make_emplace_task = [&](coro::shared_mutex& m) -> coro::task { + auto make_emplace_task = [&](coro::shared_mutex& m) -> coro::task { std::cerr << "Acquiring lock shared\n"; { auto scoped_lock = co_await m.lock_shared(); @@ -81,7 +81,7 @@ TEST_CASE("mutex single waiter not locked shared", "[shared_mutex]") TEST_CASE("mutex many shared and exclusive waiters interleaved", "[shared_mutex]") { coro::io_scheduler tp{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 8}}}; - coro::shared_mutex m{tp}; + coro::shared_mutex m{tp}; std::atomic read_value{false}; diff --git a/test/test_thread_pool.cpp b/test/test_thread_pool.cpp index 9c9a197..f393df1 100644 --- a/test/test_thread_pool.cpp +++ b/test/test_thread_pool.cpp @@ -125,7 +125,7 @@ TEST_CASE("thread_pool shutdown", "[thread_pool]") co_return false; }; - tp.shutdown(coro::shutdown_t::async); + tp.shutdown(); REQUIRE(coro::sync_wait(f(tp)) == true); }
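
The "manual process events inline" test added above exercises the new coro::io_scheduler::execution_strategy_t option: with process_tasks_inline the owner of the scheduler drives completions itself via process_events() instead of handing resumed coroutines to the embedded thread pool. The following is a minimal standalone sketch distilled from that test; it uses only names that appear in this patch (options::thread_strategy, options::execution_strategy, schedule(), poll(), process_events(), shutdown()) and is illustrative rather than part of the change set.

// Sketch: drive an io_scheduler manually with the new inline execution strategy.
#include <coro/coro.hpp>

#include <sys/eventfd.h>
#include <unistd.h>

#include <chrono>
#include <cstdint>
#include <iostream>

int main()
{
    using namespace std::chrono_literals;

    auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);

    coro::io_scheduler s{coro::io_scheduler::options{
        .thread_strategy    = coro::io_scheduler::thread_strategy_t::manual,
        .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline}};

    auto make_poll_read_task = [&]() -> coro::task<void> {
        co_await s.schedule();
        // Suspends until the eventfd is readable; resumed inline by process_events().
        auto status = co_await s.poll(trigger_fd, coro::poll_op::read);
        if (status == coro::poll_status::event)
        {
            uint64_t value{0};
            read(trigger_fd, &value, sizeof(value));
            std::cerr << "read value " << value << "\n";
        }
        co_return;
    };

    auto make_poll_write_task = [&]() -> coro::task<void> {
        co_await s.schedule();
        uint64_t value{42};
        write(trigger_fd, &value, sizeof(value));
        co_return;
    };

    auto poll_task  = make_poll_read_task();
    auto write_task = make_poll_write_task();

    // Start the tasks; each suspends at schedule() and waits for the event loop.
    poll_task.resume();
    write_task.resume();

    // Process events on this thread until no scheduled work remains.
    while (s.process_events(100ms) > 0)
    {
    }

    s.shutdown();
    close(trigger_fd);
    return 0;
}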
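With src/shared_mutex.cpp deleted, coro::shared_mutex becomes header-only and is instantiated with an explicit executor type in the updated tests (the template argument list does not survive this rendering of the patch, which is why several -/+ pairs in test_shared_mutex.cpp look identical). Below is a small usage sketch under that assumption; the <coro::thread_pool> template argument is inferred from the thread_pool the tests pass to the constructor and should be treated as an assumption, not a quotation of the patch.

// Sketch: header-only shared_mutex templated on the executor that resumes waiters.
// The <coro::thread_pool> parameter is an assumption based on the updated tests.
#include <coro/coro.hpp>

#include <cstdint>
#include <iostream>
#include <vector>

int main()
{
    coro::thread_pool tp{coro::thread_pool::options{.thread_count = 4}};
    coro::shared_mutex<coro::thread_pool> m{tp};

    std::vector<uint64_t> values{};

    auto make_writer_task = [&]() -> coro::task<void> {
        co_await tp.schedule();
        // Exclusive lock; released when the scoped lock leaves scope.
        auto lock = co_await m.lock();
        values.push_back(values.size());
        co_return;
    };

    auto make_reader_task = [&]() -> coro::task<void> {
        co_await tp.schedule();
        // Shared lock; multiple readers may hold it concurrently.
        auto lock = co_await m.lock_shared();
        std::cerr << "reader sees " << values.size() << " values\n";
        co_return;
    };

    coro::sync_wait(coro::when_all(make_writer_task(), make_reader_task(), make_reader_task()));
    tp.shutdown();
    return 0;
}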
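The thread_pool interface also narrows in this patch: shutdown() no longer takes a coro::shutdown_t and always joins the workers, and resume(std::coroutine_handle<>) is the single remaining entry point for handing a coroutine back to the pool. A brief sketch of the updated caller-side pattern, assuming only thread_pool::options, schedule(), and shutdown() as they appear elsewhere in the diff:

// Sketch: shutdown() is now synchronous and only the first call tears the pool down.
#include <coro/coro.hpp>

#include <atomic>
#include <cstdint>
#include <iostream>

int main()
{
    coro::thread_pool tp{coro::thread_pool::options{.thread_count = 2}};
    std::atomic<uint64_t> counter{0};

    auto make_task = [&]() -> coro::task<void> {
        co_await tp.schedule();
        counter.fetch_add(1, std::memory_order::relaxed);
        co_return;
    };

    coro::sync_wait(coro::when_all(make_task(), make_task(), make_task()));

    // Previously shutdown(coro::shutdown_t::sync|async); now it always waits
    // for the worker threads to join before returning.
    tp.shutdown();

    std::cout << "counter = " << counter.load() << "\n";
    return 0;
}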