diff --git a/.clang-format b/.clang-format index 609921a..fd2cf90 100644 --- a/.clang-format +++ b/.clang-format @@ -1,4 +1,5 @@ --- +AccessModifierOffset: -4 AlignAfterOpenBracket: AlwaysBreak AlignConsecutiveMacros: 'true' AlignConsecutiveAssignments: 'true' diff --git a/CMakeLists.txt b/CMakeLists.txt index 10f3c21..253e1fc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,7 +9,7 @@ message("${PROJECT_NAME} CORO_CODE_COVERAGE = ${CORO_CODE_COVERAGE}") set(LIBCORO_SOURCE_FILES inc/coro/coro.hpp - inc/coro/event.hpp + inc/coro/event.hpp src/event.cpp inc/coro/generator.hpp inc/coro/latch.hpp inc/coro/scheduler.hpp diff --git a/inc/coro/event.hpp b/inc/coro/event.hpp index 2b7fe96..1841928 100644 --- a/inc/coro/event.hpp +++ b/inc/coro/event.hpp @@ -1,108 +1,105 @@ #pragma once -#include -#include #include +#include namespace coro { - +/** + * Event is a manully triggered thread safe signal that can be co_await()'ed by multiple awaiters. + * Each awaiter should co_await the event and upon the event being set each awaiter will have their + * coroutine resumed. + * + * The event can be manually reset to the un-set state to be re-used. + * \code +t1: coro::event e; +... +t2: func(coro::event& e) { ... co_await e; ... } +... +t1: do_work(); +t1: e.set(); +... +t2: resume() + * \endcode + */ class event { public: - event(bool initially_set = false) noexcept - : m_state((initially_set) ? static_cast(this) : nullptr) - { - } - virtual ~event() = default; + /** + * Creates an event with the given initial state of being set or not set. + * @param initially_set By default all events start as not set, but if needed this parameter can + * set the event to already be triggered. + */ + explicit event(bool initially_set = false) noexcept; + ~event() = default; event(const event&) = delete; - event(event&&) = delete; + event(event&&) = delete; auto operator=(const event&) -> event& = delete; - auto operator=(event&&) -> event& = delete; + auto operator=(event &&) -> event& = delete; - bool is_set() const noexcept - { - return m_state.load(std::memory_order_acquire) == this; - } + /** + * @return True if this event is currently in the set state. + */ + auto is_set() const noexcept -> bool { return m_state.load(std::memory_order_acquire) == this; } - auto set() noexcept -> void - { - void* old_value = m_state.exchange(this, std::memory_order_acq_rel); - if(old_value != this) - { - auto* waiters = static_cast(old_value); - while(waiters != nullptr) - { - auto* next = waiters->m_next; - waiters->m_awaiting_coroutine.resume(); - waiters = next; - } - } - } + /** + * Sets this event and resumes all awaiters. + */ + auto set() noexcept -> void; struct awaiter { - awaiter(const event& event) noexcept - : m_event(event) - { + /** + * @param e The event to wait for it to be set. + */ + awaiter(const event& e) noexcept : m_event(e) {} - } + /** + * @return True if the event is already set, otherwise false to suspend this coroutine. + */ + auto await_ready() const noexcept -> bool { return m_event.is_set(); } - auto await_ready() const noexcept -> bool - { - return m_event.is_set(); - } + /** + * Adds this coroutine to the list of awaiters in a thread safe fashion. If the event + * is set while attempting to add this coroutine to the awaiters then this will return false + * to resume execution immediately. + * @return False if the event is already set, otherwise true to suspend this coroutine. + */ + auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool; - auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool - { - const void* const set_state = &m_event; - - m_awaiting_coroutine = awaiting_coroutine; - - // This value will update if other threads write to it via acquire. - void* old_value = m_event.m_state.load(std::memory_order_acquire); - do - { - // Resume immediately if already in the set state. - if(old_value == set_state) - { - return false; - } - - m_next = static_cast(old_value); - } while(!m_event.m_state.compare_exchange_weak( - old_value, - this, - std::memory_order_release, - std::memory_order_acquire)); - - return true; - } - - auto await_resume() noexcept - { - - } + /** + * Nothing to do on resume. + */ + auto await_resume() noexcept {} + /// Refernce to the event that this awaiter is waiting on. const event& m_event; + /// The awaiting continuation coroutine handle. std::coroutine_handle<> m_awaiting_coroutine; + /// The next awaiter in line for this event, nullptr if this is the end. awaiter* m_next{nullptr}; }; - auto operator co_await() const noexcept -> awaiter - { - return awaiter(*this); - } + /** + * @return An awaiter struct to suspend and resume this coroutine for when the event is set. + */ + auto operator co_await() const noexcept -> awaiter { return awaiter(*this); } - auto reset() noexcept -> void - { - void* old_value = this; - m_state.compare_exchange_strong(old_value, nullptr, std::memory_order_acquire); - } + /** + * Resets the event from set to not set so it can be re-used. If the event is not currently + * set then this function has no effect. + */ + auto reset() noexcept -> void; protected: + /// For access to m_state. friend struct awaiter; + /// The state of the event, nullptr is not set with zero awaiters. Set to an awaiter* there are + /// coroutines awaiting the event to be set, and set to this the event has triggered. + /// 1) nullptr == not set + /// 2) awaiter* == linked list of awaiters waiting for the event to trigger. + /// 3) this == The event is triggered and all awaiters are resumed. mutable std::atomic m_state; }; diff --git a/inc/coro/generator.hpp b/inc/coro/generator.hpp index 182c65c..086da41 100644 --- a/inc/coro/generator.hpp +++ b/inc/coro/generator.hpp @@ -5,130 +5,97 @@ namespace coro { - template class generator; namespace detail { - template class generator_promise { public: - using value_type = std::remove_reference_t; + using value_type = std::remove_reference_t; using reference_type = std::conditional_t, T, T&>; - using pointer_type = value_type*; + using pointer_type = value_type*; generator_promise() = default; auto get_return_object() noexcept -> generator; - auto initial_suspend() const - { - return std::suspend_always{}; - } + auto initial_suspend() const { return std::suspend_always{}; } - auto final_suspend() const - { - return std::suspend_always{}; - } + auto final_suspend() const { return std::suspend_always{}; } - template< - typename U = T, - std::enable_if_t::value, int> = 0> + template::value, int> = 0> auto yield_value(std::remove_reference_t& value) noexcept { - m_value = std::addressof(value); return std::suspend_always{}; } auto yield_value(std::remove_reference_t&& value) noexcept { - m_value = std::addressof(value); return std::suspend_always{}; } - auto unhandled_exception() -> void - { - m_exception = std::current_exception(); - } + auto unhandled_exception() -> void { m_exception = std::current_exception(); } - auto return_void() -> void - { + auto return_void() -> void {} - } - - auto value() const noexcept -> reference_type - { - return static_cast(*m_value); - } + auto value() const noexcept -> reference_type { return static_cast(*m_value); } template auto await_transform(U&& value) -> std::suspend_never = delete; auto rethrow_if_exception() -> void { - if(m_exception) + if (m_exception) { std::rethrow_exception(m_exception); } } + private: - pointer_type m_value{nullptr}; + pointer_type m_value{nullptr}; std::exception_ptr m_exception; }; -struct generator_sentinel {}; +struct generator_sentinel +{ +}; template class generator_iterator { using coroutine_handle = std::coroutine_handle>; + public: using iterator_category = std::input_iterator_tag; - using difference_type = std::ptrdiff_t; - using value_type = typename generator_promise::value_type; - using reference = typename generator_promise::reference_type; - using pointer = typename generator_promise::pointer_type; + using difference_type = std::ptrdiff_t; + using value_type = typename generator_promise::value_type; + using reference = typename generator_promise::reference_type; + using pointer = typename generator_promise::pointer_type; - generator_iterator() noexcept - { + generator_iterator() noexcept {} - } - - explicit generator_iterator(coroutine_handle coroutine) noexcept - : m_coroutine(coroutine) - { - - } + explicit generator_iterator(coroutine_handle coroutine) noexcept : m_coroutine(coroutine) {} friend auto operator==(const generator_iterator& it, generator_sentinel) noexcept -> bool { return it.m_coroutine == nullptr || it.m_coroutine.done(); } - friend auto operator!=(const generator_iterator& it, generator_sentinel s) noexcept -> bool - { - return !(it == s); - } + friend auto operator!=(const generator_iterator& it, generator_sentinel s) noexcept -> bool { return !(it == s); } - friend auto operator==(generator_sentinel s, const generator_iterator& it) noexcept -> bool - { - return (it == s); - } + friend auto operator==(generator_sentinel s, const generator_iterator& it) noexcept -> bool { return (it == s); } - friend auto operator!=(generator_sentinel s, const generator_iterator& it) noexcept -> bool - { - return it != s; - } + friend auto operator!=(generator_sentinel s, const generator_iterator& it) noexcept -> bool { return it != s; } generator_iterator& operator++() { m_coroutine.resume(); - if(m_coroutine.done()) + if (m_coroutine.done()) { m_coroutine.promise().rethrow_if_exception(); } @@ -136,20 +103,12 @@ public: return *this; } - auto operator++(int) -> void - { - (void)operator++(); - } + auto operator++(int) -> void { (void)operator++(); } - reference operator*() const noexcept - { - return m_coroutine.promise().value(); - } + reference operator*() const noexcept { return m_coroutine.promise().value(); } + + pointer operator->() const noexcept { return std::addressof(operator*()); } - pointer operator->() const noexcept - { - return std::addressof(operator*()); - } private: coroutine_handle m_coroutine{nullptr}; }; @@ -161,26 +120,18 @@ class generator { public: using promise_type = detail::generator_promise; - using iterator = detail::generator_iterator; - using sentinel = detail::generator_sentinel; + using iterator = detail::generator_iterator; + using sentinel = detail::generator_sentinel; - generator() noexcept - : m_coroutine(nullptr) - { - - } + generator() noexcept : m_coroutine(nullptr) {} generator(const generator&) = delete; - generator(generator&& other) noexcept - : m_coroutine(other.m_coroutine) - { - other.m_coroutine = nullptr; - } + generator(generator&& other) noexcept : m_coroutine(other.m_coroutine) { other.m_coroutine = nullptr; } auto operator=(const generator&) = delete; - auto operator=(generator&& other) noexcept -> generator& + auto operator =(generator&& other) noexcept -> generator& { - m_coroutine = other.m_coroutine; + m_coroutine = other.m_coroutine; other.m_coroutine = nullptr; return *this; @@ -188,7 +139,7 @@ public: ~generator() { - if(m_coroutine) + if (m_coroutine) { m_coroutine.destroy(); } @@ -196,10 +147,10 @@ public: auto begin() -> iterator { - if(m_coroutine != nullptr) + if (m_coroutine != nullptr) { m_coroutine.resume(); - if(m_coroutine.done()) + if (m_coroutine.done()) { m_coroutine.promise().rethrow_if_exception(); } @@ -208,27 +159,18 @@ public: return iterator{m_coroutine}; } - auto end() noexcept -> sentinel - { - return sentinel{}; - } - + auto end() noexcept -> sentinel { return sentinel{}; } private: friend class detail::generator_promise; - explicit generator(std::coroutine_handle coroutine) noexcept - : m_coroutine(coroutine) - { - - } + explicit generator(std::coroutine_handle coroutine) noexcept : m_coroutine(coroutine) {} std::coroutine_handle m_coroutine; }; namespace detail { - template auto generator_promise::get_return_object() noexcept -> generator { diff --git a/inc/coro/latch.hpp b/inc/coro/latch.hpp index f5087e4..90370c5 100644 --- a/inc/coro/latch.hpp +++ b/inc/coro/latch.hpp @@ -6,48 +6,33 @@ namespace coro { - class latch { public: - latch(std::ptrdiff_t count) noexcept - : m_count(count), - m_event(count <= 0) - { - - } + latch(std::ptrdiff_t count) noexcept : m_count(count), m_event(count <= 0) {} latch(const latch&) = delete; - latch(latch&&) = delete; + latch(latch&&) = delete; auto operator=(const latch&) -> latch& = delete; - auto operator=(latch&&) -> latch& = delete; + auto operator=(latch &&) -> latch& = delete; - auto is_ready() const noexcept -> bool - { - return m_event.is_set(); - } + auto is_ready() const noexcept -> bool { return m_event.is_set(); } - auto remaining() const noexcept -> std::size_t - { - return m_count.load(std::memory_order::acquire); - } + auto remaining() const noexcept -> std::size_t { return m_count.load(std::memory_order::acquire); } auto count_down(std::ptrdiff_t n = 1) noexcept -> void { - if(m_count.fetch_sub(n, std::memory_order::acq_rel) <= n) + if (m_count.fetch_sub(n, std::memory_order::acq_rel) <= n) { m_event.set(); } } - auto operator co_await() const noexcept -> event::awaiter - { - return m_event.operator co_await(); - } + auto operator co_await() const noexcept -> event::awaiter { return m_event.operator co_await(); } private: std::atomic m_count; - event m_event; + event m_event; }; } // namespace coro diff --git a/inc/coro/scheduler.hpp b/inc/coro/scheduler.hpp index 86826bc..a40fca9 100644 --- a/inc/coro/scheduler.hpp +++ b/inc/coro/scheduler.hpp @@ -3,31 +3,30 @@ #include "coro/task.hpp" #include -#include +#include +#include #include #include #include -#include -#include -#include -#include -#include -#include #include +#include +#include +#include +#include +#include +#include #include #include +#include #include #include -#include #include -#include #include namespace coro { - class scheduler; namespace detail @@ -35,11 +34,7 @@ namespace detail class resume_token_base { public: - resume_token_base(scheduler* eng) noexcept - : m_scheduler(eng), - m_state(nullptr) - { - } + resume_token_base(scheduler* eng) noexcept : m_scheduler(eng), m_state(nullptr) {} virtual ~resume_token_base() = default; @@ -47,18 +42,17 @@ public: resume_token_base(resume_token_base&& other) { m_scheduler = other.m_scheduler; - m_state = other.m_state.exchange(0); + m_state = other.m_state.exchange(0); other.m_scheduler = nullptr; - } auto operator=(const resume_token_base&) -> resume_token_base& = delete; - auto operator=(resume_token_base&& other) -> resume_token_base& + auto operator =(resume_token_base&& other) -> resume_token_base& { - if(std::addressof(other) != this) + if (std::addressof(other) != this) { m_scheduler = other.m_scheduler; - m_state = other.m_state.exchange(0); + m_state = other.m_state.exchange(0); other.m_scheduler = nullptr; } @@ -66,23 +60,13 @@ public: return *this; } - bool is_set() const noexcept - { - return m_state.load(std::memory_order::acquire) == this; - } + bool is_set() const noexcept { return m_state.load(std::memory_order::acquire) == this; } struct awaiter { - awaiter(const resume_token_base& token) noexcept - : m_token(token) - { + awaiter(const resume_token_base& token) noexcept : m_token(token) {} - } - - auto await_ready() const noexcept -> bool - { - return m_token.is_set(); - } + auto await_ready() const noexcept -> bool { return m_token.is_set(); } auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool { @@ -95,17 +79,14 @@ public: do { // Resume immediately if already in the set state. - if(old_value == set_state) + if (old_value == set_state) { return false; } m_next = static_cast(old_value); - } while(!m_token.m_state.compare_exchange_weak( - old_value, - this, - std::memory_order::release, - std::memory_order::acquire)); + } while (!m_token.m_state.compare_exchange_weak( + old_value, this, std::memory_order::release, std::memory_order::acquire)); return true; } @@ -116,14 +97,11 @@ public: } const resume_token_base& m_token; - std::coroutine_handle<> m_awaiting_coroutine; - awaiter* m_next{nullptr}; + std::coroutine_handle<> m_awaiting_coroutine; + awaiter* m_next{nullptr}; }; - auto operator co_await() const noexcept -> awaiter - { - return awaiter{*this}; - } + auto operator co_await() const noexcept -> awaiter { return awaiter{*this}; } auto reset() noexcept -> void { @@ -133,7 +111,7 @@ public: protected: friend struct awaiter; - scheduler* m_scheduler{nullptr}; + scheduler* m_scheduler{nullptr}; mutable std::atomic m_state; }; @@ -143,36 +121,23 @@ template class resume_token final : public detail::resume_token_base { friend scheduler; - resume_token() - : detail::resume_token_base(nullptr) - { + resume_token() : detail::resume_token_base(nullptr) {} + resume_token(scheduler& s) : detail::resume_token_base(&s) {} - } - resume_token(scheduler& s) - : detail::resume_token_base(&s) - { - - } public: - ~resume_token() override = default; resume_token(const resume_token&) = delete; - resume_token(resume_token&&) = default; + resume_token(resume_token&&) = default; auto operator=(const resume_token&) -> resume_token& = delete; - auto operator=(resume_token&&) -> resume_token& = default; + auto operator=(resume_token &&) -> resume_token& = default; auto resume(return_type value) noexcept -> void; - auto return_value() const & -> const return_type& - { - return m_return_value; - } + auto return_value() const& -> const return_type& { return m_return_value; } + + auto return_value() && -> return_type&& { return std::move(m_return_value); } - auto return_value() && -> return_type&& - { - return std::move(m_return_value); - } private: return_type m_return_value; }; @@ -181,23 +146,16 @@ template<> class resume_token final : public detail::resume_token_base { friend scheduler; - resume_token() - : detail::resume_token_base(nullptr) - { + resume_token() : detail::resume_token_base(nullptr) {} + resume_token(scheduler& s) : detail::resume_token_base(&s) {} - } - resume_token(scheduler& s) - : detail::resume_token_base(&s) - { - - } public: ~resume_token() override = default; resume_token(const resume_token&) = delete; - resume_token(resume_token&&) = default; + resume_token(resume_token&&) = default; auto operator=(const resume_token&) -> resume_token& = delete; - auto operator=(resume_token&&) -> resume_token& = default; + auto operator=(resume_token &&) -> resume_token& = default; auto resume() noexcept -> void; }; @@ -216,7 +174,7 @@ class scheduler { private: using task_variant = std::variant, std::coroutine_handle<>>; - using task_queue = std::deque; + using task_queue = std::deque; /// resume_token needs to be able to call internal scheduler::resume() template @@ -235,11 +193,10 @@ private: public: using task_position = std::list::iterator; - task_manager(const std::size_t reserve_size, const double growth_factor) - : m_growth_factor(growth_factor) + task_manager(const std::size_t reserve_size, const double growth_factor) : m_growth_factor(growth_factor) { m_tasks.resize(reserve_size); - for(std::size_t i = 0; i < reserve_size; ++i) + for (std::size_t i = 0; i < reserve_size; ++i) { m_task_indexes.emplace_back(i); } @@ -255,15 +212,15 @@ private: auto store(coro::task user_task) -> void { // Only grow if completely full and attempting to add more. - if(m_free_pos == m_task_indexes.end()) + if (m_free_pos == m_task_indexes.end()) { m_free_pos = grow(); } // Store the user task with its cleanup task to maintain their lifetimes until completed. - auto index = *m_free_pos; - auto& task_data = m_tasks[index]; - task_data.m_user_task = std::move(user_task); + auto index = *m_free_pos; + auto& task_data = m_tasks[index]; + task_data.m_user_task = std::move(user_task); task_data.m_cleanup_task = cleanup_func(m_free_pos); // Attach the cleanup task to be the continuation after the users task. @@ -280,9 +237,9 @@ private: auto gc() -> std::size_t { std::size_t deleted{0}; - if(!m_tasks_to_delete.empty()) + if (!m_tasks_to_delete.empty()) { - for(const auto& pos : m_tasks_to_delete) + for (const auto& pos : m_tasks_to_delete) { // This doesn't actually 'delete' the task, it'll get overwritten when a // new user task claims the free space. It could be useful to actually @@ -323,9 +280,9 @@ private: auto grow() -> task_position { // Save an index at the current last item. - auto last_pos = std::prev(m_task_indexes.end()); + auto last_pos = std::prev(m_task_indexes.end()); std::size_t new_size = m_tasks.size() * m_growth_factor; - for(std::size_t i = m_tasks.size(); i < new_size; ++i) + for (std::size_t i = m_tasks.size(); i < new_size; ++i) { m_task_indexes.emplace_back(i); } @@ -360,7 +317,7 @@ private: double m_growth_factor{}; }; - static constexpr const int m_accept_object{0}; + static constexpr const int m_accept_object{0}; static constexpr const void* m_accept_ptr = &m_accept_object; public: @@ -397,25 +354,25 @@ public: /** * @param options Various scheduler options to tune how it behaves. */ - scheduler( - const options opts = options{8, 2, thread_strategy_t::spawn} - ) - : m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)), - m_accept_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)), - m_thread_strategy(opts.thread_strategy), - m_task_manager(opts.reserve_size, opts.growth_factor) + scheduler(const options opts = options{8, 2, thread_strategy_t::spawn}) + : m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)), + m_accept_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)), + m_thread_strategy(opts.thread_strategy), + m_task_manager(opts.reserve_size, opts.growth_factor) { - struct epoll_event e{}; + struct epoll_event e + { + }; e.events = EPOLLIN; e.data.ptr = const_cast(m_accept_ptr); epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_accept_fd, &e); - if(m_thread_strategy == thread_strategy_t::spawn) + if (m_thread_strategy == thread_strategy_t::spawn) { m_scheduler_thread = std::thread([this] { process_events_dedicated_thread(); }); } - else if(m_thread_strategy == thread_strategy_t::adopt) + else if (m_thread_strategy == thread_strategy_t::adopt) { process_events_dedicated_thread(); } @@ -423,19 +380,19 @@ public: } scheduler(const scheduler&) = delete; - scheduler(scheduler&&) = delete; + scheduler(scheduler&&) = delete; auto operator=(const scheduler&) -> scheduler& = delete; - auto operator=(scheduler&&) -> scheduler& = delete; + auto operator=(scheduler &&) -> scheduler& = delete; ~scheduler() { shutdown(); - if(m_epoll_fd != -1) + if (m_epoll_fd != -1) { close(m_epoll_fd); m_epoll_fd = -1; } - if(m_accept_fd != -1) + if (m_accept_fd != -1) { close(m_accept_fd); m_accept_fd = -1; @@ -450,7 +407,7 @@ public: */ auto schedule(coro::task task) -> bool { - if(m_shutdown_requested.load(std::memory_order::relaxed)) + if (m_shutdown_requested.load(std::memory_order::relaxed)) { return false; } @@ -470,11 +427,7 @@ public: // Send an event if one isn't already set. We use strong here to avoid spurious failures // but if it fails due to it actually being set we don't want to retry. bool expected{false}; - if(m_event_set.compare_exchange_strong( - expected, - true, - std::memory_order::release, - std::memory_order::relaxed)) + if (m_event_set.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed)) { uint64_t value{1}; ::write(m_accept_fd, &value, sizeof(value)); @@ -491,7 +444,7 @@ public: */ auto schedule_after(coro::task task, std::chrono::milliseconds after) -> bool { - if(m_shutdown_requested.load(std::memory_order::relaxed)) + if (m_shutdown_requested.load(std::memory_order::relaxed)) { return false; } @@ -506,15 +459,14 @@ public: */ auto poll(fd_t fd, poll_op op) -> coro::task { - co_await unsafe_yield( - [&](resume_token& token) + co_await unsafe_yield([&](resume_token& token) { + struct epoll_event e { - struct epoll_event e{}; - e.events = static_cast(op) | EPOLLONESHOT | EPOLLET; - e.data.ptr = &token; - epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &e); - } - ); + }; + e.events = static_cast(op) | EPOLLONESHOT | EPOLLET; + e.data.ptr = &token; + epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &e); + }); epoll_ctl(m_epoll_fd, EPOLL_CTL_DEL, fd, nullptr); } @@ -543,7 +495,8 @@ public: auto write(fd_t fd, const std::span buffer) -> coro::task { co_await poll(fd, poll_op::write); - co_return ::write(fd, buffer.data(), buffer.size());; + co_return ::write(fd, buffer.data(), buffer.size()); + ; } /** @@ -603,22 +556,24 @@ public: auto yield_for(std::chrono::milliseconds amount) -> coro::task { fd_t timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); - if(timer_fd == -1) + if (timer_fd == -1) { std::string msg = "Failed to create timerfd errorno=[" + std::string{strerror(errno)} + "]."; throw std::runtime_error(msg.data()); } - struct itimerspec ts{}; + struct itimerspec ts + { + }; auto seconds = std::chrono::duration_cast(amount); amount -= seconds; auto nanoseconds = std::chrono::duration_cast(amount); - ts.it_value.tv_sec = seconds.count(); + ts.it_value.tv_sec = seconds.count(); ts.it_value.tv_nsec = nanoseconds.count(); - if(timerfd_settime(timer_fd, 0, &ts, nullptr) == -1) + if (timerfd_settime(timer_fd, 0, &ts, nullptr) == -1) { std::string msg = "Failed to set timerfd errorno=[" + std::string{strerror(errno)} + "]."; throw std::runtime_error(msg.data()); @@ -683,13 +638,13 @@ public: */ auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) -> void { - if(!m_shutdown_requested.exchange(true, std::memory_order::release)) + if (!m_shutdown_requested.exchange(true, std::memory_order::release)) { // Signal the event loop to stop asap. uint64_t value{1}; ::write(m_accept_fd, &value, sizeof(value)); - if(wait_for_tasks == shutdown_t::sync && m_scheduler_thread.joinable()) + if (wait_for_tasks == shutdown_t::sync && m_scheduler_thread.joinable()) { m_scheduler_thread.join(); } @@ -731,7 +686,7 @@ private: auto scheduler_after_func(coro::task inner_task, std::chrono::milliseconds wait_time) -> coro::task { // Seems to already be done. - if(inner_task.is_ready()) + if (inner_task.is_ready()) { co_return; } @@ -739,7 +694,7 @@ private: // Wait for the period requested, and then resume their task. co_await yield_for(wait_time); inner_task.resume(); - if(!inner_task.is_ready()) + if (!inner_task.is_ready()) { m_task_manager.store(std::move(inner_task)); } @@ -771,26 +726,26 @@ private: // Signal to the event loop there is a task to resume if one hasn't already been sent. bool expected{false}; - if(m_event_set.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed)) + if (m_event_set.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed)) { uint64_t value{1}; ::write(m_accept_fd, &value, sizeof(value)); } } - static constexpr std::chrono::milliseconds m_default_timeout{1000}; - static constexpr std::chrono::milliseconds m_no_timeout{0}; - static constexpr std::size_t m_max_events = 8; + static constexpr std::chrono::milliseconds m_default_timeout{1000}; + static constexpr std::chrono::milliseconds m_no_timeout{0}; + static constexpr std::size_t m_max_events = 8; std::array m_events{}; auto task_start(coro::task& task) -> void { - if(!task.is_ready()) // sanity check, the user could have manually resumed. + if (!task.is_ready()) // sanity check, the user could have manually resumed. { // Attempt to process the task synchronously before suspending. task.resume(); - if(!task.is_ready()) + if (!task.is_ready()) { m_task_manager.store(std::move(task)); // This task is now suspended waiting for an event. @@ -807,10 +762,9 @@ private: } } - inline auto process_task_variant(task_variant& tv) -> void { - if(std::holds_alternative>(tv)) + if (std::holds_alternative>(tv)) { auto& task = std::get>(tv); task_start(task); @@ -818,7 +772,7 @@ private: else { auto handle = std::get>(tv); - if(!handle.done()) + if (!handle.done()) { handle.resume(); } @@ -830,7 +784,7 @@ private: std::size_t amount{0}; { std::lock_guard lk{m_accept_mutex}; - while(!m_accept_queue.empty() && amount < task_inline_process_amount) + while (!m_accept_queue.empty() && amount < task_inline_process_amount) { m_processing_tasks[amount] = std::move(m_accept_queue.front()); m_accept_queue.pop_front(); @@ -839,12 +793,12 @@ private: } // The queue is empty, we are done here. - if(amount == 0) + if (amount == 0) { return; } - for(std::size_t i = 0 ; i < amount; ++i) + for (std::size_t i = 0; i < amount; ++i) { process_task_variant(m_processing_tasks[i]); } @@ -862,13 +816,13 @@ private: // Poll is run every iteration to make sure 'waiting' events are properly put into // the FIFO queue for when they are ready. auto event_count = epoll_wait(m_epoll_fd, m_events.data(), m_max_events, timeout.count()); - if(event_count > 0) + if (event_count > 0) { - for(std::size_t i = 0; i < static_cast(event_count); ++i) + for (std::size_t i = 0; i < static_cast(event_count); ++i) { void* handle_ptr = m_events[i].data.ptr; - if(handle_ptr == m_accept_ptr) + if (handle_ptr == m_accept_ptr) { uint64_t value{0}; ::read(m_accept_fd, &value, sizeof(value)); @@ -878,11 +832,9 @@ private: // Important to do this after the accept file descriptor has been read. // This needs to succeed so best practice is to loop compare exchange weak. bool expected{true}; - while(!m_event_set.compare_exchange_weak( - expected, - false, - std::memory_order::release, - std::memory_order::relaxed)) { } + while (!m_event_set.compare_exchange_weak( + expected, false, std::memory_order::release, std::memory_order::relaxed)) + {} tasks_ready = true; } @@ -896,12 +848,12 @@ private: } } - if(tasks_ready) + if (tasks_ready) { process_task_queue(); } - if(!m_task_manager.delete_tasks_empty()) + if (!m_task_manager.delete_tasks_empty()) { m_size.fetch_sub(m_task_manager.gc(), std::memory_order::relaxed); } @@ -911,11 +863,7 @@ private: { // Do not allow two threads to process events at the same time. bool expected{false}; - if(m_running.compare_exchange_strong( - expected, - true, - std::memory_order::release, - std::memory_order::relaxed)) + if (m_running.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed)) { process_events_poll_execute(user_timeout); m_running.exchange(false, std::memory_order::release); @@ -926,7 +874,7 @@ private: { m_running.exchange(true, std::memory_order::release); // Execute tasks until stopped or there are more tasks to complete. - while(!m_shutdown_requested.load(std::memory_order::relaxed) || m_size.load(std::memory_order::relaxed) > 0) + while (!m_shutdown_requested.load(std::memory_order::relaxed) || m_size.load(std::memory_order::relaxed) > 0) { process_events_poll_execute(m_default_timeout); } @@ -938,12 +886,12 @@ template inline auto resume_token::resume(return_type value) noexcept -> void { void* old_value = m_state.exchange(this, std::memory_order::acq_rel); - if(old_value != this) + if (old_value != this) { m_return_value = std::move(value); auto* waiters = static_cast(old_value); - while(waiters != nullptr) + while (waiters != nullptr) { // Intentionally not checking if this is running on the scheduler process event thread // as it can create a stack overflow if it triggers a 'resume chain'. unsafe_yield() @@ -953,7 +901,7 @@ inline auto resume_token::resume(return_type value) noexcept -> voi auto* next = waiters->m_next; // If scheduler is nullptr this is an unsafe_yield() // If scheduler is present this is a yield() - if(m_scheduler == nullptr)// || m_scheduler->this_thread_is_processing_events()) + if (m_scheduler == nullptr) // || m_scheduler->this_thread_is_processing_events()) { waiters->m_awaiting_coroutine.resume(); } @@ -969,13 +917,13 @@ inline auto resume_token::resume(return_type value) noexcept -> voi inline auto resume_token::resume() noexcept -> void { void* old_value = m_state.exchange(this, std::memory_order::acq_rel); - if(old_value != this) + if (old_value != this) { auto* waiters = static_cast(old_value); - while(waiters != nullptr) + while (waiters != nullptr) { auto* next = waiters->m_next; - if(m_scheduler == nullptr) + if (m_scheduler == nullptr) { waiters->m_awaiting_coroutine.resume(); } @@ -989,4 +937,3 @@ inline auto resume_token::resume() noexcept -> void } } // namespace coro - diff --git a/inc/coro/sync_wait.hpp b/inc/coro/sync_wait.hpp index 3285e2c..0a620ba 100644 --- a/inc/coro/sync_wait.hpp +++ b/inc/coro/sync_wait.hpp @@ -1,32 +1,30 @@ #pragma once -#include "coro/task.hpp" #include "coro/scheduler.hpp" +#include "coro/task.hpp" namespace coro { - template auto sync_wait(task_type&& task) -> decltype(auto) { - while(!task.is_ready()) + while (!task.is_ready()) { task.resume(); } return task.promise().return_value(); } -template -auto sync_wait_all(tasks&& ...awaitables) -> void +template +auto sync_wait_all(tasks&&... awaitables) -> void { - scheduler s{ scheduler::options { - .reserve_size = sizeof...(awaitables), - .thread_strategy = scheduler::thread_strategy_t::manual } - }; + scheduler s{scheduler::options{ + .reserve_size = sizeof...(awaitables), .thread_strategy = scheduler::thread_strategy_t::manual}}; (s.schedule(std::move(awaitables)), ...); - while(s.process_events() > 0) ; + while (s.process_events() > 0) + ; } } // namespace coro diff --git a/inc/coro/task.hpp b/inc/coro/task.hpp index 3463bb8..9b387b6 100644 --- a/inc/coro/task.hpp +++ b/inc/coro/task.hpp @@ -4,29 +4,24 @@ namespace coro { - template class task; namespace detail { - struct promise_base { friend struct final_awaitable; struct final_awaitable { - auto await_ready() const noexcept -> bool - { - return false; - } + auto await_ready() const noexcept -> bool { return false; } template auto await_suspend(std::coroutine_handle coroutine) noexcept -> std::coroutine_handle<> { - // // If there is a continuation call it, otherwise this is the end of the line. + // If there is a continuation call it, otherwise this is the end of the line. auto& promise = coroutine.promise(); - if(promise.m_continuation != nullptr) + if (promise.m_continuation != nullptr) { return promise.m_continuation; } @@ -43,52 +38,37 @@ struct promise_base }; promise_base() noexcept = default; - ~promise_base() = default; + ~promise_base() = default; - auto initial_suspend() - { - return std::suspend_always{}; - } + auto initial_suspend() { return std::suspend_always{}; } - auto final_suspend() - { - return final_awaitable{}; - } + auto final_suspend() { return final_awaitable{}; } - auto unhandled_exception() -> void - { - m_exception_ptr = std::current_exception(); - } + auto unhandled_exception() -> void { m_exception_ptr = std::current_exception(); } - auto continuation(std::coroutine_handle<> continuation) noexcept -> void - { - m_continuation = continuation; - } + auto continuation(std::coroutine_handle<> continuation) noexcept -> void { m_continuation = continuation; } protected: std::coroutine_handle<> m_continuation{nullptr}; - std::exception_ptr m_exception_ptr{}; + std::exception_ptr m_exception_ptr{}; }; template struct promise final : public promise_base { - using task_type = task; + using task_type = task; using coroutine_handle = std::coroutine_handle>; promise() noexcept = default; - ~promise() = default; + ~promise() = default; auto get_return_object() noexcept -> task_type; - auto return_value(return_type value) -> void - { - m_return_value = std::move(value); - } + auto return_value(return_type value) -> void { m_return_value = std::move(value); } - auto return_value() const & -> const return_type& + auto return_value() const& -> const return_type& { - if(m_exception_ptr) + if (m_exception_ptr) { std::rethrow_exception(m_exception_ptr); } @@ -98,7 +78,7 @@ struct promise final : public promise_base auto return_value() && -> return_type&& { - if(m_exception_ptr) + if (m_exception_ptr) { std::rethrow_exception(m_exception_ptr); } @@ -113,22 +93,19 @@ private: template<> struct promise : public promise_base { - using task_type = task; + using task_type = task; using coroutine_handle = std::coroutine_handle>; promise() noexcept = default; - ~promise() = default; + ~promise() = default; auto get_return_object() noexcept -> task_type; - auto return_void() noexcept -> void - { - - } + auto return_void() noexcept -> void {} auto return_value() const -> void { - if(m_exception_ptr) + if (m_exception_ptr) { std::rethrow_exception(m_exception_ptr); } @@ -141,22 +118,15 @@ template class task { public: - using task_type = task; - using promise_type = detail::promise; + using task_type = task; + using promise_type = detail::promise; using coroutine_handle = std::coroutine_handle; struct awaitable_base { - awaitable_base(coroutine_handle coroutine) noexcept - : m_coroutine(coroutine) - { + awaitable_base(coroutine_handle coroutine) noexcept : m_coroutine(coroutine) {} - } - - auto await_ready() const noexcept -> bool - { - return !m_coroutine || m_coroutine.done(); - } + auto await_ready() const noexcept -> bool { return !m_coroutine || m_coroutine.done(); } auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> std::coroutine_handle<> { @@ -167,43 +137,31 @@ public: std::coroutine_handle m_coroutine{nullptr}; }; - task() noexcept - : m_coroutine(nullptr) - { + task() noexcept : m_coroutine(nullptr) {} - } - - task(coroutine_handle handle) - : m_coroutine(handle) - { - - } + task(coroutine_handle handle) : m_coroutine(handle) {} task(const task&) = delete; - task(task&& other) noexcept - : m_coroutine(other.m_coroutine) - { - other.m_coroutine = nullptr; - } + task(task&& other) noexcept : m_coroutine(other.m_coroutine) { other.m_coroutine = nullptr; } ~task() { - if(m_coroutine != nullptr) + if (m_coroutine != nullptr) { m_coroutine.destroy(); } } auto operator=(const task&) -> task& = delete; - auto operator=(task&& other) noexcept -> task& + auto operator =(task&& other) noexcept -> task& { - if(std::addressof(other) != this) + if (std::addressof(other) != this) { - if(m_coroutine != nullptr) + if (m_coroutine != nullptr) { m_coroutine.destroy(); } - m_coroutine = other.m_coroutine; + m_coroutine = other.m_coroutine; other.m_coroutine = nullptr; } @@ -213,14 +171,11 @@ public: /** * @return True if the task is in its final suspend or if the task has been destroyed. */ - auto is_ready() const noexcept -> bool - { - return m_coroutine == nullptr || m_coroutine.done(); - } + auto is_ready() const noexcept -> bool { return m_coroutine == nullptr || m_coroutine.done(); } auto resume() -> bool { - if(!m_coroutine.done()) + if (!m_coroutine.done()) { m_coroutine.resume(); } @@ -229,7 +184,7 @@ public: auto destroy() -> bool { - if(m_coroutine != nullptr) + if (m_coroutine != nullptr) { m_coroutine.destroy(); m_coroutine = nullptr; @@ -243,33 +198,18 @@ public: { struct awaitable : public awaitable_base { - auto await_resume() noexcept -> decltype(auto) - { - return this->m_coroutine.promise().return_value(); - } + auto await_resume() noexcept -> decltype(auto) { return this->m_coroutine.promise().return_value(); } }; return awaitable{m_coroutine}; } - auto promise() & -> promise_type& - { - return m_coroutine.promise(); - } + auto promise() & -> promise_type& { return m_coroutine.promise(); } - auto promise() const & -> const promise_type& - { - return m_coroutine.promise(); - } - auto promise() && -> promise_type&& - { - return std::move(m_coroutine.promise()); - } + auto promise() const& -> const promise_type& { return m_coroutine.promise(); } + auto promise() && -> promise_type&& { return std::move(m_coroutine.promise()); } - auto handle() -> coroutine_handle - { - return m_coroutine; - } + auto handle() -> coroutine_handle { return m_coroutine; } private: coroutine_handle m_coroutine{nullptr}; @@ -277,7 +217,6 @@ private: namespace detail { - template inline auto promise::get_return_object() noexcept -> task { diff --git a/src/event.cpp b/src/event.cpp new file mode 100644 index 0000000..105377c --- /dev/null +++ b/src/event.cpp @@ -0,0 +1,55 @@ +#include "coro/event.hpp" + +namespace coro +{ +event::event(bool initially_set) noexcept : m_state((initially_set) ? static_cast(this) : nullptr) +{ +} + +auto event::set() noexcept -> void +{ + // Exchange the state to this, if the state was previously not this, then traverse the list + // of awaiters and resume their coroutines. + void* old_value = m_state.exchange(this, std::memory_order::acq_rel); + if (old_value != this) + { + auto* waiters = static_cast(old_value); + while (waiters != nullptr) + { + auto* next = waiters->m_next; + waiters->m_awaiting_coroutine.resume(); + waiters = next; + } + } +} + +auto event::awaiter::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool +{ + const void* const set_state = &m_event; + + m_awaiting_coroutine = awaiting_coroutine; + + // This value will update if other threads write to it via acquire. + void* old_value = m_event.m_state.load(std::memory_order::acquire); + do + { + // Resume immediately if already in the set state. + if (old_value == set_state) + { + return false; + } + + m_next = static_cast(old_value); + } while (!m_event.m_state.compare_exchange_weak( + old_value, this, std::memory_order::release, std::memory_order::acquire)); + + return true; +} + +auto event::reset() noexcept -> void +{ + void* old_value = this; + m_state.compare_exchange_strong(old_value, nullptr, std::memory_order::acquire); +} + +} // namespace coro diff --git a/test/bench.cpp b/test/bench.cpp index 5901ab2..b14fbf3 100644 --- a/test/bench.cpp +++ b/test/bench.cpp @@ -2,30 +2,26 @@ #include -#include -#include #include +#include #include +#include using namespace std::chrono_literals; using sc = std::chrono::steady_clock; constexpr std::size_t default_iterations = 5'000'000; -static auto print_stats( - const std::string& bench_name, - uint64_t operations, - sc::time_point start, - sc::time_point stop -) -> void +static auto print_stats(const std::string& bench_name, uint64_t operations, sc::time_point start, sc::time_point stop) + -> void { auto duration = std::chrono::duration_cast(stop - start); - auto ms = std::chrono::duration_cast(duration); + auto ms = std::chrono::duration_cast(duration); std::cout << bench_name << "\n"; std::cout << " " << operations << " ops in " << ms.count() << "ms\n"; - double seconds = duration.count() / 1'000'000'000.0; + double seconds = duration.count() / 1'000'000'000.0; double ops_per_sec = static_cast(operations / seconds); std::cout << " ops/sec: " << std::fixed << ops_per_sec << "\n"; @@ -35,15 +31,14 @@ TEST_CASE("benchmark counter func direct call") { constexpr std::size_t iterations = default_iterations; std::atomic counter{0}; - auto func = [&]() -> void - { + auto func = [&]() -> void { counter.fetch_add(1, std::memory_order::relaxed); return; }; auto start = sc::now(); - for(std::size_t i = 0; i < iterations; ++i) + for (std::size_t i = 0; i < iterations; ++i) { func(); } @@ -56,17 +51,15 @@ TEST_CASE("benchmark counter func coro::sync_wait(awaitable)") { constexpr std::size_t iterations = default_iterations; std::atomic counter{0}; - auto func = [&]() -> coro::task - { + auto func = [&]() -> coro::task { counter.fetch_add(1, std::memory_order::relaxed); co_return; }; auto start = sc::now(); - for(std::size_t i = 0; i < iterations; ++i) + for (std::size_t i = 0; i < iterations; ++i) { - coro::sync_wait(func()); } @@ -78,15 +71,14 @@ TEST_CASE("benchmark counter func coro::sync_wait_all(awaitable)") { constexpr std::size_t iterations = default_iterations; std::atomic counter{0}; - auto func = [&]() -> coro::task - { + auto func = [&]() -> coro::task { counter.fetch_add(1, std::memory_order::relaxed); co_return; }; auto start = sc::now(); - for(std::size_t i = 0; i < iterations; i += 10) + for (std::size_t i = 0; i < iterations; i += 10) { coro::sync_wait_all(func(), func(), func(), func(), func(), func(), func(), func(), func(), func()); } @@ -99,17 +91,16 @@ TEST_CASE("benchmark counter task scheduler") { constexpr std::size_t iterations = default_iterations; - coro::scheduler s1{}; + coro::scheduler s1{}; std::atomic counter{0}; - auto func = [&]() -> coro::task - { + auto func = [&]() -> coro::task { counter.fetch_add(1, std::memory_order::relaxed); co_return; }; auto start = sc::now(); - for(std::size_t i = 0; i < iterations; ++i) + for (std::size_t i = 0; i < iterations; ++i) { s1.schedule(func()); } @@ -123,19 +114,18 @@ TEST_CASE("benchmark counter task scheduler") TEST_CASE("benchmark counter task scheduler yield -> resume from main") { constexpr std::size_t iterations = default_iterations; - constexpr std::size_t ops = iterations * 2; // the external resume is still a resume op + constexpr std::size_t ops = iterations * 2; // the external resume is still a resume op - coro::scheduler s{}; + coro::scheduler s{}; std::vector> tokens{}; - for(std::size_t i = 0; i < iterations; ++i) + for (std::size_t i = 0; i < iterations; ++i) { tokens.emplace_back(s.generate_resume_token()); } std::atomic counter{0}; - auto wait_func = [&](std::size_t index) -> coro::task - { + auto wait_func = [&](std::size_t index) -> coro::task { co_await s.yield(tokens[index]); counter.fetch_add(1, std::memory_order::relaxed); co_return; @@ -143,12 +133,12 @@ TEST_CASE("benchmark counter task scheduler yield -> resume from main") auto start = sc::now(); - for(std::size_t i = 0; i < iterations; ++i) + for (std::size_t i = 0; i < iterations; ++i) { s.schedule(wait_func(i)); } - for(std::size_t i = 0; i < iterations; ++i) + for (std::size_t i = 0; i < iterations; ++i) { tokens[i].resume(); } @@ -164,33 +154,31 @@ TEST_CASE("benchmark counter task scheduler yield -> resume from main") TEST_CASE("benchmark counter task scheduler yield -> resume from coroutine") { constexpr std::size_t iterations = default_iterations; - constexpr std::size_t ops = iterations * 2; // each iteration executes 2 coroutines. + constexpr std::size_t ops = iterations * 2; // each iteration executes 2 coroutines. - coro::scheduler s{}; + coro::scheduler s{}; std::vector> tokens{}; - for(std::size_t i = 0; i < iterations; ++i) + for (std::size_t i = 0; i < iterations; ++i) { tokens.emplace_back(s.generate_resume_token()); } std::atomic counter{0}; - auto wait_func = [&](std::size_t index) -> coro::task - { + auto wait_func = [&](std::size_t index) -> coro::task { co_await s.yield(tokens[index]); counter.fetch_add(1, std::memory_order::relaxed); co_return; }; - auto resume_func = [&](std::size_t index) -> coro::task - { + auto resume_func = [&](std::size_t index) -> coro::task { tokens[index].resume(); co_return; }; auto start = sc::now(); - for(std::size_t i = 0; i < iterations; ++i) + for (std::size_t i = 0; i < iterations; ++i) { s.schedule(wait_func(i)); s.schedule(resume_func(i)); @@ -207,33 +195,31 @@ TEST_CASE("benchmark counter task scheduler yield -> resume from coroutine") TEST_CASE("benchmark counter task scheduler resume from coroutine -> yield") { constexpr std::size_t iterations = default_iterations; - constexpr std::size_t ops = iterations * 2; // each iteration executes 2 coroutines. + constexpr std::size_t ops = iterations * 2; // each iteration executes 2 coroutines. - coro::scheduler s{}; + coro::scheduler s{}; std::vector> tokens{}; - for(std::size_t i = 0; i < iterations; ++i) + for (std::size_t i = 0; i < iterations; ++i) { tokens.emplace_back(s.generate_resume_token()); } std::atomic counter{0}; - auto wait_func = [&](std::size_t index) -> coro::task - { + auto wait_func = [&](std::size_t index) -> coro::task { co_await s.yield(tokens[index]); counter.fetch_add(1, std::memory_order::relaxed); co_return; }; - auto resume_func = [&](std::size_t index) -> coro::task - { + auto resume_func = [&](std::size_t index) -> coro::task { tokens[index].resume(); co_return; }; auto start = sc::now(); - for(std::size_t i = 0; i < iterations; ++i) + for (std::size_t i = 0; i < iterations; ++i) { s.schedule(resume_func(i)); s.schedule(wait_func(i)); @@ -250,38 +236,36 @@ TEST_CASE("benchmark counter task scheduler resume from coroutine -> yield") TEST_CASE("benchmark counter task scheduler yield (all) -> resume (all) from coroutine with reserve") { constexpr std::size_t iterations = default_iterations; - constexpr std::size_t ops = iterations * 2; // each iteration executes 2 coroutines. + constexpr std::size_t ops = iterations * 2; // each iteration executes 2 coroutines. - coro::scheduler s{ coro::scheduler::options { .reserve_size = iterations } }; + coro::scheduler s{coro::scheduler::options{.reserve_size = iterations}}; std::vector> tokens{}; - for(std::size_t i = 0; i < iterations; ++i) + for (std::size_t i = 0; i < iterations; ++i) { tokens.emplace_back(s.generate_resume_token()); } std::atomic counter{0}; - auto wait_func = [&](std::size_t index) -> coro::task - { + auto wait_func = [&](std::size_t index) -> coro::task { co_await s.yield(tokens[index]); counter.fetch_add(1, std::memory_order::relaxed); co_return; }; - auto resume_func = [&](std::size_t index) -> coro::task - { + auto resume_func = [&](std::size_t index) -> coro::task { tokens[index].resume(); co_return; }; auto start = sc::now(); - for(std::size_t i = 0; i < iterations; ++i) + for (std::size_t i = 0; i < iterations; ++i) { s.schedule(wait_func(i)); } - for(std::size_t i = 0; i < iterations; ++i) + for (std::size_t i = 0; i < iterations; ++i) { s.schedule(resume_func(i)); } diff --git a/test/test_event.cpp b/test/test_event.cpp index aba2cac..a384737 100644 --- a/test/test_event.cpp +++ b/test/test_event.cpp @@ -23,7 +23,6 @@ TEST_CASE("event single awaiter") REQUIRE(task.promise().return_value() == 42); } - auto producer(coro::event& event) -> void { // Long running task that consumers are waiting for goes here... @@ -70,3 +69,28 @@ TEST_CASE("event multiple watchers") REQUIRE(value2.promise().return_value() == 42); REQUIRE(value3.promise().return_value() == 42); } + +TEST_CASE("event reset") +{ + coro::event e{}; + + e.reset(); + REQUIRE_FALSE(e.is_set()); + + auto value1 = consumer(e); + value1.resume(); // start co_awaiting event + REQUIRE_FALSE(value1.is_ready()); + + producer(e); + REQUIRE(value1.promise().return_value() == 42); + + e.reset(); + + auto value2 = consumer(e); + value2.resume(); + REQUIRE_FALSE(value2.is_ready()); + + producer(e); + + REQUIRE(value2.promise().return_value() == 42); +} diff --git a/test/test_generator.cpp b/test/test_generator.cpp index 1d799f3..fae0ebc 100644 --- a/test/test_generator.cpp +++ b/test/test_generator.cpp @@ -5,12 +5,9 @@ TEST_CASE("generator single yield") { std::string msg{"Hello World Generator!"}; - auto func = [&]() -> coro::generator - { - co_yield msg; - }; + auto func = [&]() -> coro::generator { co_yield msg; }; - for(const auto& v : func()) + for (const auto& v : func()) { REQUIRE(v == msg); } @@ -20,10 +17,9 @@ TEST_CASE("generator infinite incrementing integer yield") { constexpr const int64_t max = 1024; - auto func = []() -> coro::generator - { + auto func = []() -> coro::generator { int64_t i{0}; - while(true) + while (true) { ++i; co_yield i; @@ -31,12 +27,12 @@ TEST_CASE("generator infinite incrementing integer yield") }; int64_t v{1}; - for(const auto& v_1 : func()) + for (const auto& v_1 : func()) { REQUIRE(v == v_1); ++v; - if(v > max) + if (v > max) { break; } diff --git a/test/test_latch.cpp b/test/test_latch.cpp index d592956..0dd1256 100644 --- a/test/test_latch.cpp +++ b/test/test_latch.cpp @@ -5,13 +5,11 @@ #include #include - TEST_CASE("latch count=0") { coro::latch l{0}; - auto task = [&]() -> coro::task - { + auto task = [&]() -> coro::task { co_await l; co_return 42; }(); @@ -25,8 +23,7 @@ TEST_CASE("latch count=1") { coro::latch l{1}; - auto task = [&]() -> coro::task - { + auto task = [&]() -> coro::task { auto workers = l.remaining(); co_await l; co_return workers; @@ -44,8 +41,7 @@ TEST_CASE("latch count=1 count_down=5") { coro::latch l{1}; - auto task = [&]() -> coro::task - { + auto task = [&]() -> coro::task { auto workers = l.remaining(); co_await l; co_return workers; @@ -63,8 +59,7 @@ TEST_CASE("latch count=5 count_down=1 x5") { coro::latch l{5}; - auto task = [&]() -> coro::task - { + auto task = [&]() -> coro::task { auto workers = l.remaining(); co_await l; co_return workers; @@ -90,8 +85,7 @@ TEST_CASE("latch count=5 count_down=5") { coro::latch l{5}; - auto task = [&]() -> coro::task - { + auto task = [&]() -> coro::task { auto workers = l.remaining(); co_await l; co_return workers; diff --git a/test/test_scheduler.cpp b/test/test_scheduler.cpp index dd37f59..79aa251 100644 --- a/test/test_scheduler.cpp +++ b/test/test_scheduler.cpp @@ -2,11 +2,11 @@ #include -#include #include -#include -#include #include +#include +#include +#include using namespace std::chrono_literals; @@ -16,7 +16,8 @@ TEST_CASE("scheduler sizeof()") std::cerr << "sizeof(coro:task)=[" << sizeof(coro::task) << "]\n"; std::cerr << "sizeof(std::coroutine_handle<>)=[" << sizeof(std::coroutine_handle<>) << "]\n"; - std::cerr << "sizeof(std::variant>)=[" << sizeof(std::variant>) << "]\n"; + std::cerr << "sizeof(std::variant>)=[" << sizeof(std::variant>) + << "]\n"; REQUIRE(true); } @@ -24,15 +25,14 @@ TEST_CASE("scheduler sizeof()") TEST_CASE("scheduler submit single task") { std::atomic counter{0}; - coro::scheduler s{}; + coro::scheduler s{}; // Note that captures are only safe as long as the lambda object outlives the execution // of the coroutine. In all of these tests the lambda is created on the root test function // and thus will always outlive the coroutines, but in a real application this is dangerous // and coroutine 'captures' should be passed in via paramters to the function to be copied // into the coroutines stack frame. Lets - auto func = [&]() -> coro::task - { + auto func = [&]() -> coro::task { std::cerr << "Hello world from scheduler task!\n"; counter++; co_return; @@ -53,10 +53,9 @@ TEST_CASE("scheduler submit single task with move and auto initializing lambda") // through its parameters directly. std::atomic counter{0}; - coro::scheduler s{}; + coro::scheduler s{}; - auto task = [](std::atomic& counter) -> coro::task - { + auto task = [](std::atomic& counter) -> coro::task { std::cerr << "Hello world from scheduler task!\n"; counter++; co_return; @@ -73,10 +72,13 @@ TEST_CASE("scheduler submit mutiple tasks") { constexpr std::size_t n = 1000; std::atomic counter{0}; - coro::scheduler s{}; + coro::scheduler s{}; - auto func = [&]() -> coro::task { counter++; co_return; }; - for(std::size_t i = 0; i < n; ++i) + auto func = [&]() -> coro::task { + counter++; + co_return; + }; + for (std::size_t i = 0; i < n; ++i) { s.schedule(func()); } @@ -88,12 +90,11 @@ TEST_CASE("scheduler submit mutiple tasks") TEST_CASE("scheduler task with multiple yields on event") { std::atomic counter{0}; - coro::scheduler s{}; - auto token = s.generate_resume_token(); + coro::scheduler s{}; + auto token = s.generate_resume_token(); // coro::resume_token token{s}; - auto func = [&]() -> coro::task - { + auto func = [&]() -> coro::task { std::cerr << "1st suspend\n"; co_await s.yield(token); std::cerr << "1st resume\n"; @@ -116,7 +117,7 @@ TEST_CASE("scheduler task with multiple yields on event") auto resume_task = [&](coro::resume_token& token, uint64_t expected) { token.resume(1); - while(counter != expected) + while (counter != expected) { std::this_thread::sleep_for(1ms); } @@ -135,11 +136,10 @@ TEST_CASE("scheduler task with multiple yields on event") TEST_CASE("scheduler task with read poll") { - auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); coro::scheduler s{}; - auto func = [&]() -> coro::task - { + auto func = [&]() -> coro::task { // Poll will block until there is data to read. co_await s.poll(trigger_fd, coro::poll_op::read); REQUIRE(true); @@ -158,16 +158,12 @@ TEST_CASE("scheduler task with read poll") TEST_CASE("scheduler task with read") { constexpr uint64_t expected_value{42}; - auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - coro::scheduler s{}; + auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + coro::scheduler s{}; - auto func = [&]() -> coro::task - { + auto func = [&]() -> coro::task { uint64_t val{0}; - auto bytes_read = co_await s.read( - trigger_fd, - std::span(reinterpret_cast(&val), sizeof(val)) - ); + auto bytes_read = co_await s.read(trigger_fd, std::span(reinterpret_cast(&val), sizeof(val))); REQUIRE(bytes_read == sizeof(uint64_t)); REQUIRE(val == expected_value); @@ -191,23 +187,17 @@ TEST_CASE("scheduler task with read and write same fd") // pipe test for two concurrent tasks read and write awaiting on different file descriptors. constexpr uint64_t expected_value{9001}; - auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - coro::scheduler s{}; + auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + coro::scheduler s{}; - auto func = [&]() -> coro::task - { + auto func = [&]() -> coro::task { auto bytes_written = co_await s.write( - trigger_fd, - std::span(reinterpret_cast(&expected_value), sizeof(expected_value)) - ); + trigger_fd, std::span(reinterpret_cast(&expected_value), sizeof(expected_value))); REQUIRE(bytes_written == sizeof(uint64_t)); uint64_t val{0}; - auto bytes_read = co_await s.read( - trigger_fd, - std::span(reinterpret_cast(&val), sizeof(val)) - ); + auto bytes_read = co_await s.read(trigger_fd, std::span(reinterpret_cast(&val), sizeof(val))); REQUIRE(bytes_read == sizeof(uint64_t)); REQUIRE(val == expected_value); @@ -223,25 +213,23 @@ TEST_CASE("scheduler task with read and write same fd") TEST_CASE("scheduler task with read and write pipe") { const std::string msg{"coroutines are really cool but not that EASY!"}; - int pipe_fd[2]; + int pipe_fd[2]; pipe2(pipe_fd, O_NONBLOCK); coro::scheduler s{}; - auto read_func = [&]() -> coro::task - { - std::string buffer(4096, '0'); + auto read_func = [&]() -> coro::task { + std::string buffer(4096, '0'); std::span view{buffer.data(), buffer.size()}; - auto bytes_read = co_await s.read(pipe_fd[0], view); + auto bytes_read = co_await s.read(pipe_fd[0], view); REQUIRE(bytes_read == msg.size()); buffer.resize(bytes_read); REQUIRE(buffer == msg); }; - auto write_func = [&]() -> coro::task - { + auto write_func = [&]() -> coro::task { std::span view{msg.data(), msg.size()}; - auto bytes_written = co_await s.write(pipe_fd[1], view); + auto bytes_written = co_await s.write(pipe_fd[1], view); REQUIRE(bytes_written == msg.size()); }; @@ -253,11 +241,8 @@ TEST_CASE("scheduler task with read and write pipe") close(pipe_fd[1]); } -static auto standalone_read( - coro::scheduler& s, - coro::scheduler::fd_t socket, - std::span buffer -) -> coro::task +static auto standalone_read(coro::scheduler& s, coro::scheduler::fd_t socket, std::span buffer) + -> coro::task { // do other stuff in larger function co_return co_await s.read(socket, buffer); @@ -267,13 +252,13 @@ static auto standalone_read( TEST_CASE("scheduler standalone read task") { constexpr ssize_t expected_value{1111}; - auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); - coro::scheduler s{}; + auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + coro::scheduler s{}; - auto func = [&]() -> coro::task - { + auto func = [&]() -> coro::task { ssize_t v{0}; - auto bytes_read = co_await standalone_read(s, trigger_fd, std::span(reinterpret_cast(&v), sizeof(v))); + auto bytes_read = + co_await standalone_read(s, trigger_fd, std::span(reinterpret_cast(&v), sizeof(v))); REQUIRE(bytes_read == sizeof(ssize_t)); REQUIRE(v == expected_value); @@ -292,8 +277,7 @@ TEST_CASE("scheduler separate thread resume") { coro::scheduler s{}; - auto func = [&]() -> coro::task - { + auto func = [&]() -> coro::task { // User manual resume token, create one specifically for each task being generated // coro::resume_token token{s}; auto token = s.generate_resume_token(); @@ -321,33 +305,26 @@ TEST_CASE("scheduler separate thread resume") TEST_CASE("scheduler separate thread resume with return") { constexpr uint64_t expected_value{1337}; - coro::scheduler s{}; + coro::scheduler s{}; std::atomic*> token{}; - std::thread service{ - [&]() -> void + std::thread service{[&]() -> void { + while (token == nullptr) { - while(token == nullptr) - { - std::this_thread::sleep_for(1ms); - } - - token.load()->resume(expected_value); + std::this_thread::sleep_for(1ms); } + + token.load()->resume(expected_value); + }}; + + auto third_party_service = [&](int multiplier) -> coro::task { + auto output = co_await s.yield([&](coro::resume_token& t) { token = &t; }); + co_return output* multiplier; }; - auto third_party_service = [&](int multiplier) -> coro::task - { - auto output = co_await s.yield([&](coro::resume_token& t) { - token = &t; - }); - co_return output * multiplier; - }; - - auto func = [&]() -> coro::task - { - int multiplier{5}; + auto func = [&]() -> coro::task { + int multiplier{5}; uint64_t value = co_await third_party_service(multiplier); REQUIRE(value == (expected_value * multiplier)); }; @@ -362,15 +339,11 @@ TEST_CASE("scheduler with basic task") { constexpr std::size_t expected_value{5}; std::atomic counter{0}; - coro::scheduler s{}; + coro::scheduler s{}; - auto add_data = [&](uint64_t val) -> coro::task - { - co_return val; - }; + auto add_data = [&](uint64_t val) -> coro::task { co_return val; }; - auto func = [&]() -> coro::task - { + auto func = [&]() -> coro::task { counter += co_await add_data(expected_value); co_return; }; @@ -384,11 +357,10 @@ TEST_CASE("scheduler with basic task") TEST_CASE("schedule yield for") { constexpr std::chrono::milliseconds wait_for{50}; - std::atomic counter{0}; - coro::scheduler s{}; + std::atomic counter{0}; + coro::scheduler s{}; - auto func = [&]() -> coro::task - { + auto func = [&]() -> coro::task { ++counter; co_return; }; @@ -396,7 +368,7 @@ TEST_CASE("schedule yield for") auto start = std::chrono::steady_clock::now(); s.schedule_after(func(), wait_for); s.shutdown(); - auto stop = std::chrono::steady_clock::now(); + auto stop = std::chrono::steady_clock::now(); auto duration = std::chrono::duration_cast(stop - start); REQUIRE(counter == 1); @@ -407,16 +379,15 @@ TEST_CASE("scheduler trigger growth of internal tasks storage") { std::atomic counter{0}; constexpr std::size_t iterations{512}; - coro::scheduler s{coro::scheduler::options{.reserve_size = 1}}; + coro::scheduler s{coro::scheduler::options{.reserve_size = 1}}; - auto wait_func = [&](std::chrono::milliseconds wait_time) -> coro::task - { + auto wait_func = [&](std::chrono::milliseconds wait_time) -> coro::task { co_await s.yield_for(wait_time); ++counter; co_return; }; - for(std::size_t i = 0; i < iterations; ++i) + for (std::size_t i = 0; i < iterations; ++i) { s.schedule(wait_func(std::chrono::milliseconds{50})); } @@ -429,16 +400,10 @@ TEST_CASE("scheduler trigger growth of internal tasks storage") TEST_CASE("scheduler yield with scheduler event void") { std::atomic counter{0}; - coro::scheduler s{}; + coro::scheduler s{}; - auto func = [&]() -> coro::task - { - co_await s.yield( - [&](coro::resume_token& token) -> void - { - token.resume(); - } - ); + auto func = [&]() -> coro::task { + co_await s.yield([&](coro::resume_token& token) -> void { token.resume(); }); counter += 42; co_return; @@ -454,16 +419,10 @@ TEST_CASE("scheduler yield with scheduler event void") TEST_CASE("scheduler yield with scheduler event uint64_t") { std::atomic counter{0}; - coro::scheduler s{}; + coro::scheduler s{}; - auto func = [&]() -> coro::task - { - counter += co_await s.yield( - [&](coro::resume_token& token) -> void - { - token.resume(42); - } - ); + auto func = [&]() -> coro::task { + counter += co_await s.yield([&](coro::resume_token& token) -> void { token.resume(42); }); co_return; }; @@ -477,13 +436,12 @@ TEST_CASE("scheduler yield with scheduler event uint64_t") TEST_CASE("scheduler yield user event") { - std::string expected_result = "Here I am!"; + std::string expected_result = "Here I am!"; coro::scheduler s{}; - auto token = s.generate_resume_token(); + auto token = s.generate_resume_token(); // coro::resume_token token{s}; - auto func = [&]() -> coro::task - { + auto func = [&]() -> coro::task { co_await s.yield(token); REQUIRE(token.return_value() == expected_result); co_return; @@ -499,11 +457,10 @@ TEST_CASE("scheduler yield user event") TEST_CASE("scheduler yield user event multiple waiters") { std::atomic counter{0}; - coro::scheduler s{}; - auto token = s.generate_resume_token(); + coro::scheduler s{}; + auto token = s.generate_resume_token(); - auto func = [&](int amount) -> coro::task - { + auto func = [&](int amount) -> coro::task { co_await token; std::cerr << "amount=" << amount << "\n"; counter += amount; @@ -529,15 +486,14 @@ TEST_CASE("scheduler yield user event multiple waiters") TEST_CASE("scheduler manual process events with self generating coroutine (stack overflow check)") { - uint64_t counter{0}; - coro::scheduler s{coro::scheduler::options{ .thread_strategy = coro::scheduler::thread_strategy_t::manual }}; + uint64_t counter{0}; + coro::scheduler s{coro::scheduler::options{.thread_strategy = coro::scheduler::thread_strategy_t::manual}}; - auto func = [&](auto f) -> coro::task - { + auto func = [&](auto f) -> coro::task { ++counter; // this should detect stack overflows well enough - if(counter % 1'000'000 == 0) + if (counter % 1'000'000 == 0) { co_return; } @@ -549,7 +505,8 @@ TEST_CASE("scheduler manual process events with self generating coroutine (stack std::cerr << "Scheduling recursive function.\n"; s.schedule(func(func)); - while(s.process_events()) ; + while (s.process_events()) + ; std::cerr << "Recursive test done.\n"; } @@ -557,8 +514,7 @@ TEST_CASE("scheduler task throws") { coro::scheduler s{}; - auto func = []() -> coro::task - { + auto func = []() -> coro::task { // Is it possible to actually notify the user when running a task in a scheduler? // Seems like the user will need to manually catch. throw std::runtime_error{"I always throw."}; diff --git a/test/test_sync_wait.cpp b/test/test_sync_wait.cpp index 2192f70..10cc986 100644 --- a/test/test_sync_wait.cpp +++ b/test/test_sync_wait.cpp @@ -4,8 +4,7 @@ TEST_CASE("sync_wait task multiple suspends return integer with sync_wait") { - auto func = []() -> coro::task - { + auto func = []() -> coro::task { co_await std::suspend_always{}; co_await std::suspend_always{}; co_await std::suspend_always{}; @@ -18,14 +17,12 @@ TEST_CASE("sync_wait task multiple suspends return integer with sync_wait") TEST_CASE("sync_wait task co_await single") { - auto answer = []() -> coro::task - { + auto answer = []() -> coro::task { std::cerr << "\tThinking deep thoughts...\n"; co_return 42; }; - auto await_answer = [&]() -> coro::task - { + auto await_answer = [&]() -> coro::task { std::cerr << "\tStarting to wait for answer.\n"; auto a = answer(); std::cerr << "\tGot the coroutine, getting the value.\n"; @@ -45,8 +42,7 @@ TEST_CASE("sync_wait task co_await single") TEST_CASE("sync_wait_all accumulate") { std::atomic counter{0}; - auto func = [&](uint64_t amount) -> coro::task - { + auto func = [&](uint64_t amount) -> coro::task { std::cerr << "amount=" << amount << "\n"; counter += amount; co_return; diff --git a/test/test_task.cpp b/test/test_task.cpp index 6a57279..f45f15f 100644 --- a/test/test_task.cpp +++ b/test/test_task.cpp @@ -5,7 +5,6 @@ #include #include - TEST_CASE("task hello world") { using task_type = coro::task; @@ -63,7 +62,7 @@ TEST_CASE("task exception thrown") { auto value = task.promise().return_value(); } - catch(const std::exception& e) + catch (const std::exception& e) { thrown = true; REQUIRE(e.what() == throw_msg); @@ -74,10 +73,8 @@ TEST_CASE("task exception thrown") TEST_CASE("task in a task") { - auto outer_task = []() -> coro::task<> - { - auto inner_task = []() -> coro::task - { + auto outer_task = []() -> coro::task<> { + auto inner_task = []() -> coro::task { std::cerr << "inner_task start\n"; std::cerr << "inner_task stop\n"; co_return 42; @@ -96,14 +93,11 @@ TEST_CASE("task in a task") TEST_CASE("task in a task in a task") { - auto task1 = []() -> coro::task<> - { + auto task1 = []() -> coro::task<> { std::cerr << "task1 start\n"; - auto task2 = []() -> coro::task - { + auto task2 = []() -> coro::task { std::cerr << "\ttask2 start\n"; - auto task3 = []() -> coro::task - { + auto task3 = []() -> coro::task { std::cerr << "\t\ttask3 start\n"; std::cerr << "\t\ttask3 stop\n"; co_return 3; @@ -129,8 +123,7 @@ TEST_CASE("task in a task in a task") TEST_CASE("task multiple suspends return void") { - auto task = []() -> coro::task - { + auto task = []() -> coro::task { co_await std::suspend_always{}; co_await std::suspend_never{}; co_await std::suspend_always{}; @@ -153,8 +146,7 @@ TEST_CASE("task multiple suspends return void") TEST_CASE("task multiple suspends return integer") { - auto task = []() -> coro::task - { + auto task = []() -> coro::task { co_await std::suspend_always{}; co_await std::suspend_always{}; co_await std::suspend_always{}; @@ -177,14 +169,12 @@ TEST_CASE("task multiple suspends return integer") TEST_CASE("task resume from promise to coroutine handles of different types") { - auto task1 = [&]() -> coro::task - { + auto task1 = [&]() -> coro::task { std::cerr << "Task ran\n"; co_return 42; }(); - auto task2 = [&]() -> coro::task - { + auto task2 = [&]() -> coro::task { std::cerr << "Task 2 ran\n"; co_return; }(); @@ -211,8 +201,7 @@ TEST_CASE("task resume from promise to coroutine handles of different types") TEST_CASE("task throws void") { - auto task = []() -> coro::task - { + auto task = []() -> coro::task { throw std::runtime_error{"I always throw."}; co_return; }(); @@ -224,8 +213,7 @@ TEST_CASE("task throws void") TEST_CASE("task throws non-void l-value") { - auto task = []() -> coro::task - { + auto task = []() -> coro::task { throw std::runtime_error{"I always throw."}; co_return 42; }(); @@ -242,8 +230,7 @@ TEST_CASE("task throws non-void r-value") int m_value; }; - auto task = []() -> coro::task - { + auto task = []() -> coro::task { type return_value{42}; throw std::runtime_error{"I always throw."};