#pragma once

#include "coro/task.hpp"

#include <array>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <concepts>
#include <coroutine>
#include <cstdint>
#include <cstring>
#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <span>
#include <stdexcept>
#include <string>
#include <thread>
#include <type_traits>
#include <variant>
#include <vector>

#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/timerfd.h>
#include <unistd.h>

namespace coro
{
class scheduler;

namespace detail
{
class resume_token_base
{
public:
    resume_token_base(scheduler* eng) noexcept : m_scheduler(eng), m_state(nullptr) {}

    virtual ~resume_token_base() = default;

    resume_token_base(const resume_token_base&) = delete;
    resume_token_base(resume_token_base&& other)
    {
        m_scheduler = other.m_scheduler;
        m_state     = other.m_state.exchange(nullptr);

        other.m_scheduler = nullptr;
    }
    auto operator=(const resume_token_base&) -> resume_token_base& = delete;
    auto operator=(resume_token_base&& other) -> resume_token_base&
    {
        if (std::addressof(other) != this)
        {
            m_scheduler = other.m_scheduler;
            m_state     = other.m_state.exchange(nullptr);

            other.m_scheduler = nullptr;
        }

        return *this;
    }

    bool is_set() const noexcept { return m_state.load(std::memory_order::acquire) == this; }

    struct awaiter
    {
        awaiter(const resume_token_base& token) noexcept : m_token(token) {}

        auto await_ready() const noexcept -> bool { return m_token.is_set(); }

        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
        {
            const void* const set_state = &m_token;

            m_awaiting_coroutine = awaiting_coroutine;

            // This value will update if other threads write to it via acquire.
            void* old_value = m_token.m_state.load(std::memory_order::acquire);
            do
            {
                // Resume immediately if already in the set state.
                if (old_value == set_state)
                {
                    return false;
                }

                m_next = static_cast<awaiter*>(old_value);
            } while (!m_token.m_state.compare_exchange_weak(
                old_value, this, std::memory_order::release, std::memory_order::acquire));

            return true;
        }

        auto await_resume() noexcept -> void
        {
            // no-op
        }

        const resume_token_base& m_token;
        std::coroutine_handle<>  m_awaiting_coroutine;
        awaiter*                 m_next{nullptr};
    };

    auto operator co_await() const noexcept -> awaiter { return awaiter{*this}; }

    auto reset() noexcept -> void
    {
        void* old_value = this;
        m_state.compare_exchange_strong(old_value, nullptr, std::memory_order::acquire);
    }

protected:
    friend struct awaiter;
    scheduler* m_scheduler{nullptr};
    /// Tri-state: nullptr = unset with no waiters, an awaiter* = unset with an intrusive
    /// list of suspended waiters, this = set.
    mutable std::atomic<void*> m_state;
};

} // namespace detail

template<typename return_type>
class resume_token final : public detail::resume_token_base
{
    friend scheduler;
    resume_token() : detail::resume_token_base(nullptr) {}
    resume_token(scheduler& s) : detail::resume_token_base(&s) {}

public:
    ~resume_token() override = default;

    resume_token(const resume_token&) = delete;
    resume_token(resume_token&&)      = default;
    auto operator=(const resume_token&) -> resume_token& = delete;
    auto operator=(resume_token&&) -> resume_token& = default;

    auto resume(return_type value) noexcept -> void;

    auto return_value() const& -> const return_type& { return m_return_value; }

    auto return_value() && -> return_type&& { return std::move(m_return_value); }

private:
    return_type m_return_value;
};

template<>
class resume_token<void> final : public detail::resume_token_base
{
    friend scheduler;
    resume_token() : detail::resume_token_base(nullptr) {}
    resume_token(scheduler& s) : detail::resume_token_base(&s) {}

public:
    ~resume_token() override = default;

    resume_token(const resume_token&) = delete;
    resume_token(resume_token&&)      = default;
    auto operator=(const resume_token&) -> resume_token& = delete;
    auto operator=(resume_token&&) -> resume_token& = default;

    auto resume() noexcept -> void;
};
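/**
 * A token is awaited via the inherited awaiter: m_state collects suspended awaiters into an
 * intrusive lock-free stack until resume() flips it to the set state. A minimal usage sketch
 * (illustrative only; `s` is a scheduler and `service` is a hypothetical async service with a
 * completion callback), inside a coroutine:
 * \code
 auto token = s.generate_resume_token<void>();
 service.do_work([&token]() { token.resume(); }); // may fire from any thread
 co_await token;                                  // suspends unless already resumed
 * \endcode
 */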
enum class poll_op
{
    /// Poll for read operations.
    read = EPOLLIN,
    /// Poll for write operations.
    write = EPOLLOUT,
    /// Poll for read and write operations.
    read_write = EPOLLIN | EPOLLOUT
};

class scheduler
{
private:
    using task_variant = std::variant<coro::task<void>, std::coroutine_handle<>>;
    using task_queue   = std::deque<task_variant>;

    /// resume_token needs to be able to call the internal scheduler::resume().
    template<typename return_type>
    friend class resume_token;

    struct task_data
    {
        /// The user's task; its lifetime is maintained by the scheduler.
        coro::task<void> m_user_task;
        /// The post-processing cleanup task that removes a completed task from the scheduler.
        coro::task<void> m_cleanup_task;
    };

    class task_manager
    {
    public:
        using task_position = std::list<std::size_t>::iterator;

        task_manager(const std::size_t reserve_size, const double growth_factor)
            : m_growth_factor(growth_factor)
        {
            m_tasks.resize(reserve_size);
            for (std::size_t i = 0; i < reserve_size; ++i)
            {
                m_task_indexes.emplace_back(i);
            }
            m_free_pos = m_task_indexes.begin();
        }

        /**
         * Stores a user's task and sets a continuation coroutine to automatically mark the task
         * as deleted upon the coroutine's completion.
         * @param user_task The scheduled user task to store, since it has suspended after its
         *                  first execution.
         */
        auto store(coro::task<void> user_task) -> void
        {
            // Only grow if completely full and attempting to add more.
            if (m_free_pos == m_task_indexes.end())
            {
                m_free_pos = grow();
            }

            // Store the user task with its cleanup task to maintain their lifetimes until completed.
            auto  index              = *m_free_pos;
            auto& task_data          = m_tasks[index];
            task_data.m_user_task    = std::move(user_task);
            task_data.m_cleanup_task = cleanup_func(m_free_pos);

            // Attach the cleanup task to be the continuation after the user's task.
            task_data.m_user_task.promise().continuation(task_data.m_cleanup_task.handle());

            // Mark the current slot as used.
            std::advance(m_free_pos, 1);
        }

        /**
         * Garbage collects any tasks that are marked as deleted.
         * @return The number of tasks that were deleted.
         */
        auto gc() -> std::size_t
        {
            std::size_t deleted{0};
            if (!m_tasks_to_delete.empty())
            {
                for (const auto& pos : m_tasks_to_delete)
                {
                    // This doesn't actually 'delete' the task; it'll get overwritten when a
                    // new user task claims the free space. It could be useful to actually
                    // delete the tasks so the coroutine stack frames are destroyed. The
                    // advantage of letting a new task replace an old one, though, is that it's
                    // a 1:1 exchange on delete and create, rather than a large pause here to
                    // delete all the completed tasks.

                    // Put the deleted position at the end of the free indexes list.
                    m_task_indexes.splice(m_task_indexes.end(), m_task_indexes, pos);
                }
                deleted = m_tasks_to_delete.size();
                m_tasks_to_delete.clear();
            }
            return deleted;
        }

        /**
         * @return The number of tasks that are awaiting deletion.
         */
        auto delete_task_size() const -> std::size_t { return m_tasks_to_delete.size(); }

        /**
         * @return True if there are no tasks awaiting deletion.
         */
        auto delete_tasks_empty() const -> bool { return m_tasks_to_delete.empty(); }

        /**
         * @return The capacity of this task manager before it will need to grow in size.
         */
        auto capacity() const -> std::size_t { return m_tasks.size(); }

    private:
        /**
         * Grows each task container by the growth factor.
         * @return The position of the first free index after growing.
         */
        auto grow() -> task_position
        {
            // Save an iterator to the current last item.
            auto        last_pos = std::prev(m_task_indexes.end());
            std::size_t new_size = m_tasks.size() * m_growth_factor;
            for (std::size_t i = m_tasks.size(); i < new_size; ++i)
            {
                m_task_indexes.emplace_back(i);
            }
            m_tasks.resize(new_size);
            // Set the free position to the item just after the previous last item.
            return std::next(last_pos);
        }

        /**
         * Each task the user schedules has this task chained as a continuation to execute after
         * the user's task completes. This function takes the task position in the indexes list
         * and upon execution marks that slot for deletion. It cannot self-delete, otherwise it
         * would corrupt/double free its own coroutine stack frame.
         */
        auto cleanup_func(task_position pos) -> coro::task<void>
        {
            // Mark this task for deletion; it cannot delete itself.
            m_tasks_to_delete.push_back(pos);
            co_return;
        }

        /// Maintains the lifetime of the tasks until they are completed.
        std::vector<task_data> m_tasks{};
        /// The full set of indexes into `m_tasks`.
        std::list<std::size_t> m_task_indexes{};
        /// The set of tasks that have completed and need to be deleted.
        std::vector<task_position> m_tasks_to_delete{};
        /// The current free position within the task indexes list. Anything before
        /// this point is used; itself and anything after is free.
        task_position m_free_pos{};
        /// The amount to grow the containers by when all spaces are taken.
        double m_growth_factor{};
    };
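    // The index list is partitioned by m_free_pos: everything before it is in use, everything
    // from it onward is free, and gc() splices completed slots back to the tail. A lifecycle
    // sketch (illustrative only; `make_task()` is hypothetical):
    //
    //   task_manager tm{4, 2};
    //   tm.store(make_task());   // claims index 0 and advances m_free_pos
    //   // ... the user task completes; its cleanup continuation records the slot ...
    //   auto freed = tm.gc();    // freed == 1, slot 0 is reusable again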
    static constexpr const int   m_accept_object{0};
    static constexpr const void* m_accept_ptr = &m_accept_object;

public:
    using fd_t = int;

    enum class shutdown_t
    {
        /// Synchronously wait for all tasks to complete when calling shutdown.
        sync,
        /// Asynchronously let tasks finish on the background thread on shutdown.
        async
    };

    enum class thread_strategy_t
    {
        /// Spawns a background thread for the scheduler to run on.
        spawn,
        /// Adopts this thread as the scheduler thread.
        adopt,
        /// Requires the user to call process_events() to drive the scheduler.
        manual
    };

    struct options
    {
        /// The number of tasks to reserve space for upon creating the scheduler.
        std::size_t reserve_size{8};
        /// The growth factor for task space when capacity is full.
        double growth_factor{2};
        /// The threading strategy.
        thread_strategy_t thread_strategy{thread_strategy_t::spawn};
    };

    /**
     * @param opts Various scheduler options to tune how it behaves.
     */
    scheduler(const options opts = options{8, 2, thread_strategy_t::spawn})
        : m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)),
          m_accept_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)),
          m_thread_strategy(opts.thread_strategy),
          m_task_manager(opts.reserve_size, opts.growth_factor)
    {
        struct epoll_event e
        {
        };
        e.events   = EPOLLIN;
        e.data.ptr = const_cast<void*>(m_accept_ptr);
        epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_accept_fd, &e);

        if (m_thread_strategy == thread_strategy_t::spawn)
        {
            m_scheduler_thread = std::thread([this] { process_events_dedicated_thread(); });
        }
        else if (m_thread_strategy == thread_strategy_t::adopt)
        {
            process_events_dedicated_thread();
        }
        // else manual mode, the user must call process_events().
    }

    scheduler(const scheduler&) = delete;
    scheduler(scheduler&&)      = delete;
    auto operator=(const scheduler&) -> scheduler& = delete;
    auto operator=(scheduler&&) -> scheduler& = delete;

    ~scheduler()
    {
        shutdown();
        if (m_epoll_fd != -1)
        {
            close(m_epoll_fd);
            m_epoll_fd = -1;
        }
        if (m_accept_fd != -1)
        {
            close(m_accept_fd);
            m_accept_fd = -1;
        }
    }
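    /**
     * Construction sketch (values are illustrative): with the default strategy the constructor
     * spawns the event loop thread; in manual mode the caller must drive process_events().
     * \code
     coro::scheduler s{coro::scheduler::options{
         .reserve_size    = 64,
         .growth_factor   = 2,
         .thread_strategy = coro::scheduler::thread_strategy_t::spawn}};
     * \endcode
     */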
    /**
     * Schedules a task to be run as soon as possible. This pushes the task into a FIFO queue.
     * @param task The task to schedule as soon as possible.
     * @return True if the task has been scheduled, false if scheduling failed. Currently the only
     *         way for this to fail is if the scheduler is trying to shutdown.
     */
    auto schedule(coro::task<void> task) -> bool
    {
        if (m_shutdown_requested.load(std::memory_order::relaxed))
        {
            return false;
        }

        // This function intentionally does not check to see if it is executing on the thread
        // that is processing events. If the given task recursively generates tasks it will
        // result in a stack overflow very quickly. Instead it takes the long path of adding it
        // to the FIFO queue and processing through the normal pipeline. This simplifies the
        // code and also makes the ordering of newly submitted tasks fairer with respect to FIFO.

        m_size.fetch_add(1, std::memory_order::relaxed);
        {
            std::lock_guard lk{m_accept_mutex};
            m_accept_queue.emplace_back(std::move(task));
        }

        // Send an event if one isn't already set. We use strong here to avoid spurious failures,
        // but if it fails due to the flag actually being set we don't want to retry.
        bool expected{false};
        if (m_event_set.compare_exchange_strong(
                expected, true, std::memory_order::release, std::memory_order::relaxed))
        {
            uint64_t value{1};
            ::write(m_accept_fd, &value, sizeof(value));
        }

        return true;
    }

    /**
     * Schedules a task to be run after waiting for a certain period of time.
     * @param task The task to schedule after waiting `after` amount of time.
     * @return True if the task has been scheduled, false if scheduling failed. Currently the only
     *         way for this to fail is if the scheduler is trying to shutdown.
     */
    auto schedule_after(coro::task<void> task, std::chrono::milliseconds after) -> bool
    {
        if (m_shutdown_requested.load(std::memory_order::relaxed))
        {
            return false;
        }

        return schedule(scheduler_after_func(std::move(task), after));
    }

    /**
     * Polls a specific file descriptor for the given poll operation.
     * @param fd The file descriptor to poll.
     * @param op The type of poll operation to perform.
     */
    auto poll(fd_t fd, poll_op op) -> coro::task<void>
    {
        co_await unsafe_yield<void>([&](resume_token<void>& token) {
            struct epoll_event e
            {
            };
            e.events   = static_cast<uint32_t>(op) | EPOLLONESHOT | EPOLLET;
            e.data.ptr = &token;
            epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &e);
        });

        epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, nullptr);
    }

    /**
     * This function will first poll the given `fd` to make sure it can be read from. Once
     * notified that the `fd` has data available to read, the given `buffer` is filled with up
     * to the buffer's size of data. The number of bytes read is returned.
     * @param fd The file descriptor to read from.
     * @param buffer The buffer to place read bytes into.
     * @return The number of bytes read or an error code if negative.
     */
    auto read(fd_t fd, std::span<char> buffer) -> coro::task<ssize_t>
    {
        co_await poll(fd, poll_op::read);
        co_return ::read(fd, buffer.data(), buffer.size());
    }

    /**
     * This function will first poll the given `fd` to make sure it can be written to. Once
     * notified that the `fd` can be written to, the given buffer's contents are written to
     * the `fd`.
     * @param fd The file descriptor to write the contents of `buffer` to.
     * @param buffer The data to write to `fd`.
     * @return The number of bytes written or an error code if negative.
     */
    auto write(fd_t fd, const std::span<const char> buffer) -> coro::task<ssize_t>
    {
        co_await poll(fd, poll_op::write);
        co_return ::write(fd, buffer.data(), buffer.size());
    }
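    /**
     * A sketch combining poll/read/write into a single echo step (illustrative only; `fd` is
     * assumed to be a connected non-blocking socket):
     * \code
     auto echo_once = [](coro::scheduler& s, coro::scheduler::fd_t fd) -> coro::task<void> {
         std::string buffer(1024, '\0');
         auto bytes_read = co_await s.read(fd, std::span<char>{buffer.data(), buffer.size()});
         if (bytes_read > 0)
         {
             co_await s.write(
                 fd, std::span<const char>{buffer.data(), static_cast<std::size_t>(bytes_read)});
         }
         co_return;
     };
     * \endcode
     */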
    /**
     * Immediately yields the current task and provides a resume token to resume this yielded
     * coroutine when the async operation has completed.
     *
     * Normal usage of this might look like:
     * \code
     scheduler.yield<void>([](coro::resume_token<void>& t) {
         auto on_service_complete = [&]() {
             t.resume();  // This resume call will resume the task on the scheduler thread.
         };
         service.do_work(on_service_complete);
     });
     * \endcode
     * The above example will yield the current task and then, through the third-party service's
     * on-complete callback function, let the scheduler know that it should resume execution of
     * the task.
     *
     * @tparam return_type The return type of the yielded async operation.
     * @tparam before_functor Functor invoked with the resume token before suspending.
     * @param before Immediately invoked functor with the yield point's resume token.
     * @return A task to co_await until the token's resume() is called.
     */
    template<typename return_type, std::invocable<resume_token<return_type>&> before_functor>
    auto yield(before_functor before) -> coro::task<return_type>
    {
        resume_token<return_type> token{*this};

        before(token);
        co_await token;

        if constexpr (std::is_same_v<return_type, void>)
        {
            co_return;
        }
        else
        {
            co_return token.return_value();
        }
    }

    /**
     * A user-provided resume token to yield the current coroutine until the token's resume is
     * called. It is also possible to co_await a resume token inline without calling this yield
     * function.
     * @param token Resume token to await the current task on. Use
     *              scheduler::generate_resume_token<T>() to generate a resume token to use on
     *              this scheduler.
     */
    template<typename return_type>
    auto yield(resume_token<return_type>& token) -> coro::task<void>
    {
        co_await token;
        co_return;
    }

    /**
     * Yields the current coroutine for `amount` of time.
     * @throw std::runtime_error If the internal system failed to setup required resources to wait.
     * @param amount The amount of time to wait.
     */
    auto yield_for(std::chrono::milliseconds amount) -> coro::task<void>
    {
        fd_t timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
        if (timer_fd == -1)
        {
            std::string msg = "Failed to create timerfd errno=[" + std::string{strerror(errno)} + "].";
            throw std::runtime_error(msg);
        }

        struct itimerspec ts
        {
        };

        auto seconds = std::chrono::duration_cast<std::chrono::seconds>(amount);
        amount -= seconds;
        auto nanoseconds = std::chrono::duration_cast<std::chrono::nanoseconds>(amount);

        ts.it_value.tv_sec  = seconds.count();
        ts.it_value.tv_nsec = nanoseconds.count();

        if (timerfd_settime(timer_fd, 0, &ts, nullptr) == -1)
        {
            std::string msg = "Failed to set timerfd errno=[" + std::string{strerror(errno)} + "].";
            throw std::runtime_error(msg);
        }

        uint64_t value{0};
        co_await read(timer_fd, std::span<char>{reinterpret_cast<char*>(&value), sizeof(value)});
        close(timer_fd);
        co_return;
    }

    /**
     * Generates a resume token that can be used to resume an executing task.
     * @tparam return_type The return type of resuming the async operation.
     * @return Resume token with the given return type.
     */
    template<typename return_type>
    auto generate_resume_token() -> resume_token<return_type>
    {
        return resume_token<return_type>(*this);
    }

    /**
     * Processes any ready events on the calling thread for up to `timeout`. Only one thread at
     * a time may process events; concurrent callers return without doing any work.
     * @param timeout How long to block waiting for events before returning.
     * @return The number of tasks still executing or waiting to be executed.
     */
    auto process_events(std::chrono::milliseconds timeout = std::chrono::milliseconds{1000})
        -> std::size_t
    {
        process_events_external_thread(timeout);
        return m_size.load(std::memory_order::relaxed);
    }

    /**
     * @return The number of active tasks still executing and unprocessed submitted tasks.
     */
    auto size() const -> std::size_t { return m_size.load(std::memory_order::relaxed); }

    /**
     * @return True if there are no tasks executing or waiting to be executed in this scheduler.
     */
    auto empty() const -> bool { return size() == 0; }

    /**
     * @return The maximum number of tasks this scheduler can process without growing.
     */
    auto capacity() const -> std::size_t { return m_task_manager.capacity(); }

    /**
     * Is there a thread processing this scheduler's events?
     * If this is in thread strategy spawn or adopt this will always be true until shutdown.
     */
    auto is_running() const noexcept -> bool { return m_running.load(std::memory_order::relaxed); }

    /**
     * @return True if this scheduler has been requested to shutdown.
     */
    auto is_shutdown() const noexcept -> bool
    {
        return m_shutdown_requested.load(std::memory_order::relaxed);
    }
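    /**
     * A sketch combining the yield primitives above (illustrative only):
     * \code
     auto make_delayed_task = [](coro::scheduler& s) -> coro::task<void> {
         co_await s.yield_for(std::chrono::milliseconds{250});  // timerfd-backed sleep
         auto answer = co_await s.yield<int>([](coro::resume_token<int>& t) { t.resume(42); });
         (void)answer;  // 42
         co_return;
     };
     * \endcode
     */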
    /**
     * Requests the scheduler to finish processing all of its current tasks and shutdown.
     * New tasks submitted via `scheduler::schedule()` will be rejected after this is called.
     * @param wait_for_tasks This call will block until all tasks are complete if shutdown_t::sync
     *                       is passed in; if shutdown_t::async is passed this function will tell
     *                       the scheduler to shutdown but not wait for all tasks to complete, and
     *                       it returns immediately.
     */
    auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) -> void
    {
        if (!m_shutdown_requested.exchange(true, std::memory_order::release))
        {
            // Signal the event loop to stop asap.
            uint64_t value{1};
            ::write(m_accept_fd, &value, sizeof(value));

            if (wait_for_tasks == shutdown_t::sync && m_scheduler_thread.joinable())
            {
                m_scheduler_thread.join();
            }
        }
    }

private:
    /// The event loop epoll file descriptor.
    fd_t m_epoll_fd{-1};
    /// The event loop "accept new tasks and resume tasks" file descriptor.
    fd_t m_accept_fd{-1};

    /// The threading strategy this scheduler is using.
    thread_strategy_t m_thread_strategy;
    /// Is this scheduler currently running? Manual mode might not always be running.
    std::atomic<bool> m_running{false};
    /// Has the scheduler been requested to shutdown?
    std::atomic<bool> m_shutdown_requested{false};

    /// If running in the spawn threading strategy, the background thread that processes events.
    std::thread m_scheduler_thread;

    /// FIFO queue for new and resumed tasks to execute.
    task_queue m_accept_queue{};
    std::mutex m_accept_mutex{};
    /// Has a thread sent a wakeup event? (E.g. avoid an extra kernel write/read.)
    std::atomic<bool> m_event_set{false};

    /// The total number of tasks that are being processed or suspended.
    std::atomic<std::size_t> m_size{0};

    /// The maximum number of tasks to process inline before polling for more tasks.
    static constexpr const std::size_t task_inline_process_amount{64};
    /// Pre-allocated memory area for tasks to process.
    std::array<task_variant, task_inline_process_amount> m_processing_tasks;

    task_manager m_task_manager;

    auto scheduler_after_func(coro::task<void> inner_task, std::chrono::milliseconds wait_time)
        -> coro::task<void>
    {
        // Seems to already be done.
        if (inner_task.is_ready())
        {
            co_return;
        }

        // Wait for the period requested, and then resume the user's task.
        co_await yield_for(wait_time);
        inner_task.resume();
        if (!inner_task.is_ready())
        {
            m_task_manager.store(std::move(inner_task));
        }
        co_return;
    }

    template<typename return_type, std::invocable<resume_token<return_type>&> before_functor>
    auto unsafe_yield(before_functor before) -> coro::task<return_type>
    {
        resume_token<return_type> token{};
        before(token);
        co_await token;
        if constexpr (std::is_same_v<return_type, void>)
        {
            co_return;
        }
        else
        {
            co_return token.return_value();
        }
    }

    auto resume(std::coroutine_handle<> handle) -> void
    {
        {
            std::lock_guard lk{m_accept_mutex};
            m_accept_queue.emplace_back(handle);
        }

        // Signal to the event loop there is a task to resume if one hasn't already been sent.
        bool expected{false};
        if (m_event_set.compare_exchange_strong(
                expected, true, std::memory_order::release, std::memory_order::relaxed))
        {
            uint64_t value{1};
            ::write(m_accept_fd, &value, sizeof(value));
        }
    }

    static constexpr std::chrono::milliseconds m_default_timeout{1000};
    static constexpr std::chrono::milliseconds m_no_timeout{0};
    static constexpr std::size_t               m_max_events = 8;
    std::array<struct epoll_event, m_max_events> m_events{};

    auto task_start(coro::task<void>& task) -> void
    {
        if (!task.is_ready())  // sanity check, the user could have manually resumed.
        {
            // Attempt to process the task synchronously before suspending.
            task.resume();

            if (!task.is_ready())
            {
                // This task is now suspended waiting for an event.
                m_task_manager.store(std::move(task));
            }
            else
            {
                // This task completed synchronously.
                m_size.fetch_sub(1, std::memory_order::relaxed);
            }
        }
        else
        {
            m_size.fetch_sub(1, std::memory_order::relaxed);
        }
    }
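    // Note the contrast between yield() and unsafe_yield(): a yield() token carries a scheduler
    // pointer, so resume() re-queues the awaiting coroutine through the FIFO queue, while an
    // unsafe_yield() token resumes it inline on whichever thread called resume(). poll() relies
    // on the latter: the event loop calls token.resume() itself, so the inline resume cannot
    // recurse. A sketch of the two paths (illustrative only):
    //
    //   co_await yield<void>([](resume_token<void>& t) { t.resume(); });         // via scheduler
    //   co_await unsafe_yield<void>([](resume_token<void>& t) { t.resume(); });  // inline resume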
    inline auto process_task_variant(task_variant& tv) -> void
    {
        if (std::holds_alternative<coro::task<void>>(tv))
        {
            auto& task = std::get<coro::task<void>>(tv);
            task_start(task);
        }
        else
        {
            auto handle = std::get<std::coroutine_handle<>>(tv);
            if (!handle.done())
            {
                handle.resume();
            }
        }
    }

    auto process_task_queue() -> void
    {
        std::size_t amount{0};
        {
            std::lock_guard lk{m_accept_mutex};
            while (!m_accept_queue.empty() && amount < task_inline_process_amount)
            {
                m_processing_tasks[amount] = std::move(m_accept_queue.front());
                m_accept_queue.pop_front();
                ++amount;
            }
        }

        // The queue is empty, we are done here.
        if (amount == 0)
        {
            return;
        }

        for (std::size_t i = 0; i < amount; ++i)
        {
            process_task_variant(m_processing_tasks[i]);
        }
    }

    auto process_events_poll_execute(std::chrono::milliseconds user_timeout) -> void
    {
        // Need to acquire the m_accept_queue size to determine if there are any pending tasks.
        std::atomic_thread_fence(std::memory_order::acquire);
        bool tasks_ready = !m_accept_queue.empty();
        // bool tasks_ready = m_event_set.load(std::memory_order::acquire);

        auto timeout = (tasks_ready) ? m_no_timeout : user_timeout;

        // Poll is run every iteration to make sure 'waiting' events are properly put into
        // the FIFO queue for when they are ready.
        auto event_count = epoll_wait(m_epoll_fd, m_events.data(), m_max_events, timeout.count());
        if (event_count > 0)
        {
            for (std::size_t i = 0; i < static_cast<std::size_t>(event_count); ++i)
            {
                void* handle_ptr = m_events[i].data.ptr;

                if (handle_ptr == m_accept_ptr)
                {
                    uint64_t value{0};
                    ::read(m_accept_fd, &value, sizeof(value));
                    (void)value;  // discard, the read merely resets the eventfd counter to zero.

                    // Let any threads scheduling work know that the event set has been consumed.
                    // It is important to do this after the accept file descriptor has been read.
                    // This needs to succeed, so best practice is to loop on compare exchange weak.
                    bool expected{true};
                    while (!m_event_set.compare_exchange_weak(
                        expected, false, std::memory_order::release, std::memory_order::relaxed))
                    {
                    }

                    tasks_ready = true;
                }
                else
                {
                    // Individual poll task wake-up; this will queue the coroutines waiting
                    // on the resume token into the FIFO queue for processing.
                    auto* token_ptr = static_cast<resume_token<void>*>(handle_ptr);
                    token_ptr->resume();
                }
            }
        }

        if (tasks_ready)
        {
            process_task_queue();
        }

        if (!m_task_manager.delete_tasks_empty())
        {
            m_size.fetch_sub(m_task_manager.gc(), std::memory_order::relaxed);
        }
    }

    auto process_events_external_thread(std::chrono::milliseconds user_timeout) -> void
    {
        // Do not allow two threads to process events at the same time.
        bool expected{false};
        if (m_running.compare_exchange_strong(
                expected, true, std::memory_order::release, std::memory_order::relaxed))
        {
            process_events_poll_execute(user_timeout);

            m_running.exchange(false, std::memory_order::release);
        }
    }
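    // m_running doubles as a single-consumer guard above: concurrent external callers race on
    // the compare-exchange and the losers return without polling, so at most one thread ever
    // drives the epoll loop. A manual-mode caller would typically drive it until no tasks
    // remain (a sketch; `make_task()` is hypothetical):
    //
    //   coro::scheduler s{coro::scheduler::options{
    //       .thread_strategy = coro::scheduler::thread_strategy_t::manual}};
    //   s.schedule(make_task());
    //   while (s.process_events() > 0) {}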
    auto process_events_dedicated_thread() -> void
    {
        m_running.exchange(true, std::memory_order::release);
        // Execute tasks until a shutdown has been requested and no tasks remain.
        while (!m_shutdown_requested.load(std::memory_order::relaxed) ||
               m_size.load(std::memory_order::relaxed) > 0)
        {
            process_events_poll_execute(m_default_timeout);
        }
        m_running.exchange(false, std::memory_order::release);
    }
};

template<typename return_type>
inline auto resume_token<return_type>::resume(return_type value) noexcept -> void
{
    void* old_value = m_state.exchange(this, std::memory_order::acq_rel);
    if (old_value != this)
    {
        m_return_value = std::move(value);

        auto* waiters = static_cast<awaiter*>(old_value);
        while (waiters != nullptr)
        {
            // Intentionally not checking if this is running on the scheduler process event
            // thread as it can create a stack overflow if it triggers a 'resume chain'.
            // unsafe_yield() is guaranteed in this context to never be recursive and thus
            // resuming directly on the process event thread should not be able to trigger a
            // stack overflow.
            auto* next = waiters->m_next;

            // If the scheduler is nullptr this token came from unsafe_yield();
            // if the scheduler is present it came from yield().
            if (m_scheduler == nullptr)  // || m_scheduler->this_thread_is_processing_events())
            {
                waiters->m_awaiting_coroutine.resume();
            }
            else
            {
                m_scheduler->resume(waiters->m_awaiting_coroutine);
            }
            waiters = next;
        }
    }
}

inline auto resume_token<void>::resume() noexcept -> void
{
    void* old_value = m_state.exchange(this, std::memory_order::acq_rel);
    if (old_value != this)
    {
        auto* waiters = static_cast<awaiter*>(old_value);
        while (waiters != nullptr)
        {
            auto* next = waiters->m_next;
            if (m_scheduler == nullptr)
            {
                waiters->m_awaiting_coroutine.resume();
            }
            else
            {
                m_scheduler->resume(waiters->m_awaiting_coroutine);
            }
            waiters = next;
        }
    }
}

} // namespace coro
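/**
 * End-to-end usage sketch for this header (illustrative only):
 * \code
 #include "coro/scheduler.hpp"

 int main()
 {
     coro::scheduler s{};  // default options: spawn a background event loop thread

     s.schedule([]() -> coro::task<void> { co_return; }());

     s.shutdown();  // shutdown_t::sync blocks until all tasks have completed
     return 0;
 }
 * \endcode
 */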