mirror of https://gitlab.com/niansa/libcrosscoro.git synced 2025-03-06 20:53:32 +01:00

coro::task_container example (#58)

This commit is contained in:
Josh Baldwin 2021-02-15 20:17:11 -07:00 committed by GitHub
parent 60a74af219
commit 4aee0dc6f8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 297 additions and 41 deletions


@ -62,4 +62,8 @@ template_contents=$(cat 'README.md')
example_contents=$(cat 'examples/coro_io_scheduler.cpp')
echo "${template_contents/\$\{EXAMPLE_CORO_IO_SCHEDULER_CPP\}/$example_contents}" > README.md
template_contents=$(cat 'README.md')
example_contents=$(cat 'examples/coro_task_container.cpp')
echo "${template_contents/\$\{EXAMPLE_CORO_TASK_CONTAINER_CPP\}/$example_contents}" > README.md
git add README.md
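The `echo "${template_contents/.../...}"` lines above rely on bash pattern substitution, `${var/pattern/replacement}`, with the placeholder's literal `$`, `{`, and `}` escaped in the pattern so they match literally. A minimal standalone sketch of that expansion (the `template`/`example` values here are invented for illustration):

```shell
#!/usr/bin/env bash
# ${var/pattern/replacement} replaces the first match of pattern in var.
# The placeholder's literal $, {, and } must be escaped in the pattern.
template='Before ${EXAMPLE} After'
example='int main() { return 0; }'
result="${template/\$\{EXAMPLE\}/$example}"
echo "$result"
# prints: Before int main() { return 0; } After
```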


@ -132,7 +132,7 @@ $ ./examples/coro_mutex
```
### coro::thread_pool
`coro::thread_pool` is a statically sized pool of worker threads that executes scheduled coroutines from a FIFO queue. To schedule a coroutine on a thread pool, the pool's `schedule()` function should be `co_await`ed to transfer execution from the current thread to a thread pool worker thread. It's important to note that scheduling first places the coroutine into the FIFO queue, where it will be picked up by the first available thread in the pool, e.g. there could be a delay if a lot of work is queued up.
```C++
${EXAMPLE_CORO_THREAD_POOL_CPP}
@ -140,6 +140,7 @@ ${EXAMPLE_CORO_THREAD_POOL_CPP}
Example output (will vary based on threads):
```bash
$ ./examples/coro_thread_pool
thread pool worker 0 is starting up.
thread pool worker 2 is starting up.
thread pool worker 3 is starting up.
@ -162,7 +163,9 @@ thread pool worker 0 is shutting down.
```
### coro::io_scheduler
`coro::io_scheduler` is an I/O event scheduler that uses a statically sized pool (`coro::thread_pool`) to process the events that are ready. The `coro::io_scheduler` can use a dedicated spawned thread to process ready events, or it can be manually driven via its `process_events()` function for integration into existing event loops. When using the dedicated thread, that thread does not execute any of the tasks itself; it simply schedules them onto the next available worker thread in its embedded `coro::thread_pool`. Inline execution of tasks on the dedicated I/O thread is not supported, since an expensive task could introduce poor latency.
The example provided here shows an i/o scheduler that spins up a basic `coro::net::tcp_server` and a `coro::net::tcp_client` that will connect to each other and then send a request and a response.
```C++
${EXAMPLE_CORO_IO_SCHEDULER_CPP}
@ -170,6 +173,7 @@ ${EXAMPLE_CORO_IO_SCHEDULER_CPP}
Example output:
```bash
$ ./examples/coro_io_scheduler
io_scheduler::thread_pool worker 0 starting
io_scheduler::process event thread start
io_scheduler::thread_pool worker 1 starting
@ -178,7 +182,29 @@ client: Hello from server.
io_scheduler::thread_pool worker 0 stopping
io_scheduler::thread_pool worker 1 stopping
io_scheduler::process event thread stop
```
### coro::task_container
`coro::task_container` is a special container type that maintains the lifetime of tasks that do not have a known lifetime. This is extremely useful for tasks that hold open connections to clients and possibly process multiple requests from that client before shutting down. The task doesn't know how long it will be alive, but at some point in the future it will complete and need its resources cleaned up. The `coro::task_container` does this by wrapping the user's task in another coroutine task that marks itself for deletion upon completion within the parent task container. The task container should then run garbage collection periodically, or by default when a new task is added, to prune completed tasks from the container.
All tasks that are stored within a `coro::task_container` must have a `void` return type since their result cannot be accessed due to the task's lifetime being indeterminate.
```C++
${EXAMPLE_CORO_TASK_CONTAINER_CPP}
```
```bash
$ ./examples/coro_task_container
server: Hello from client 1
client: Hello from server 1
server: Hello from client 2
client: Hello from server 2
server: Hello from client 3
client: Hello from server 3
server: Hello from client 4
client: Hello from server 4
server: Hello from client 5
client: Hello from server 5
```
### Requirements

README.md

@ -352,7 +352,7 @@ $ ./examples/coro_mutex
```
### coro::thread_pool
`coro::thread_pool` is a statically sized pool of worker threads that executes scheduled coroutines from a FIFO queue. To schedule a coroutine on a thread pool, the pool's `schedule()` function should be `co_await`ed to transfer execution from the current thread to a thread pool worker thread. It's important to note that scheduling first places the coroutine into the FIFO queue, where it will be picked up by the first available thread in the pool, e.g. there could be a delay if a lot of work is queued up.
```C++
#include <coro/coro.hpp>
@ -437,6 +437,7 @@ int main()
Example output (will vary based on threads):
```bash
$ ./examples/coro_thread_pool
thread pool worker 0 is starting up.
thread pool worker 2 is starting up.
thread pool worker 3 is starting up.
@ -459,7 +460,9 @@ thread pool worker 0 is shutting down.
```
### coro::io_scheduler
`coro::io_scheduler` is an I/O event scheduler that uses a statically sized pool (`coro::thread_pool`) to process the events that are ready. The `coro::io_scheduler` can use a dedicated spawned thread to process ready events, or it can be manually driven via its `process_events()` function for integration into existing event loops. When using the dedicated thread, that thread does not execute any of the tasks itself; it simply schedules them onto the next available worker thread in its embedded `coro::thread_pool`. Inline execution of tasks on the dedicated I/O thread is not supported, since an expensive task could introduce poor latency.
The example provided here shows an i/o scheduler that spins up a basic `coro::net::tcp_server` and a `coro::net::tcp_client` that will connect to each other and then send a request and a response.
```C++
#include <coro/coro.hpp>
@ -610,6 +613,7 @@ int main()
Example output:
```bash
$ ./examples/coro_io_scheduler
io_scheduler::thread_pool worker 0 starting
io_scheduler::process event thread start
io_scheduler::thread_pool worker 1 starting
@ -618,7 +622,109 @@ client: Hello from server.
io_scheduler::thread_pool worker 0 stopping
io_scheduler::thread_pool worker 1 stopping
io_scheduler::process event thread stop
```
### coro::task_container
`coro::task_container` is a special container type that maintains the lifetime of tasks that do not have a known lifetime. This is extremely useful for tasks that hold open connections to clients and possibly process multiple requests from that client before shutting down. The task doesn't know how long it will be alive, but at some point in the future it will complete and need its resources cleaned up. The `coro::task_container` does this by wrapping the user's task in another coroutine task that marks itself for deletion upon completion within the parent task container. The task container should then run garbage collection periodically, or by default when a new task is added, to prune completed tasks from the container.
All tasks that are stored within a `coro::task_container` must have a `void` return type since their result cannot be accessed due to the task's lifetime being indeterminate.
```C++
#include <coro/coro.hpp>
#include <iostream>
int main()
{
coro::io_scheduler scheduler{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
auto make_server_task = [&]() -> coro::task<void> {
// This is the task that will handle processing a client's requests.
auto serve_client = [](coro::net::tcp_client client) -> coro::task<void> {
size_t requests{1};
while (true)
{
// Continue to accept more requests until the client closes the connection.
co_await client.poll(coro::poll_op::read);
std::string request(64, '\0');
auto [recv_status, recv_bytes] = client.recv(request);
if (recv_status == coro::net::recv_status::closed)
{
break;
}
request.resize(recv_bytes.size());
std::cout << "server: " << request << "\n";
auto response = "Hello from server " + std::to_string(requests);
client.send(response);
++requests;
}
co_return;
};
// Spin up the tcp_server and schedule it onto the io_scheduler.
coro::net::tcp_server server{scheduler};
co_await scheduler.schedule();
// All incoming connections will be stored into the task container until they are completed.
coro::task_container tc{scheduler};
// Wait for an incoming connection and accept it; this example only uses 1 connection.
co_await server.poll();
auto client = server.accept();
// Store the task that will serve the client into the container and immediately begin executing it
// on the task container's thread pool, which is the same as the scheduler.
tc.start(serve_client(std::move(client)));
// Wait for all clients to complete before shutting down the tcp_server.
co_await tc.garbage_collect_and_yield_until_empty();
co_return;
};
auto make_client_task = [&](size_t request_count) -> coro::task<void> {
co_await scheduler.schedule();
coro::net::tcp_client client{scheduler};
co_await client.connect();
// Send N requests on the same connection and wait for the server response to each one.
for (size_t i = 1; i <= request_count; ++i)
{
// Send the request data.
auto request = "Hello from client " + std::to_string(i);
client.send(request);
co_await client.poll(coro::poll_op::read);
std::string response(64, '\0');
auto [recv_status, recv_bytes] = client.recv(response);
response.resize(recv_bytes.size());
std::cout << "client: " << response << "\n";
}
co_return; // Upon exiting, the tcp_client will close its connection to the server.
};
coro::sync_wait(coro::when_all(make_server_task(), make_client_task(5)));
}
```
```bash
$ ./examples/coro_task_container
server: Hello from client 1
client: Hello from server 1
server: Hello from client 2
client: Hello from server 2
server: Hello from client 3
client: Hello from server 3
server: Hello from client 4
client: Hello from server 4
server: Hello from client 5
client: Hello from server 5
```
### Requirements


@ -29,14 +29,19 @@ add_executable(coro_io_scheduler coro_io_scheduler.cpp)
target_compile_features(coro_io_scheduler PUBLIC cxx_std_20)
target_link_libraries(coro_io_scheduler PUBLIC libcoro)
add_executable(coro_task_container coro_task_container.cpp)
target_compile_features(coro_task_container PUBLIC cxx_std_20)
target_link_libraries(coro_task_container PUBLIC libcoro)
if(${CMAKE_CXX_COMPILER_ID} MATCHES "GNU")
target_compile_options(coro_task PUBLIC -fcoroutines -Wall -Wextra -pipe)
target_compile_options(coro_generator PUBLIC -fcoroutines -Wall -Wextra -pipe)
target_compile_options(coro_event PUBLIC -fcoroutines -Wall -Wextra -pipe)
target_compile_options(coro_latch PUBLIC -fcoroutines -Wall -Wextra -pipe)
target_compile_options(coro_mutex PUBLIC -fcoroutines -Wall -Wextra -pipe)
target_compile_options(coro_thread_pool PUBLIC -fcoroutines -Wall -Wextra -pipe)
target_compile_options(coro_io_scheduler PUBLIC -fcoroutines -Wall -Wextra -pipe)
target_compile_options(coro_task PUBLIC -fcoroutines -Wall -Wextra -pipe)
target_compile_options(coro_generator PUBLIC -fcoroutines -Wall -Wextra -pipe)
target_compile_options(coro_event PUBLIC -fcoroutines -Wall -Wextra -pipe)
target_compile_options(coro_latch PUBLIC -fcoroutines -Wall -Wextra -pipe)
target_compile_options(coro_mutex PUBLIC -fcoroutines -Wall -Wextra -pipe)
target_compile_options(coro_thread_pool PUBLIC -fcoroutines -Wall -Wextra -pipe)
target_compile_options(coro_io_scheduler PUBLIC -fcoroutines -Wall -Wextra -pipe)
target_compile_options(coro_task_container PUBLIC -fcoroutines -Wall -Wextra -pipe)
elseif(${CMAKE_CXX_COMPILER_ID} MATCHES "Clang")
message(FATAL_ERROR "Clang is currently not supported.")
else()


@ -0,0 +1,81 @@
#include <coro/coro.hpp>
#include <iostream>
int main()
{
coro::io_scheduler scheduler{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
auto make_server_task = [&]() -> coro::task<void> {
// This is the task that will handle processing a client's requests.
auto serve_client = [](coro::net::tcp_client client) -> coro::task<void> {
size_t requests{1};
while (true)
{
// Continue to accept more requests until the client closes the connection.
co_await client.poll(coro::poll_op::read);
std::string request(64, '\0');
auto [recv_status, recv_bytes] = client.recv(request);
if (recv_status == coro::net::recv_status::closed)
{
break;
}
request.resize(recv_bytes.size());
std::cout << "server: " << request << "\n";
auto response = "Hello from server " + std::to_string(requests);
client.send(response);
++requests;
}
co_return;
};
// Spin up the tcp_server and schedule it onto the io_scheduler.
coro::net::tcp_server server{scheduler};
co_await scheduler.schedule();
// All incoming connections will be stored into the task container until they are completed.
coro::task_container tc{scheduler};
// Wait for an incoming connection and accept it; this example only uses 1 connection.
co_await server.poll();
auto client = server.accept();
// Store the task that will serve the client into the container and immediately begin executing it
// on the task container's thread pool, which is the same as the scheduler.
tc.start(serve_client(std::move(client)));
// Wait for all clients to complete before shutting down the tcp_server.
co_await tc.garbage_collect_and_yield_until_empty();
co_return;
};
auto make_client_task = [&](size_t request_count) -> coro::task<void> {
co_await scheduler.schedule();
coro::net::tcp_client client{scheduler};
co_await client.connect();
// Send N requests on the same connection and wait for the server response to each one.
for (size_t i = 1; i <= request_count; ++i)
{
// Send the request data.
auto request = "Hello from client " + std::to_string(i);
client.send(request);
co_await client.poll(coro::poll_op::read);
std::string response(64, '\0');
auto [recv_status, recv_bytes] = client.recv(response);
response.resize(recv_bytes.size());
std::cout << "client: " << response << "\n";
}
co_return; // Upon exiting, the tcp_client will close its connection to the server.
};
coro::sync_wait(coro::when_all(make_server_task(), make_client_task(5)));
}


@ -85,7 +85,7 @@ private:
/// are not setup when ares_poll() is called.
std::unordered_set<io_scheduler::fd_t> m_active_sockets{};
task_container m_task_container{};
task_container m_task_container;
/// Global count to track if c-ares has been initialized or cleaned up.
static uint64_t m_ares_count;


@ -9,6 +9,8 @@
namespace coro
{
class thread_pool;
class task_container
{
public:
@ -22,14 +24,19 @@ public:
double growth_factor{2};
};
explicit task_container(const options opts = options{.reserve_size = 8, .growth_factor = 2});
/**
* @param tp Tasks started in the container are scheduled onto this thread pool. For tasks created
* from a coro::io_scheduler, this would usually be that coro::io_scheduler instance.
* @param opts Task container options.
*/
task_container(thread_pool& tp, const options opts = options{.reserve_size = 8, .growth_factor = 2});
task_container(const task_container&) = delete;
task_container(task_container&&) = delete;
auto operator=(const task_container&) -> task_container& = delete;
auto operator=(task_container&&) -> task_container& = delete;
~task_container();
enum class garbage_collect
enum class garbage_collect_t
{
/// Execute garbage collection.
yes,
@ -38,23 +45,20 @@ public:
};
/**
* Stores a users task and sets a continuation coroutine to automatically mark the task
* as deleted upon the coroutines completion.
* @param user_task The scheduled user's task to store since it has suspended after its
* first execution.
* Stores a user task and starts its execution on the container's thread pool.
* @param user_task The scheduled user's task to store in this task container and start its execution.
* @param cleanup Should the task container run garbage collect at the beginning of this store
* call? Calling at regular intervals will reduce memory usage of completed
* tasks and allow for the task container to re-use allocated space.
* @return The task just stored wrapped in the self cleanup task.
*/
auto store(coro::task<void> user_task, garbage_collect cleanup = garbage_collect::yes) -> coro::task<void>&;
auto start(coro::task<void> user_task, garbage_collect_t cleanup = garbage_collect_t::yes) -> void;
/**
* Garbage collects any tasks that are marked as deleted. This frees up space to be re-used by
* the task container for newly stored tasks.
* @return The number of tasks that were deleted.
*/
auto gc() -> std::size_t;
auto garbage_collect() -> std::size_t;
/**
* @return The number of tasks that are awaiting deletion.
@ -93,6 +97,15 @@ public:
return m_tasks.size();
}
/**
* Will continue to garbage collect and yield until all tasks are complete. This method can be
* co_await'ed to make it easier to wait for the task container to have all its tasks complete.
*
* This does not shut down the task container, but it can be used when shutting down, or when your
* logic requires all the tasks contained within to complete; in that sense it is similar to coro::latch.
*/
auto garbage_collect_and_yield_until_empty() -> coro::task<void>;
private:
/**
* Grows each task container by the growth factor.
@ -135,6 +148,8 @@ private:
task_position m_free_pos{};
/// The amount to grow the containers by when all spaces are taken.
double m_growth_factor{};
/// The thread pool to schedule tasks that have just started.
thread_pool& m_thread_pool;
};
} // namespace coro


@ -48,7 +48,8 @@ dns_result::dns_result(coro::io_scheduler& scheduler, coro::event& resume, uint6
dns_resolver::dns_resolver(io_scheduler& scheduler, std::chrono::milliseconds timeout)
: m_io_scheduler(scheduler),
m_timeout(timeout)
m_timeout(timeout),
m_task_container(scheduler)
{
{
std::scoped_lock g{m_ares_mutex};
@ -149,7 +150,7 @@ auto dns_resolver::ares_poll() -> void
// If this socket is not currently actively polling, start polling!
if (m_active_sockets.emplace(fd).second)
{
m_task_container.store(make_poll_task(fd, poll_ops[i])).resume();
m_task_container.start(make_poll_task(fd, poll_ops[i]));
}
}
}


@ -1,10 +1,13 @@
#include "coro/task_container.hpp"
#include "coro/thread_pool.hpp"
#include <iostream>
namespace coro
{
task_container::task_container(const options opts) : m_growth_factor(opts.growth_factor)
task_container::task_container(thread_pool& tp, const options opts)
: m_growth_factor(opts.growth_factor),
m_thread_pool(tp)
{
m_tasks.resize(opts.reserve_size);
for (std::size_t i = 0; i < opts.reserve_size; ++i)
@ -16,20 +19,20 @@ task_container::task_container(const options opts) : m_growth_factor(opts.growth
task_container::~task_container()
{
// TODO: Not entirely sure how to best do this as this could hold up the thread that could
// be finishing the remaining tasks..
// This will hang the current thread... but if tasks are not complete, that's also pretty bad.
while (!empty())
{
gc();
garbage_collect();
}
}
auto task_container::store(coro::task<void> user_task, garbage_collect cleanup) -> coro::task<void>&
auto task_container::start(coro::task<void> user_task, garbage_collect_t cleanup) -> void
{
m_size.fetch_add(1, std::memory_order::relaxed);
std::scoped_lock lk{m_mutex};
if (cleanup == garbage_collect::yes)
if (cleanup == garbage_collect_t::yes)
{
gc_internal();
}
@ -47,15 +50,25 @@ auto task_container::store(coro::task<void> user_task, garbage_collect cleanup)
// Mark the current used slot as used.
std::advance(m_free_pos, 1);
return m_tasks[index];
// Start executing from the cleanup task to schedule the user's task onto the thread pool.
m_tasks[index].resume();
}
auto task_container::gc() -> std::size_t
auto task_container::garbage_collect() -> std::size_t
{
std::scoped_lock lk{m_mutex};
return gc_internal();
}
auto task_container::garbage_collect_and_yield_until_empty() -> coro::task<void>
{
while (!empty())
{
garbage_collect();
co_await m_thread_pool.yield();
}
}
auto task_container::grow() -> task_position
{
// Save an index at the current last item.
@ -95,8 +108,12 @@ auto task_container::gc_internal() -> std::size_t
auto task_container::make_cleanup_task(task<void> user_task, task_position pos) -> coro::task<void>
{
// Immediately move the task onto the thread pool.
co_await m_thread_pool.schedule();
try
{
// Await the users task to complete.
co_await user_task;
}
catch (const std::exception& e)
@ -105,18 +122,19 @@ auto task_container::make_cleanup_task(task<void> user_task, task_position pos)
// since the co_await will unwrap the unhandled exception on the task.
// The user's task should ideally be wrapped in a catch all and handle it themselves, but
// that cannot be guaranteed.
std::cerr << "task_container user_task had an unhandled exception e.what()= " << e.what() << "\n";
std::cerr << "coro::task_container user_task had an unhandled exception e.what()= " << e.what() << "\n";
}
catch (...)
{
// don't crash if they throw something that isn't derived from std::exception
std::cerr << "task_container user_task had an unhandled exception, not derived from std::exception.\n";
std::cerr << "coro::task_container user_task had an unhandled exception, not derived from std::exception.\n";
}
{
std::scoped_lock lk{m_mutex};
m_tasks_to_delete.push_back(pos);
}
std::scoped_lock lk{m_mutex};
m_tasks_to_delete.push_back(pos);
// This has to be done while holding the scoped lock to make sure this coroutine task completes
// before the task container object destructs -- if it was waiting on .empty() to become true.
m_size.fetch_sub(1, std::memory_order::relaxed);
co_return;
}


@ -327,7 +327,7 @@ TEST_CASE("benchmark tcp_server echo server", "[benchmark]")
uint64_t id;
coro::io_scheduler scheduler{
coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = server_thread_count}}};
coro::task_container task_container{};
coro::task_container task_container{scheduler};
uint64_t live_clients{0};
coro::event wait_for_clients{};
};
@ -342,7 +342,7 @@ TEST_CASE("benchmark tcp_server echo server", "[benchmark]")
auto make_on_connection_task = [&](server& s, coro::net::tcp_client client) -> coro::task<void> {
std::string in(64, '\0');
// Echo the messages until the socket is closed. a 'done' message arrives.
// Echo the messages until the socket is closed.
while (true)
{
auto pstatus = co_await client.poll(coro::poll_op::read);
@ -389,7 +389,7 @@ TEST_CASE("benchmark tcp_server echo server", "[benchmark]")
accepted.fetch_add(1, std::memory_order::release);
s.live_clients++;
s.task_container.store(make_on_connection_task(s, std::move(c))).resume();
s.task_container.start(make_on_connection_task(s, std::move(c)));
}
}
}