diff --git a/.githooks/readme-template.md b/.githooks/readme-template.md
index 06acdc0..7206346 100644
--- a/.githooks/readme-template.md
+++ b/.githooks/readme-template.md
@@ -26,7 +26,9 @@
     - coro::when_all(awaitable...) -> awaitable
 * Schedulers
     - [coro::thread_pool](#thread_pool) for coroutine cooperative multitasking
-    - [coro::io_scheduler](#io_scheduler) for driving i/o events, uses thread_pool for coroutine execution upon triggered events
+    - [coro::io_scheduler](#io_scheduler) for driving i/o events
+        - Can use coro::thread_pool for latency-sensitive or long-lived tasks.
+        - Can use inline task processing for thread-per-core or short-lived tasks.
         - Currently uses an epoll driver
     - [coro::task_container](#task_container) for dynamic task lifetimes
 * Coroutine Networking
@@ -238,7 +240,18 @@ thread pool worker 0 is shutting down.
 ```
 ### io_scheduler
-`coro::io_scheduler` is a i/o event scheduler that uses a statically sized pool (`coro::thread_pool`) to process the events that are ready. The `coro::io_scheduler` can use a dedicated spawned thread for processing events that are ready or it can be maually driven via its `process_events()` function for integration into existing event loops. If using the dedicated thread to process i/o events the dedicated thread does not execute and of the tasks itself, it simply schedules them to be executed on the next availble worker thread in its embedded `coro::thread_pool`. Inline execution of tasks on the i/o dedicated thread is not supported since it can introduce poor latency when an expensive task is executing.
+`coro::io_scheduler` is an i/o event scheduler that can use two methods of task processing:
+
+* A background `coro::thread_pool`
+* Inline task processing on the `coro::io_scheduler`'s event loop
+
+Using a background `coro::thread_pool` will default to using `(std::thread::hardware_concurrency() - 1)` threads to process tasks. This processing strategy is best for longer tasks that would block the i/o scheduler or for tasks that are latency sensitive.
+
+Using the inline processing strategy will have the event loop i/o thread process tasks inline on that thread as events are received. This processing strategy is best for shorter tasks that will not block the i/o thread for long, or for pure throughput with a thread-per-core architecture, e.g. spin up one inline i/o scheduler per core and process tasks inline on each scheduler.
+
+The `coro::io_scheduler` can use a dedicated spawned thread for processing events that are ready, or it can be manually driven via its `process_events()` function for integration into existing event loops. By default an i/o scheduler will spawn a dedicated event thread and use a thread pool to process tasks.
+
+Before getting to an example, note that there are two methods of scheduling work onto an i/o scheduler: the first has the caller maintain the lifetime of the task being scheduled, while the second moves or transfers ownership of the task into the i/o scheduler. The first allows for return values but requires the caller to manage the lifetime of the coroutine; the second requires the task's return type to be void but allows for variable or unknown task lifetimes. Transferring task lifetime to the scheduler can be useful, e.g. for a network request. The example provided here shows an i/o scheduler that spins up a basic `coro::net::tcp_server` and a `coro::net::tcp_client` that connect to each other and then send a request and a response.
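To make the two processing strategies above concrete, here is a minimal sketch (not part of the patch). It assumes the umbrella `coro/coro.hpp` header, that `io_scheduler::options` supports designated initialization, and that the inline counterpart to `execution_strategy_t::process_tasks_on_thread_pool` (which appears in `src/io_scheduler.cpp` later in this diff) is named `process_tasks_inline`:

```cpp
#include <coro/coro.hpp>

int main()
{
    // Default-style strategy: a dedicated event thread plus a background
    // coro::thread_pool of (std::thread::hardware_concurrency() - 1) workers.
    coro::io_scheduler pool_scheduler{coro::io_scheduler::options{
        .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_on_thread_pool}};

    // Inline strategy (assumed enumerator name): tasks resume directly on the
    // event loop thread, suited to short tasks or a thread-per-core design.
    coro::io_scheduler inline_scheduler{coro::io_scheduler::options{
        .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline}};

    // The task itself is identical either way; only where it resumes differs.
    auto make_task = [](coro::io_scheduler& s) -> coro::task<void> {
        co_await s.schedule(); // resume on the scheduler per its execution strategy
        co_return;
    };

    coro::sync_wait(make_task(pool_scheduler));
    coro::sync_wait(make_task(inline_scheduler));
    return 0;
}
```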
@@ -284,7 +297,8 @@ client: Hello from server 5
 ### Requirements
     C++20 Compiler with coroutine support
-        g++10.2 is tested
+        g++10.2.1
+        g++10.3.1
     CMake
     make or ninja
     pthreads
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 7fa56ed..4c1288d 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -43,7 +43,7 @@ jobs:
       run: |
         cd build-release-g++
         ctest -VV
-  build-fedora-31:
+  build-fedora-32:
     name: fedora-32
     runs-on: ubuntu-latest
     container:
@@ -55,8 +55,9 @@
           cmake \
           git \
           ninja-build \
-          gcc-c++-10.2.1 \
+          gcc-c++ \
           lcov \
+          openssl \
           openssl-devel
     - name: Checkout # recurisve checkout requires git to be installed first
       uses: actions/checkout@v2
@@ -100,3 +101,72 @@
       with:
         github-token: ${{ secrets.GITHUB_TOKEN }}
         path-to-lcov: build-debug-g++/libcoro_tests.info
+
+  build-fedora-33:
+    name: fedora-33
+    runs-on: ubuntu-latest
+    container:
+      image: fedora:33
+    steps:
+    - name: dnf
+      run: |
+        sudo dnf install -y \
+          cmake \
+          git \
+          ninja-build \
+          gcc-c++-10.3.1 \
+          openssl \
+          openssl-devel
+    - name: Checkout # recursive checkout requires git to be installed first
+      uses: actions/checkout@v2
+      with:
+        submodules: recursive
+    - name: build-release-g++
+      run: |
+        mkdir build-release-g++
+        cd build-release-g++
+        cmake \
+          -GNinja \
+          -DCMAKE_BUILD_TYPE=Release \
+          -DCMAKE_C_COMPILER=gcc \
+          -DCMAKE_CXX_COMPILER=g++ \
+          ..
+        ninja
+    - name: test-release-g++
+      run: |
+        cd build-release-g++
+        ctest -VV
+  build-fedora-34:
+    name: fedora-34
+    runs-on: ubuntu-latest
+    container:
+      image: fedora:34
+    steps:
+    - name: dnf
+      run: |
+        sudo dnf install -y \
+          cmake \
+          git \
+          ninja-build \
+          gcc-c++ \
+          openssl \
+          openssl-devel
+    - name: Checkout # recursive checkout requires git to be installed first
+      uses: actions/checkout@v2
+      with:
+        submodules: recursive
+    - name: build-release-g++
+      run: |
+        mkdir build-release-g++
+        cd build-release-g++
+        cmake \
+          -GNinja \
+          -DCMAKE_BUILD_TYPE=Release \
+          -DCMAKE_C_COMPILER=gcc \
+          -DCMAKE_CXX_COMPILER=g++ \
+          ..
+        ninja
+    - name: test-release-g++
+      run: |
+        cd build-release-g++
+        ctest -VV
\ No newline at end of file
diff --git a/README.md b/README.md
index 0ae3dd9..7eb0d7b 100644
--- a/README.md
+++ b/README.md
@@ -26,7 +26,9 @@
     - coro::when_all(awaitable...) -> awaitable
 * Schedulers
     - [coro::thread_pool](#thread_pool) for coroutine cooperative multitasking
-    - [coro::io_scheduler](#io_scheduler) for driving i/o events, uses thread_pool for coroutine execution upon triggered events
+    - [coro::io_scheduler](#io_scheduler) for driving i/o events
+        - Can use coro::thread_pool for latency-sensitive or long-lived tasks.
+        - Can use inline task processing for thread-per-core or short-lived tasks.
         - Currently uses an epoll driver
     - [coro::task_container](#task_container) for dynamic task lifetimes
 * Coroutine Networking
@@ -690,7 +692,18 @@ thread pool worker 0 is shutting down.
 ```
 ### io_scheduler
-`coro::io_scheduler` is a i/o event scheduler that uses a statically sized pool (`coro::thread_pool`) to process the events that are ready. The `coro::io_scheduler` can use a dedicated spawned thread for processing events that are ready or it can be maually driven via its `process_events()` function for integration into existing event loops. If using the dedicated thread to process i/o events the dedicated thread does not execute and of the tasks itself, it simply schedules them to be executed on the next availble worker thread in its embedded `coro::thread_pool`. Inline execution of tasks on the i/o dedicated thread is not supported since it can introduce poor latency when an expensive task is executing.
+`coro::io_scheduler` is an i/o event scheduler that can use two methods of task processing:
+
+* A background `coro::thread_pool`
+* Inline task processing on the `coro::io_scheduler`'s event loop
+
+Using a background `coro::thread_pool` will default to using `(std::thread::hardware_concurrency() - 1)` threads to process tasks. This processing strategy is best for longer tasks that would block the i/o scheduler or for tasks that are latency sensitive.
+
+Using the inline processing strategy will have the event loop i/o thread process tasks inline on that thread as events are received. This processing strategy is best for shorter tasks that will not block the i/o thread for long, or for pure throughput with a thread-per-core architecture, e.g. spin up one inline i/o scheduler per core and process tasks inline on each scheduler.
+
+The `coro::io_scheduler` can use a dedicated spawned thread for processing events that are ready, or it can be manually driven via its `process_events()` function for integration into existing event loops. By default an i/o scheduler will spawn a dedicated event thread and use a thread pool to process tasks.
+
+Before getting to an example, note that there are two methods of scheduling work onto an i/o scheduler: the first has the caller maintain the lifetime of the task being scheduled, while the second moves or transfers ownership of the task into the i/o scheduler. The first allows for return values but requires the caller to manage the lifetime of the coroutine; the second requires the task's return type to be void but allows for variable or unknown task lifetimes. Transferring task lifetime to the scheduler can be useful, e.g. for a network request. The example provided here shows an i/o scheduler that spins up a basic `coro::net::tcp_server` and a `coro::net::tcp_client` that connect to each other and then send a request and a response.
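As a rough illustration of the two scheduling methods described above (a sketch under the same assumptions as earlier, using the `schedule(coro::task<void>&&)` overload added to `inc/coro/io_scheduler.hpp` in this patch):

```cpp
#include <coro/coro.hpp>

int main()
{
    coro::io_scheduler scheduler{}; // assumes the options{} defaults are acceptable

    // Method 1: the caller owns the task, so a return value is possible and the
    // caller is responsible for keeping the coroutine alive until it completes.
    auto caller_owned = [](coro::io_scheduler& s) -> coro::task<int> {
        co_await s.schedule(); // hop onto the scheduler
        co_return 42;
    };
    auto value = coro::sync_wait(caller_owned(scheduler));

    // Method 2: ownership is transferred to the scheduler, so the task must
    // return void; useful for fire-and-forget work such as a network request.
    // (In real code, let the scheduler drain before it is destroyed.)
    auto detached = []() -> coro::task<void> { co_return; };
    scheduler.schedule(detached());

    return value == 42 ? 0 : 1;
}
```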
@@ -959,7 +972,8 @@ client: Hello from server 5
 ### Requirements
     C++20 Compiler with coroutine support
-        g++10.2 is tested
+        g++10.2.1
+        g++10.3.1
     CMake
     make or ninja
     pthreads
diff --git a/inc/coro/concepts/executor.hpp b/inc/coro/concepts/executor.hpp
index d413291..0bf6a63 100644
--- a/inc/coro/concepts/executor.hpp
+++ b/inc/coro/concepts/executor.hpp
@@ -4,8 +4,6 @@
 #include
 #include
 
-// #include
-// #include
 
 namespace coro::concepts
 {
diff --git a/inc/coro/generator.hpp b/inc/coro/generator.hpp
index 086da41..73e5307 100644
--- a/inc/coro/generator.hpp
+++ b/inc/coro/generator.hpp
@@ -24,7 +24,7 @@ public:
     auto initial_suspend() const { return std::suspend_always{}; }
 
-    auto final_suspend() const { return std::suspend_always{}; }
+    auto final_suspend() const noexcept(true) { return std::suspend_always{}; }
 
     template::value, int> = 0>
     auto yield_value(std::remove_reference_t& value) noexcept
diff --git a/inc/coro/io_scheduler.hpp b/inc/coro/io_scheduler.hpp
index 27352cc..3145747 100644
--- a/inc/coro/io_scheduler.hpp
+++ b/inc/coro/io_scheduler.hpp
@@ -4,11 +4,13 @@
 #include "coro/fd.hpp"
 #include "coro/net/socket.hpp"
 #include "coro/poll.hpp"
+#include "coro/task_container.hpp"
 #include "coro/thread_pool.hpp"
 
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -152,6 +154,18 @@ public:
      */
     auto schedule() -> schedule_operation { return schedule_operation{*this}; }
 
+    /**
+     * Schedules a task onto the io_scheduler and moves ownership of the task to the io_scheduler.
+     * Only void return type tasks can be scheduled in this manner since the task submitter will no
+     * longer have control over the scheduled task.
+     * @param task The task to execute on this io_scheduler. Its lifetime ownership will be transferred
+     *             to this io_scheduler.
+     */
+    auto schedule(coro::task&& task) -> void
+    {
+        static_cast*>(m_owned_tasks)->start(std::move(task));
+    }
+
     /**
      * Schedules the current task to run after the given amount of time has elapsed.
      * @param amount The amount of time to wait before resuming execution of this task.
@@ -304,6 +318,11 @@ private:
     std::mutex m_scheduled_tasks_mutex{};
     std::vector> m_scheduled_tasks{};
 
+    /// Tasks that have their ownership passed into the scheduler. This is a bit strange for now
+    /// but the concept doesn't pass since io_scheduler isn't fully defined yet.
+    /// The type is coro::task_container*
+    void* m_owned_tasks{nullptr};
+
     static constexpr const int m_shutdown_object{0};
     static constexpr const void* m_shutdown_ptr = &m_shutdown_object;
diff --git a/inc/coro/sync_wait.hpp b/inc/coro/sync_wait.hpp
index 14e99fb..31f21c6 100644
--- a/inc/coro/sync_wait.hpp
+++ b/inc/coro/sync_wait.hpp
@@ -123,9 +123,7 @@ public:
         return completion_notifier{};
     }
 
-    auto return_void() noexcept -> void {}
-
-    auto return_value()
+    auto return_void() -> void
     {
         if (m_exception)
         {
@@ -170,7 +168,8 @@ public:
     {
         if constexpr (std::is_same_v)
         {
-            m_coroutine.promise().return_value();
+            // Propagate exceptions.
+            m_coroutine.promise().return_void();
             return;
         }
         else
diff --git a/inc/coro/task.hpp b/inc/coro/task.hpp
index a85e751..8f61aa9 100644
--- a/inc/coro/task.hpp
+++ b/inc/coro/task.hpp
@@ -46,7 +46,7 @@ struct promise_base
     auto initial_suspend() { return std::suspend_always{}; }
 
-    auto final_suspend() { return final_awaitable{}; }
+    auto final_suspend() noexcept(true) { return final_awaitable{}; }
 
     auto unhandled_exception() -> void { m_exception_ptr = std::current_exception(); }
 
@@ -105,9 +105,7 @@ struct promise : public promise_base
     auto get_return_object() noexcept -> task_type;
 
-    auto return_void() noexcept -> void {}
-
-    auto return_value() const -> void
+    auto return_void() -> void
     {
         if (m_exception_ptr)
         {
@@ -119,7 +117,7 @@
 } // namespace detail
 
 template
-class task
+class [[nodiscard]] task
 {
 public:
     using task_type = task;
@@ -202,7 +200,19 @@ public:
     {
         struct awaitable : public awaitable_base
         {
-            auto await_resume() -> decltype(auto) { return this->m_coroutine.promise().return_value(); }
+            auto await_resume() -> decltype(auto)
+            {
+                if constexpr (std::is_same_v)
+                {
+                    // Propagate uncaught exceptions.
+                    this->m_coroutine.promise().return_void();
+                    return;
+                }
+                else
+                {
+                    return this->m_coroutine.promise().return_value();
+                }
+            }
         };
 
         return awaitable{m_coroutine};
@@ -212,7 +222,19 @@ public:
     {
         struct awaitable : public awaitable_base
         {
-            auto await_resume() -> decltype(auto) { return std::move(this->m_coroutine.promise()).return_value(); }
+            auto await_resume() -> decltype(auto)
+            {
+                if constexpr (std::is_same_v)
+                {
+                    // Propagate uncaught exceptions.
+                    this->m_coroutine.promise().return_void();
+                    return;
+                }
+                else
+                {
+                    return std::move(this->m_coroutine.promise()).return_value();
+                }
+            }
         };
 
         return awaitable{m_coroutine};
diff --git a/inc/coro/task_container.hpp b/inc/coro/task_container.hpp
index 6718d26..ffc0102 100644
--- a/inc/coro/task_container.hpp
+++ b/inc/coro/task_container.hpp
@@ -69,7 +69,7 @@
      * call? Calling at regular intervals will reduce memory usage of completed
      * tasks and allow for the task container to re-use allocated space.
      */
-    auto start(coro::task user_task, garbage_collect_t cleanup = garbage_collect_t::yes) -> void
+    auto start(coro::task&& user_task, garbage_collect_t cleanup = garbage_collect_t::yes) -> void
     {
         m_size.fetch_add(1, std::memory_order::relaxed);
diff --git a/inc/coro/when_all.hpp b/inc/coro/when_all.hpp
index 86cc22a..6b64f09 100644
--- a/inc/coro/when_all.hpp
+++ b/inc/coro/when_all.hpp
@@ -340,15 +340,7 @@ public:
     auto unhandled_exception() noexcept -> void { m_exception_ptr = std::current_exception(); }
 
-    auto return_void() noexcept -> void {}
-
-    auto start(when_all_latch& latch) -> void
-    {
-        m_latch = &latch;
-        coroutine_handle_type::from_promise(*this).resume();
-    }
-
-    auto return_value() -> void
+    auto return_void() noexcept -> void
     {
         if (m_exception_ptr)
         {
@@ -356,6 +348,12 @@
         }
     }
 
+    auto start(when_all_latch& latch) -> void
+    {
+        m_latch = &latch;
+        coroutine_handle_type::from_promise(*this).resume();
+    }
+
 private:
     when_all_latch* m_latch{nullptr};
     std::exception_ptr m_exception_ptr;
diff --git a/src/io_scheduler.cpp b/src/io_scheduler.cpp
index 8be2c3b..0ac2ecf 100644
--- a/src/io_scheduler.cpp
+++ b/src/io_scheduler.cpp
@@ -19,7 +19,8 @@ io_scheduler::io_scheduler(options opts)
     m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)),
     m_shutdown_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)),
     m_timer_fd(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC)),
-    m_schedule_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK))
+    m_schedule_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)),
+    m_owned_tasks(new coro::task_container(*this))
 {
     if (opts.execution_strategy == execution_strategy_t::process_tasks_on_thread_pool)
     {
@@ -69,6 +70,12 @@ io_scheduler::~io_scheduler()
         close(m_schedule_fd);
         m_schedule_fd = -1;
     }
+
+    if (m_owned_tasks != nullptr)
+    {
+        delete static_cast*>(m_owned_tasks);
+        m_owned_tasks = nullptr;
+    }
 }
diff --git a/src/net/tcp_client.cpp b/src/net/tcp_client.cpp
index 828b1de..ec99ce7 100644
--- a/src/net/tcp_client.cpp
+++ b/src/net/tcp_client.cpp
@@ -40,8 +40,8 @@ tcp_client::~tcp_client()
     if (m_ssl_info.m_ssl_ptr != nullptr && !m_ssl_info.m_ssl_error)
     {
         // Should the shutdown timeout be configurable?
-        ssl_shutdown_and_free(
-            *m_io_scheduler, std::move(m_socket), std::move(m_ssl_info.m_ssl_ptr), std::chrono::seconds{30});
+        m_io_scheduler->schedule(ssl_shutdown_and_free(
+            *m_io_scheduler, std::move(m_socket), std::move(m_ssl_info.m_ssl_ptr), std::chrono::seconds{30}));
     }
 }
@@ -205,9 +205,6 @@ auto tcp_client::ssl_shutdown_and_free(
     io_scheduler& io_scheduler, net::socket s, ssl_unique_ptr ssl_ptr, std::chrono::milliseconds timeout) -> coro::task
 {
-    // Immediately transfer onto the scheduler thread pool for background processing.
-    co_await io_scheduler.schedule();
-
     while (true)
     {
         auto r = SSL_shutdown(ssl_ptr.get());
diff --git a/test/test_latch.cpp b/test/test_latch.cpp
index 259af92..26c7673 100644
--- a/test/test_latch.cpp
+++ b/test/test_latch.cpp
@@ -9,10 +9,12 @@ TEST_CASE("latch count=0", "[latch]")
 {
     coro::latch l{0};
 
-    auto task = [&]() -> coro::task {
+    auto make_task = [&]() -> coro::task {
         co_await l;
         co_return 42;
-    }();
+    };
+
+    auto task = make_task();
 
     task.resume();
     REQUIRE(task.is_ready()); // The latch never waits due to zero count.
@@ -23,11 +25,13 @@ TEST_CASE("latch count=1", "[latch]")
 {
     coro::latch l{1};
 
-    auto task = [&]() -> coro::task {
+    auto make_task = [&]() -> coro::task {
         auto workers = l.remaining();
         co_await l;
         co_return workers;
-    }();
+    };
+
+    auto task = make_task();
 
     task.resume();
     REQUIRE_FALSE(task.is_ready());
@@ -41,11 +45,13 @@ TEST_CASE("latch count=1 count_down=5", "[latch]")
 {
     coro::latch l{1};
 
-    auto task = [&]() -> coro::task {
+    auto make_task = [&]() -> coro::task {
         auto workers = l.remaining();
         co_await l;
         co_return workers;
-    }();
+    };
+
+    auto task = make_task();
 
     task.resume();
     REQUIRE_FALSE(task.is_ready());
@@ -59,11 +65,13 @@ TEST_CASE("latch count=5 count_down=1 x5", "[latch]")
 {
     coro::latch l{5};
 
-    auto task = [&]() -> coro::task {
+    auto make_task = [&]() -> coro::task {
         auto workers = l.remaining();
         co_await l;
         co_return workers;
-    }();
+    };
+
+    auto task = make_task();
 
     task.resume();
     REQUIRE_FALSE(task.is_ready());
@@ -85,11 +93,13 @@ TEST_CASE("latch count=5 count_down=5", "[latch]")
 {
     coro::latch l{5};
 
-    auto task = [&]() -> coro::task {
+    auto make_task = [&]() -> coro::task {
         auto workers = l.remaining();
         co_await l;
         co_return workers;
-    }();
+    };
+
+    auto task = make_task();
 
     task.resume();
     REQUIRE_FALSE(task.is_ready());
diff --git a/test/test_task.cpp b/test/test_task.cpp
index fe832ff..469bb0b 100644
--- a/test/test_task.cpp
+++ b/test/test_task.cpp
@@ -206,9 +206,9 @@ TEST_CASE("task throws void", "[task]")
         co_return;
     }();
 
-    task.resume();
+    REQUIRE_NOTHROW(task.resume());
     REQUIRE(task.is_ready());
-    REQUIRE_THROWS_AS(task.promise().return_value(), std::runtime_error);
+    REQUIRE_THROWS_AS(task.promise().return_void(), std::runtime_error);
 }
 
 TEST_CASE("task throws non-void l-value", "[task]")
@@ -218,7 +218,7 @@
 {
         co_return 42;
     }();
 
-    task.resume();
+    REQUIRE_NOTHROW(task.resume());
     REQUIRE(task.is_ready());
     REQUIRE_THROWS_AS(task.promise().return_value(), std::runtime_error);
 }
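Stepping back from the diff: the `test/test_task.cpp` changes above rely on the reworked promises rethrowing a stored exception from `return_void()`, and the `inc/coro/task.hpp` awaitables now call `return_void()` in `await_resume()` for void tasks. A small sketch (not part of the patch, assuming the umbrella `coro/coro.hpp` header and `coro::sync_wait`) of how that behaviour surfaces to callers:

```cpp
#include <coro/coro.hpp>

#include <iostream>
#include <stdexcept>

int main()
{
    auto throwing = []() -> coro::task<void> {
        throw std::runtime_error{"broken"};
        co_return; // unreachable, but marks this lambda as a coroutine
    };

    auto consumer = [&]() -> coro::task<int> {
        try
        {
            // co_await on a task<void> now invokes return_void(), which rethrows
            // any exception captured by unhandled_exception().
            co_await throwing();
        }
        catch (const std::runtime_error& e)
        {
            std::cout << "caught: " << e.what() << "\n";
            co_return 1;
        }
        co_return 0;
    };

    return coro::sync_wait(consumer()) == 1 ? 0 : 1;
}
```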