
io_scheduler example (#57)

Josh Baldwin 2021-02-15 18:52:45 -07:00 committed by GitHub
parent 017a4e2621
commit 60a74af219
6 changed files with 412 additions and 12 deletions


@@ -58,4 +58,8 @@ template_contents=$(cat 'README.md')
example_contents=$(cat 'examples/coro_thread_pool.cpp')
echo "${template_contents/\$\{EXAMPLE_CORO_THREAD_POOL_CPP\}/$example_contents}" > README.md
template_contents=$(cat 'README.md')
example_contents=$(cat 'examples/coro_io_scheduler.cpp')
echo "${template_contents/\$\{EXAMPLE_CORO_IO_SCHEDULER_CPP\}/$example_contents}" > README.md
git add README.md


@@ -30,7 +30,9 @@
- coro::net::dns_resolver for async dns
    - Uses libc-ares
- coro::net::tcp_client
    - Supports SSL/TLS via OpenSSL
- coro::net::tcp_server
    - Supports SSL/TLS via OpenSSL
- coro::net::udp_peer
## Usage
@@ -157,7 +159,27 @@ thread pool worker 1 is shutting down.
thread pool worker 2 is shutting down.
thread pool worker 3 is shutting down.
thread pool worker 0 is shutting down.
```
### coro::io_scheduler
`coro::io_scheduler` dispatches tasks onto a `coro::thread_pool` and processes I/O events, either on a dedicated event processing thread (the default) or manually via `process_events()`.
```C++
${EXAMPLE_CORO_IO_SCHEDULER_CPP}
```
Example output:
```bash
io_scheduler::thread_pool worker 0 starting
io_scheduler::process event thread start
io_scheduler::thread_pool worker 1 starting
server: Hello from client.
client: Hello from server.
io_scheduler::thread_pool worker 0 stopping
io_scheduler::thread_pool worker 1 stopping
io_scheduler::process event thread stop
```
### Requirements
C++20 Compiler with coroutine support
@@ -175,7 +197,7 @@ This project uses git submodules; to properly check out this project use:
git clone --recurse-submodules <libcoro-url>
This project depends on the following git sub-modules:
* [libc-ares](https://github.com/c-ares/c-ares) For async DNS resolver.
* [catch2](https://github.com/catchorg/Catch2) For testing.

README.md

@@ -30,7 +30,9 @@
- coro::net::dns_resolver for async dns
    - Uses libc-ares
- coro::net::tcp_client
    - Supports SSL/TLS via OpenSSL
- coro::net::tcp_server
    - Supports SSL/TLS via OpenSSL
- coro::net::udp_peer
## Usage
@@ -454,7 +456,170 @@ thread pool worker 1 is shutting down.
thread pool worker 2 is shutting down.
thread pool worker 3 is shutting down.
thread pool worker 0 is shutting down.
```
### coro::io_scheduler
`coro::io_scheduler` dispatches tasks onto a `coro::thread_pool` and processes I/O events, either on a dedicated event processing thread (the default) or manually via `process_events()`.
```C++
#include <coro/coro.hpp>
#include <iostream>

int main()
{
    coro::io_scheduler scheduler{coro::io_scheduler::options{
        // The scheduler will spawn a dedicated event processing thread. This is the default, but
        // it is possible to use 'manual' and call 'process_events()' to drive the scheduler yourself.
        .thread_strategy = coro::io_scheduler::thread_strategy_t::spawn,
        // If the scheduler is in spawn mode this functor is called upon starting the dedicated
        // event processor thread.
        .on_io_thread_start_functor = [] { std::cout << "io_scheduler::process event thread start\n"; },
        // If the scheduler is in spawn mode this functor is called upon stopping the dedicated
        // event processor thread.
        .on_io_thread_stop_functor = [] { std::cout << "io_scheduler::process event thread stop\n"; },
        // The io scheduler uses a coro::thread_pool to process the events or tasks it is given.
        // The tasks are not processed inline on the dedicated event processor thread, so events
        // can be received and handled as soon as a worker thread is available. See the
        // coro::thread_pool for the available options and their descriptions.
        .pool =
            coro::thread_pool::options{
                .thread_count = 2,
                .on_thread_start_functor =
                    [](size_t i) { std::cout << "io_scheduler::thread_pool worker " << i << " starting\n"; },
                .on_thread_stop_functor =
                    [](size_t i) { std::cout << "io_scheduler::thread_pool worker " << i << " stopping\n"; }}}};

    auto make_server_task = [&]() -> coro::task<void> {
        // Start by creating a tcp server; we do this before scheduling this task so the server is
        // immediately available for the client to connect, since construction creates the socket,
        // binds it, and starts listening on it. See tcp_server for more details on how to specify
        // the local address and port to bind to as well as enabling SSL/TLS.
        coro::net::tcp_server server{scheduler};

        // Now schedule this task onto the scheduler.
        co_await scheduler.schedule();

        // Wait for an incoming connection and accept it.
        auto poll_status = co_await server.poll();
        if (poll_status != coro::poll_status::event)
        {
            co_return; // Handle error, see poll_status for detailed error states.
        }

        // Accept the incoming client connection.
        auto client = server.accept();

        // Verify the incoming connection was accepted correctly.
        if (!client.socket().is_valid())
        {
            co_return; // Handle error.
        }

        // Now wait for the client message; it is small enough that it should always arrive in a
        // single recv() call.
        poll_status = co_await client.poll(coro::poll_op::read);
        if (poll_status != coro::poll_status::event)
        {
            co_return; // Handle error.
        }

        // Prepare a buffer and recv() the client's message. This function returns the recv()
        // status as well as a span<char> that overlaps the given buffer for the bytes that were
        // read. This can be used to resize the buffer or work with the bytes without modifying
        // the buffer at all.
        std::string request(256, '\0');
        auto [recv_status, recv_bytes] = client.recv(request);
        if (recv_status != coro::net::recv_status::ok)
        {
            co_return; // Handle error, see net::recv_status for detailed error states.
        }

        request.resize(recv_bytes.size());
        std::cout << "server: " << request << "\n";

        // Make sure the client socket can be written to.
        poll_status = co_await client.poll(coro::poll_op::write);
        if (poll_status != coro::poll_status::event)
        {
            co_return; // Handle error.
        }

        // Send the server response to the client. This message is small enough that it will be
        // sent in a single send() call, but to demonstrate how to use the 'remaining' portion of
        // the send() result this is wrapped in a loop until all the bytes are sent.
        std::string response = "Hello from server.";
        std::span<const char> remaining = response;
        do
        {
            // Optimistically send() prior to polling.
            auto [send_status, r] = client.send(remaining);
            if (send_status != coro::net::send_status::ok)
            {
                co_return; // Handle error, see net::send_status for detailed error states.
            }

            if (r.empty())
            {
                break; // The entire message has been sent.
            }

            // Re-assign the remaining bytes for the next loop iteration and poll for the socket
            // to be writeable again.
            remaining = r;
            auto pstatus = co_await client.poll(coro::poll_op::write);
            if (pstatus != coro::poll_status::event)
            {
                co_return; // Handle error.
            }
        } while (true);

        co_return;
    };

    auto make_client_task = [&]() -> coro::task<void> {
        // Immediately schedule onto the scheduler.
        co_await scheduler.schedule();

        // Create the tcp_client with the default settings; see tcp_client for how to set the
        // ip address and port as well as how to enable SSL/TLS.
        coro::net::tcp_client client{scheduler};

        // Omitting error-checking code for the client; each step should check the status and
        // verify the number of bytes sent or received.

        // Connect to the server.
        co_await client.connect();

        // Send the request data.
        client.send(std::string_view{"Hello from client."});

        // Wait for the response and receive it.
        co_await client.poll(coro::poll_op::read);
        std::string response(256, '\0');
        auto [recv_status, recv_bytes] = client.recv(response);
        response.resize(recv_bytes.size());

        std::cout << "client: " << response << "\n";
        co_return;
    };

    // Create and wait for the server and client tasks to complete.
    coro::sync_wait(coro::when_all(make_server_task(), make_client_task()));
}
```
Example output:
```bash
io_scheduler::thread_pool worker 0 starting
io_scheduler::process event thread start
io_scheduler::thread_pool worker 1 starting
server: Hello from client.
client: Hello from server.
io_scheduler::thread_pool worker 0 stopping
io_scheduler::thread_pool worker 1 stopping
io_scheduler::process event thread stop
```
### Requirements
C++20 Compiler with coroutine support
@@ -472,7 +637,7 @@ This project uses git submodules; to properly check out this project use:
git clone --recurse-submodules <libcoro-url>
This project depends on the following git sub-modules:
* [libc-ares](https://github.com/c-ares/c-ares) For async DNS resolver.
* [catch2](https://github.com/catchorg/Catch2) For testing.


@@ -25,13 +25,18 @@ add_executable(coro_thread_pool coro_thread_pool.cpp)
target_compile_features(coro_thread_pool PUBLIC cxx_std_20)
target_link_libraries(coro_thread_pool PUBLIC libcoro)
add_executable(coro_io_scheduler coro_io_scheduler.cpp)
target_compile_features(coro_io_scheduler PUBLIC cxx_std_20)
target_link_libraries(coro_io_scheduler PUBLIC libcoro)
if(${CMAKE_CXX_COMPILER_ID} MATCHES "GNU")
    target_compile_options(coro_task PUBLIC -fcoroutines -Wall -Wextra -pipe)
    target_compile_options(coro_generator PUBLIC -fcoroutines -Wall -Wextra -pipe)
    target_compile_options(coro_event PUBLIC -fcoroutines -Wall -Wextra -pipe)
    target_compile_options(coro_latch PUBLIC -fcoroutines -Wall -Wextra -pipe)
    target_compile_options(coro_mutex PUBLIC -fcoroutines -Wall -Wextra -pipe)
    target_compile_options(coro_thread_pool PUBLIC -fcoroutines -Wall -Wextra -pipe)
    target_compile_options(coro_io_scheduler PUBLIC -fcoroutines -Wall -Wextra -pipe)
elseif(${CMAKE_CXX_COMPILER_ID} MATCHES "Clang")
message(FATAL_ERROR "Clang is currently not supported.")
else()


@@ -0,0 +1,144 @@
#include <coro/coro.hpp>
#include <iostream>

int main()
{
    coro::io_scheduler scheduler{coro::io_scheduler::options{
        // The scheduler will spawn a dedicated event processing thread. This is the default, but
        // it is possible to use 'manual' and call 'process_events()' to drive the scheduler yourself.
        .thread_strategy = coro::io_scheduler::thread_strategy_t::spawn,
        // If the scheduler is in spawn mode this functor is called upon starting the dedicated
        // event processor thread.
        .on_io_thread_start_functor = [] { std::cout << "io_scheduler::process event thread start\n"; },
        // If the scheduler is in spawn mode this functor is called upon stopping the dedicated
        // event processor thread.
        .on_io_thread_stop_functor = [] { std::cout << "io_scheduler::process event thread stop\n"; },
        // The io scheduler uses a coro::thread_pool to process the events or tasks it is given.
        // The tasks are not processed inline on the dedicated event processor thread, so events
        // can be received and handled as soon as a worker thread is available. See the
        // coro::thread_pool for the available options and their descriptions.
        .pool =
            coro::thread_pool::options{
                .thread_count = 2,
                .on_thread_start_functor =
                    [](size_t i) { std::cout << "io_scheduler::thread_pool worker " << i << " starting\n"; },
                .on_thread_stop_functor =
                    [](size_t i) { std::cout << "io_scheduler::thread_pool worker " << i << " stopping\n"; }}}};

    auto make_server_task = [&]() -> coro::task<void> {
        // Start by creating a tcp server; we do this before scheduling this task so the server is
        // immediately available for the client to connect, since construction creates the socket,
        // binds it, and starts listening on it. See tcp_server for more details on how to specify
        // the local address and port to bind to as well as enabling SSL/TLS.
        coro::net::tcp_server server{scheduler};

        // Now schedule this task onto the scheduler.
        co_await scheduler.schedule();

        // Wait for an incoming connection and accept it.
        auto poll_status = co_await server.poll();
        if (poll_status != coro::poll_status::event)
        {
            co_return; // Handle error, see poll_status for detailed error states.
        }

        // Accept the incoming client connection.
        auto client = server.accept();

        // Verify the incoming connection was accepted correctly.
        if (!client.socket().is_valid())
        {
            co_return; // Handle error.
        }

        // Now wait for the client message; it is small enough that it should always arrive in a
        // single recv() call.
        poll_status = co_await client.poll(coro::poll_op::read);
        if (poll_status != coro::poll_status::event)
        {
            co_return; // Handle error.
        }

        // Prepare a buffer and recv() the client's message. This function returns the recv()
        // status as well as a span<char> that overlaps the given buffer for the bytes that were
        // read. This can be used to resize the buffer or work with the bytes without modifying
        // the buffer at all.
        std::string request(256, '\0');
        auto [recv_status, recv_bytes] = client.recv(request);
        if (recv_status != coro::net::recv_status::ok)
        {
            co_return; // Handle error, see net::recv_status for detailed error states.
        }

        request.resize(recv_bytes.size());
        std::cout << "server: " << request << "\n";

        // Make sure the client socket can be written to.
        poll_status = co_await client.poll(coro::poll_op::write);
        if (poll_status != coro::poll_status::event)
        {
            co_return; // Handle error.
        }

        // Send the server response to the client. This message is small enough that it will be
        // sent in a single send() call, but to demonstrate how to use the 'remaining' portion of
        // the send() result this is wrapped in a loop until all the bytes are sent.
        std::string response = "Hello from server.";
        std::span<const char> remaining = response;
        do
        {
            // Optimistically send() prior to polling.
            auto [send_status, r] = client.send(remaining);
            if (send_status != coro::net::send_status::ok)
            {
                co_return; // Handle error, see net::send_status for detailed error states.
            }

            if (r.empty())
            {
                break; // The entire message has been sent.
            }

            // Re-assign the remaining bytes for the next loop iteration and poll for the socket
            // to be writeable again.
            remaining = r;
            auto pstatus = co_await client.poll(coro::poll_op::write);
            if (pstatus != coro::poll_status::event)
            {
                co_return; // Handle error.
            }
        } while (true);

        co_return;
    };

    auto make_client_task = [&]() -> coro::task<void> {
        // Immediately schedule onto the scheduler.
        co_await scheduler.schedule();

        // Create the tcp_client with the default settings; see tcp_client for how to set the
        // ip address and port as well as how to enable SSL/TLS.
        coro::net::tcp_client client{scheduler};

        // Omitting error-checking code for the client; each step should check the status and
        // verify the number of bytes sent or received.

        // Connect to the server.
        co_await client.connect();

        // Send the request data.
        client.send(std::string_view{"Hello from client."});

        // Wait for the response and receive it.
        co_await client.poll(coro::poll_op::read);
        std::string response(256, '\0');
        auto [recv_status, recv_bytes] = client.recv(response);
        response.resize(recv_bytes.size());

        std::cout << "client: " << response << "\n";
        co_return;
    };

    // Create and wait for the server and client tasks to complete.
    coro::sync_wait(coro::when_all(make_server_task(), make_client_task()));
}


@@ -39,10 +39,14 @@ public:
struct options
{
/// Should the io scheduler spawn a dedicated event processor?
thread_strategy_t thread_strategy{thread_strategy_t::spawn};
/// If spawning a dedicated event processor a functor to call upon that thread starting.
std::function<void()> on_io_thread_start_functor{nullptr};
/// If spawning a dedicated event processor a functor to call upon that thread stopping.
std::function<void()> on_io_thread_stop_functor{nullptr};
/// Thread pool options for the task processor threads. See thread pool for more details.
thread_pool::options pool{
    .thread_count = ((std::thread::hardware_concurrency() > 1) ? (std::thread::hardware_concurrency() - 1) : 1),
    .on_thread_start_functor = nullptr,
    .on_thread_stop_functor = nullptr};
@@ -66,17 +70,65 @@ public:
virtual ~io_scheduler() override;
/**
 * Given thread_strategy_t::manual this function should be called at regular intervals to
 * process events that are ready. If using thread_strategy_t::spawn this is run continuously
 * on a dedicated background thread and does not need to be manually invoked.
 * @param timeout If no events are ready, how long should the function wait for events to be
 *                ready? Passing zero (default) for the timeout will check for any events that
 *                are ready now, and then return. This could be zero events. Passing -1 means
 *                block indefinitely until an event happens.
 * @return The number of tasks currently executing or waiting to execute.
 */
auto process_events(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> std::size_t;
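A minimal sketch of driving the scheduler manually, assuming it was constructed with `thread_strategy_t::manual` (the 100ms timeout is illustrative):

```C++
// Hedged sketch: drain a manually driven scheduler. process_events() returns the
// number of tasks executing or waiting to execute, so loop until it reports zero.
while (scheduler.process_events(std::chrono::milliseconds{100}) > 0)
{
    // Events are processed inside process_events(); nothing else to do here.
}
```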
/**
 * Schedules the current task to run after the given amount of time has elapsed.
 * @param amount The amount of time to wait before resuming execution of this task.
 *               Given a zero or negative amount of time this behaves identically to schedule().
 */
[[nodiscard]] auto schedule_after(std::chrono::milliseconds amount) -> coro::task<void>;
/**
 * Schedules the current task to run at a given time point in the future.
 * @param time The time point to resume execution of this task. Given 'now' or a time point
 *             in the past this behaves identically to schedule().
 */
[[nodiscard]] auto schedule_at(time_point time) -> coro::task<void>;
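For example, a task might delay its own resumption (a sketch; the 50ms value is illustrative):

```C++
auto make_delayed_task = [&]() -> coro::task<void> {
    // Suspend now and resume on the scheduler roughly 50ms from now.
    co_await scheduler.schedule_after(std::chrono::milliseconds{50});
    co_return;
};
```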
/**
 * Yields the current task for the given amount of time.
 * @param amount The amount of time to yield for before resuming execution of this task.
 *               Given a zero or negative amount of time this behaves identically to yield().
 */
[[nodiscard]] auto yield_for(std::chrono::milliseconds amount) -> coro::task<void>;
/**
 * Yields the current task until the given time point in the future.
 * @param time The time point to resume execution of this task. Given 'now' or a time point
 *             in the past this behaves identically to yield().
 */
[[nodiscard]] auto yield_until(time_point time) -> coro::task<void>;
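A task already running on the scheduler can yield in the same way (sketch; the 10ms value is illustrative):

```C++
auto make_yielding_task = [&]() -> coro::task<void> {
    co_await scheduler.schedule();
    // Give up the worker thread for roughly 10ms, then resume here.
    co_await scheduler.yield_for(std::chrono::milliseconds{10});
    co_return;
};
```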
/**
 * Polls the given file descriptor for the given operations.
 * @param fd The file descriptor to poll for events.
 * @param op The operations to poll for.
 * @param timeout The amount of time to wait for the events to trigger. A timeout of zero will
 *                block indefinitely until the event triggers.
 * @return The result of the poll operation.
 */
[[nodiscard]] auto poll(fd_t fd, coro::poll_op op, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
-> coro::task<poll_status>;
/**
 * Polls the given coro::net::socket for the given operations.
 * @param sock The socket to poll for events on.
 * @param op The operations to poll for.
 * @param timeout The amount of time to wait for the events to trigger. A timeout of zero will
 *                block indefinitely until the event triggers.
 * @return The result of the poll operation.
 */
[[nodiscard]] auto poll(
const net::socket& sock, coro::poll_op op, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
-> coro::task<poll_status>
@@ -84,6 +136,14 @@ public:
return poll(sock.native_handle(), op, timeout);
}
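As a sketch, waiting up to one second for a socket to become readable might look like the following (it assumes a connected `client` as in the example above; the seconds duration converts implicitly to milliseconds):

```C++
// Inside a coro::task<...>: poll the client's socket for readability with a 1s timeout.
auto status = co_await scheduler.poll(client.socket(), coro::poll_op::read, std::chrono::seconds{1});
if (status != coro::poll_status::event)
{
    // Handle timeout or error; see poll_status for the possible states.
}
```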
/**
 * Starts the shutdown of the io scheduler. All currently executing and pending tasks will
 * complete prior to shutting down.
 * @param wait_for_tasks Given shutdown_t::sync this function will block until all outstanding
 *                       tasks are completed. Given shutdown_t::async this function will trigger
 *                       the shutdown process but return immediately. In this case the
 *                       io_scheduler's destructor will block if any background threads haven't
 *                       joined.
 */
auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) noexcept -> void override;
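For example, to trigger a non-blocking shutdown (a sketch; it assumes `shutdown_t` is reachable as `coro::shutdown_t`):

```C++
// Begin shutdown but return immediately; the io_scheduler's destructor will
// block until any remaining background threads have joined.
scheduler.shutdown(coro::shutdown_t::async);
```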
private: