
Update docs on io_scheduler for inline processing (#84)

* Update docs on io_scheduler for inline processing

Support gcc 10.3.1 (fedora 33 updated)
Update ci.yml to run fedora 32,33,34 and support both
gcc 10.2.1 and 10.3.1

* fedora 32 -> gcc-c++ drop version

* Update ci.yml and test_latch.cpp
Josh Baldwin 2021-05-22 19:58:46 -06:00 committed by GitHub
parent 310abc18bc
commit 78b6e19927
14 changed files with 199 additions and 51 deletions


@ -26,7 +26,9 @@
- coro::when_all(awaitable...) -> awaitable
* Schedulers
- [coro::thread_pool](#thread_pool) for coroutine cooperative multitasking
- [coro::io_scheduler](#io_scheduler) for driving i/o events, uses thread_pool for coroutine execution upon triggered events
- [coro::io_scheduler](#io_scheduler) for driving i/o events
- Can use coro::thread_pool for latency sensitive or long lived tasks.
- Can use inline task processing for thread per core or short lived tasks.
- Currently uses an epoll driver
- [coro::task_container](#task_container) for dynamic task lifetimes
* Coroutine Networking
@ -238,7 +240,18 @@ thread pool worker 0 is shutting down.
```
### io_scheduler
`coro::io_scheduler` is an i/o event scheduler that uses a statically sized pool (`coro::thread_pool`) to process the events that are ready. The `coro::io_scheduler` can use a dedicated spawned thread for processing events that are ready or it can be manually driven via its `process_events()` function for integration into existing event loops. If using the dedicated thread to process i/o events the dedicated thread does not execute any of the tasks itself, it simply schedules them to be executed on the next available worker thread in its embedded `coro::thread_pool`. Inline execution of tasks on the i/o dedicated thread is not supported since it can introduce poor latency when an expensive task is executing.
`coro::io_scheduler` is an i/o event scheduler that can use two methods of task processing:
* A background `coro::thread_pool`
* Inline task processing on the `coro::io_scheduler`'s event loop
Using a background `coro::thread_pool` will default to using `(std::thread::hardware_concurrency() - 1)` threads to process tasks. This processing strategy is best for longer tasks that would block the i/o scheduler or for tasks that are latency sensitive.
Using the inline processing strategy will have the event loop i/o thread process the tasks inline on that thread when events are received. This processing strategy is best for shorter tasks that will not block the i/o thread for long, or for pure throughput with a thread-per-core architecture, e.g. spin up an inline i/o scheduler per core and process tasks inline on each scheduler.
The `coro::io_scheduler` can use a dedicated spawned thread for processing events that are ready or it can be manually driven via its `process_events()` function for integration into existing event loops. By default, i/o schedulers will spawn a dedicated event thread and use a thread pool to process tasks.
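The strategy is selected when the scheduler is constructed. Below is a minimal sketch of both configurations; it assumes the library's umbrella header `coro/coro.hpp` and `coro::sync_wait` are available, and the `process_tasks_inline` enumerator name is an assumption, since only `process_tasks_on_thread_pool` appears in this change.
```cpp
#include <coro/coro.hpp>

int main()
{
    // Default strategy: a dedicated event thread hands ready tasks to the embedded
    // coro::thread_pool ((std::thread::hardware_concurrency() - 1) workers by default).
    coro::io_scheduler pool_scheduler{coro::io_scheduler::options{
        .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_on_thread_pool}};

    // Inline strategy: the event loop thread itself resumes tasks as their events become
    // ready; spin up one such scheduler per core for a thread-per-core design.
    // (Enumerator name assumed, not shown in this change.)
    coro::io_scheduler inline_scheduler{coro::io_scheduler::options{
        .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_inline}};

    // co_await schedule() resumes the awaiting task according to the chosen strategy.
    auto hello = [](coro::io_scheduler& s) -> coro::task<void> {
        co_await s.schedule();
        co_return;
    };

    coro::sync_wait(hello(pool_scheduler));
    coro::sync_wait(hello(inline_scheduler));
    return 0;
}
```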
Before getting to an example, there are two methods of scheduling work onto an i/o scheduler: the first has the caller maintain the lifetime of the task being scheduled, and the second moves or transfers ownership of the task into the i/o scheduler. The first allows for return values but requires the caller to manage the coroutine's lifetime, while the second requires the task's return type to be void but allows for variable or unknown task lifetimes. Transferring task lifetime to the scheduler can be useful, e.g. for a network request.
The example provided here shows an i/o scheduler that spins up a basic `coro::net::tcp_server` and a `coro::net::tcp_client` that will connect to each other and then send a request and a response.
@ -284,7 +297,8 @@ client: Hello from server 5
### Requirements
C++20 Compiler with coroutine support
g++10.2 is tested
g++10.2.1
g++10.3.1
CMake
make or ninja
pthreads


@ -43,7 +43,7 @@ jobs:
run: |
cd build-release-g++
ctest -VV
build-fedora-31:
build-fedora-32:
name: fedora-32
runs-on: ubuntu-latest
container:
@ -55,8 +55,9 @@ jobs:
cmake \
git \
ninja-build \
gcc-c++-10.2.1 \
gcc-c++ \
lcov \
openssl \
openssl-devel
- name: Checkout # recursive checkout requires git to be installed first
uses: actions/checkout@v2
@ -100,3 +101,72 @@ jobs:
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
path-to-lcov: build-debug-g++/libcoro_tests.info
build-fedora-33:
name: fedora-33
runs-on: ubuntu-latest
container:
image: fedora:33
steps:
- name: dnf
run: |
sudo dnf install -y \
cmake \
git \
ninja-build \
gcc-c++-10.3.1 \
openssl \
openssl-devel
- name: Checkout # recursive checkout requires git to be installed first
uses: actions/checkout@v2
with:
submodules: recursive
- name: build-release-g++
run: |
mkdir build-release-g++
cd build-release-g++
cmake \
-GNinja \
-DCMAKE_BUILD_TYPE=Release \
-DCMAKE_C_COMPILER=gcc \
-DCMAKE_CXX_COMPILER=g++ \
..
ninja
- name: test-release-g++
run: |
cd build-release-g++
ctest -VV
build-fedora-34:
name: fedora-34
runs-on: ubuntu-latest
container:
image: fedora:34
steps:
- name: dnf
run: |
sudo dnf install -y \
cmake \
git \
ninja-build \
gcc-c++ \
openssl \
openssl-devel
- name: Checkout # recursive checkout requires git to be installed first
uses: actions/checkout@v2
with:
submodules: recursive
- name: build-release-g++
run: |
mkdir build-release-g++
cd build-release-g++
cmake \
-GNinja \
-DCMAKE_BUILD_TYPE=Release \
-DCMAKE_C_COMPILER=gcc \
-DCMAKE_CXX_COMPILER=g++ \
..
ninja
- name: test-release-g++
run: |
cd build-release-g++
ctest -VV


@ -26,7 +26,9 @@
- coro::when_all(awaitable...) -> awaitable
* Schedulers
- [coro::thread_pool](#thread_pool) for coroutine cooperative multitasking
- [coro::io_scheduler](#io_scheduler) for driving i/o events, uses thread_pool for coroutine execution upon triggered events
- [coro::io_scheduler](#io_scheduler) for driving i/o events
- Can use coro::thread_pool for latency sensitive or long lived tasks.
- Can use inline task processing for thread per core or short lived tasks.
- Currently uses an epoll driver
- [coro::task_container](#task_container) for dynamic task lifetimes
* Coroutine Networking
@ -690,7 +692,18 @@ thread pool worker 0 is shutting down.
```
### io_scheduler
`coro::io_scheduler` is an i/o event scheduler that uses a statically sized pool (`coro::thread_pool`) to process the events that are ready. The `coro::io_scheduler` can use a dedicated spawned thread for processing events that are ready or it can be manually driven via its `process_events()` function for integration into existing event loops. If using the dedicated thread to process i/o events the dedicated thread does not execute any of the tasks itself, it simply schedules them to be executed on the next available worker thread in its embedded `coro::thread_pool`. Inline execution of tasks on the i/o dedicated thread is not supported since it can introduce poor latency when an expensive task is executing.
`coro::io_scheduler` is an i/o event scheduler that can use two methods of task processing:
* A background `coro::thread_pool`
* Inline task processing on the `coro::io_scheduler`'s event loop
Using a background `coro::thread_pool` will default to using `(std::thread::hardware_concurrency() - 1)` threads to process tasks. This processing strategy is best for longer tasks that would block the i/o scheduler or for tasks that are latency sensitive.
Using the inline processing strategy will have the event loop i/o thread process the tasks inline on that thread when events are received. This processing strategy is best for shorter tasks that will not block the i/o thread for long, or for pure throughput with a thread-per-core architecture, e.g. spin up an inline i/o scheduler per core and process tasks inline on each scheduler.
The `coro::io_scheduler` can use a dedicated spawned thread for processing events that are ready or it can be manually driven via its `process_events()` function for integration into existing event loops. By default, i/o schedulers will spawn a dedicated event thread and use a thread pool to process tasks.
Before getting to an example, there are two methods of scheduling work onto an i/o scheduler: the first has the caller maintain the lifetime of the task being scheduled, and the second moves or transfers ownership of the task into the i/o scheduler, as sketched below. The first allows for return values but requires the caller to manage the coroutine's lifetime, while the second requires the task's return type to be void but allows for variable or unknown task lifetimes. Transferring task lifetime to the scheduler can be useful, e.g. for a network request.
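A minimal sketch of the two methods is shown here (assuming `coro/coro.hpp` and `coro::sync_wait` are available): the awaitable `schedule()` keeps the task with the caller, while the `schedule(coro::task<void>&&)` overload added in this change transfers ownership into the scheduler's internal task container.
```cpp
#include <coro/coro.hpp>

#include <cstdint>
#include <iostream>

int main()
{
    coro::io_scheduler scheduler{coro::io_scheduler::options{}};

    // Method 1: the caller owns the task, so it can return a value, but the
    // coroutine must be kept alive by the caller until it completes.
    auto caller_owned = [&]() -> coro::task<uint64_t> {
        co_await scheduler.schedule();
        co_return 42;
    };
    std::cout << "caller owned task returned " << coro::sync_wait(caller_owned()) << "\n";

    // Method 2: ownership moves into the scheduler, so the task must return void;
    // its lifetime can be variable or unknown, e.g. a fire-and-forget network request.
    auto detached = []() -> coro::task<void> {
        // ... perform the request ...
        co_return;
    };
    scheduler.schedule(detached());

    return 0;
}
```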
The example provided here shows an i/o scheduler that spins up a basic `coro::net::tcp_server` and a `coro::net::tcp_client` that will connect to each other and then send a request and a response.
@ -959,7 +972,8 @@ client: Hello from server 5
### Requirements
C++20 Compiler with coroutine support
g++10.2 is tested
g++10.2.1
g++10.3.1
CMake
make or ninja
pthreads


@ -4,8 +4,6 @@
#include <concepts>
#include <coroutine>
// #include <type_traits>
// #include <utility>
namespace coro::concepts
{


@ -24,7 +24,7 @@ public:
auto initial_suspend() const { return std::suspend_always{}; }
auto final_suspend() const { return std::suspend_always{}; }
auto final_suspend() const noexcept(true) { return std::suspend_always{}; }
template<typename U = T, std::enable_if_t<!std::is_rvalue_reference<U>::value, int> = 0>
auto yield_value(std::remove_reference_t<T>& value) noexcept


@ -4,11 +4,13 @@
#include "coro/fd.hpp"
#include "coro/net/socket.hpp"
#include "coro/poll.hpp"
#include "coro/task_container.hpp"
#include "coro/thread_pool.hpp"
#include <chrono>
#include <functional>
#include <map>
#include <memory>
#include <optional>
#include <sys/eventfd.h>
#include <thread>
@ -152,6 +154,18 @@ public:
*/
auto schedule() -> schedule_operation { return schedule_operation{*this}; }
/**
* Schedules a task onto the io_scheduler and moves ownership of the task to the io_scheduler.
* Only void return type tasks can be scheduled in this manner since the task submitter will no
* longer have control over the scheduled task.
* @param task The task to execute on this io_scheduler. Its lifetime ownership will be transferred
* to this io_scheduler.
*/
auto schedule(coro::task<void>&& task) -> void
{
static_cast<coro::task_container<coro::io_scheduler>*>(m_owned_tasks)->start(std::move(task));
}
/**
* Schedules the current task to run after the given amount of time has elapsed.
* @param amount The amount of time to wait before resuming execution of this task.
@ -304,6 +318,11 @@ private:
std::mutex m_scheduled_tasks_mutex{};
std::vector<std::coroutine_handle<>> m_scheduled_tasks{};
/// Tasks that have their ownership passed into the scheduler. This is a bit strange for now
/// but the concept doesn't pass since io_scheduler isn't fully defined yet.
/// The type is coro::task_container<coro::io_scheduler>*
void* m_owned_tasks{nullptr};
static constexpr const int m_shutdown_object{0};
static constexpr const void* m_shutdown_ptr = &m_shutdown_object;


@ -123,9 +123,7 @@ public:
return completion_notifier{};
}
auto return_void() noexcept -> void {}
auto return_value()
auto return_void() -> void
{
if (m_exception)
{
@ -170,7 +168,8 @@ public:
{
if constexpr (std::is_same_v<void, return_type>)
{
m_coroutine.promise().return_value();
// Propagate exceptions.
m_coroutine.promise().return_void();
return;
}
else


@ -46,7 +46,7 @@ struct promise_base
auto initial_suspend() { return std::suspend_always{}; }
auto final_suspend() { return final_awaitable{}; }
auto final_suspend() noexcept(true) { return final_awaitable{}; }
auto unhandled_exception() -> void { m_exception_ptr = std::current_exception(); }
@ -105,9 +105,7 @@ struct promise<void> : public promise_base
auto get_return_object() noexcept -> task_type;
auto return_void() noexcept -> void {}
auto return_value() const -> void
auto return_void() -> void
{
if (m_exception_ptr)
{
@ -119,7 +117,7 @@ struct promise<void> : public promise_base
} // namespace detail
template<typename return_type>
class task
class [[nodiscard]] task
{
public:
using task_type = task<return_type>;
@ -202,7 +200,19 @@ public:
{
struct awaitable : public awaitable_base
{
auto await_resume() -> decltype(auto) { return this->m_coroutine.promise().return_value(); }
auto await_resume() -> decltype(auto)
{
if constexpr (std::is_same_v<void, return_type>)
{
// Propagate uncaught exceptions.
this->m_coroutine.promise().return_void();
return;
}
else
{
return this->m_coroutine.promise().return_value();
}
}
};
return awaitable{m_coroutine};
@ -212,7 +222,19 @@ public:
{
struct awaitable : public awaitable_base
{
auto await_resume() -> decltype(auto) { return std::move(this->m_coroutine.promise()).return_value(); }
auto await_resume() -> decltype(auto)
{
if constexpr (std::is_same_v<void, return_type>)
{
// Propagate uncaught exceptions.
this->m_coroutine.promise().return_void();
return;
}
else
{
return std::move(this->m_coroutine.promise()).return_value();
}
}
};
return awaitable{m_coroutine};


@ -69,7 +69,7 @@ public:
* call? Calling at regular intervals will reduce memory usage of completed
* tasks and allow for the task container to re-use allocated space.
*/
auto start(coro::task<void> user_task, garbage_collect_t cleanup = garbage_collect_t::yes) -> void
auto start(coro::task<void>&& user_task, garbage_collect_t cleanup = garbage_collect_t::yes) -> void
{
m_size.fetch_add(1, std::memory_order::relaxed);


@ -340,15 +340,7 @@ public:
auto unhandled_exception() noexcept -> void { m_exception_ptr = std::current_exception(); }
auto return_void() noexcept -> void {}
auto start(when_all_latch& latch) -> void
{
m_latch = &latch;
coroutine_handle_type::from_promise(*this).resume();
}
auto return_value() -> void
auto return_void() noexcept -> void
{
if (m_exception_ptr)
{
@ -356,6 +348,12 @@ public:
}
}
auto start(when_all_latch& latch) -> void
{
m_latch = &latch;
coroutine_handle_type::from_promise(*this).resume();
}
private:
when_all_latch* m_latch{nullptr};
std::exception_ptr m_exception_ptr;


@ -19,7 +19,8 @@ io_scheduler::io_scheduler(options opts)
m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)),
m_shutdown_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)),
m_timer_fd(timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC)),
m_schedule_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK))
m_schedule_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)),
m_owned_tasks(new coro::task_container<coro::io_scheduler>(*this))
{
if (opts.execution_strategy == execution_strategy_t::process_tasks_on_thread_pool)
{
@ -69,6 +70,12 @@ io_scheduler::~io_scheduler()
close(m_schedule_fd);
m_schedule_fd = -1;
}
if (m_owned_tasks != nullptr)
{
delete static_cast<coro::task_container<coro::io_scheduler>*>(m_owned_tasks);
m_owned_tasks = nullptr;
}
}
auto io_scheduler::process_events(std::chrono::milliseconds timeout) -> std::size_t


@ -40,8 +40,8 @@ tcp_client::~tcp_client()
if (m_ssl_info.m_ssl_ptr != nullptr && !m_ssl_info.m_ssl_error)
{
// Should the shutdown timeout be configurable?
ssl_shutdown_and_free(
*m_io_scheduler, std::move(m_socket), std::move(m_ssl_info.m_ssl_ptr), std::chrono::seconds{30});
m_io_scheduler->schedule(ssl_shutdown_and_free(
*m_io_scheduler, std::move(m_socket), std::move(m_ssl_info.m_ssl_ptr), std::chrono::seconds{30}));
}
}
@ -205,9 +205,6 @@ auto tcp_client::ssl_shutdown_and_free(
io_scheduler& io_scheduler, net::socket s, ssl_unique_ptr ssl_ptr, std::chrono::milliseconds timeout)
-> coro::task<void>
{
// Immediately transfer onto the scheduler thread pool for background processing.
co_await io_scheduler.schedule();
while (true)
{
auto r = SSL_shutdown(ssl_ptr.get());


@ -9,10 +9,12 @@ TEST_CASE("latch count=0", "[latch]")
{
coro::latch l{0};
auto task = [&]() -> coro::task<uint64_t> {
auto make_task = [&]() -> coro::task<uint64_t> {
co_await l;
co_return 42;
}();
};
auto task = make_task();
task.resume();
REQUIRE(task.is_ready()); // The latch never waits due to zero count.
@ -23,11 +25,13 @@ TEST_CASE("latch count=1", "[latch]")
{
coro::latch l{1};
auto task = [&]() -> coro::task<uint64_t> {
auto make_task = [&]() -> coro::task<uint64_t> {
auto workers = l.remaining();
co_await l;
co_return workers;
}();
};
auto task = make_task();
task.resume();
REQUIRE_FALSE(task.is_ready());
@ -41,11 +45,13 @@ TEST_CASE("latch count=1 count_down=5", "[latch]")
{
coro::latch l{1};
auto task = [&]() -> coro::task<uint64_t> {
auto make_task = [&]() -> coro::task<uint64_t> {
auto workers = l.remaining();
co_await l;
co_return workers;
}();
};
auto task = make_task();
task.resume();
REQUIRE_FALSE(task.is_ready());
@ -59,11 +65,13 @@ TEST_CASE("latch count=5 count_down=1 x5", "[latch]")
{
coro::latch l{5};
auto task = [&]() -> coro::task<uint64_t> {
auto make_task = [&]() -> coro::task<uint64_t> {
auto workers = l.remaining();
co_await l;
co_return workers;
}();
};
auto task = make_task();
task.resume();
REQUIRE_FALSE(task.is_ready());
@ -85,11 +93,13 @@ TEST_CASE("latch count=5 count_down=5", "[latch]")
{
coro::latch l{5};
auto task = [&]() -> coro::task<uint64_t> {
auto make_task = [&]() -> coro::task<uint64_t> {
auto workers = l.remaining();
co_await l;
co_return workers;
}();
};
auto task = make_task();
task.resume();
REQUIRE_FALSE(task.is_ready());


@ -206,9 +206,9 @@ TEST_CASE("task throws void", "[task]")
co_return;
}();
task.resume();
REQUIRE_NOTHROW(task.resume());
REQUIRE(task.is_ready());
REQUIRE_THROWS_AS(task.promise().return_value(), std::runtime_error);
REQUIRE_THROWS_AS(task.promise().return_void(), std::runtime_error);
}
TEST_CASE("task throws non-void l-value", "[task]")
@ -218,7 +218,7 @@ TEST_CASE("task throws non-void l-value", "[task]")
co_return 42;
}();
task.resume();
REQUIRE_NOTHROW(task.resume());
REQUIRE(task.is_ready());
REQUIRE_THROWS_AS(task.promise().return_value(), std::runtime_error);
}