diff --git a/.githooks/readme-template.md b/.githooks/readme-template.md
index 7206346..c61b01a 100644
--- a/.githooks/readme-template.md
+++ b/.githooks/readme-template.md
@@ -143,6 +143,8 @@ Its very easy to see the LIFO 'atomic' queue in action in the beginning where 22
 ### shared_mutex
 The `coro::shared_mutex` is a thread safe async tool to allow for multiple shared users at once but also exclusive access. The lock is acquired strictly in a FIFO manner in that if the lock is currently held by shared users and an exclusive waiter attempts to lock, the exclusive waiter will suspend until all the _current_ shared users finish using the lock. Any new users that attempt to lock the mutex in a shared state once there is an exclusive waiter will also wait behind the exclusive waiter. This prevents the exclusive waiter from being starved.
+The `coro::shared_mutex` requires an `executor_type` when constructed to be able to resume multiple shared waiters when an exclusive lock is released. This allows all of the pending shared waiters to be resumed concurrently.
+
 ```C++
 ${EXAMPLE_CORO_SHARED_MUTEX_CPP}
diff --git a/README.md b/README.md
index 7eb0d7b..7faa133 100644
--- a/README.md
+++ b/README.md
@@ -363,6 +363,8 @@ Its very easy to see the LIFO 'atomic' queue in action in the beginning where 22
 ### shared_mutex
 The `coro::shared_mutex` is a thread safe async tool to allow for multiple shared users at once but also exclusive access. The lock is acquired strictly in a FIFO manner in that if the lock is currently held by shared users and an exclusive waiter attempts to lock, the exclusive waiter will suspend until all the _current_ shared users finish using the lock. Any new users that attempt to lock the mutex in a shared state once there is an exclusive waiter will also wait behind the exclusive waiter. This prevents the exclusive waiter from being starved.
+The `coro::shared_mutex` requires an `executor_type` when constructed to be able to resume multiple shared waiters when an exclusive lock is released. This allows all of the pending shared waiters to be resumed concurrently.
+
 ```C++
 #include <coro/coro.hpp>
@@ -370,30 +372,30 @@ The `coro::shared_mutex` is a thread safe async tool to allow for multiple share
 int main()
 {
-    // Shared mutexes require a thread pool to be able to wake up multiple shared waiters when
+    // Shared mutexes require an executor type to be able to wake up multiple shared waiters when
     // there is an exclusive lock holder releasing the lock. This example uses a single thread
     // to also show the interleaving of coroutines acquiring the shared lock in shared and
     // exclusive mode as they resume and suspend in a linear manner. Ideally the thread pool
-    // would have more than 1 thread to resume all shared waiters in parallel.
-    coro::thread_pool tp{coro::thread_pool::options{.thread_count = 1}};
+    // executor would have more than 1 thread to resume all shared waiters in parallel.
+    auto tp = std::make_shared<coro::thread_pool>(coro::thread_pool::options{.thread_count = 1});
     coro::shared_mutex<coro::thread_pool> mutex{tp};

     auto make_shared_task = [&](uint64_t i) -> coro::task<void>
     {
-        co_await tp.schedule();
+        co_await tp->schedule();
         {
             std::cerr << "shared task " << i << " lock_shared()\n";
             auto scoped_lock = co_await mutex.lock_shared();
             std::cerr << "shared task " << i << " lock_shared() acquired\n";
             /// Immediately yield so the other shared tasks also acquire in shared state
             /// while this task currently holds the mutex in shared state.
-            co_await tp.yield();
+            co_await tp->yield();
             std::cerr << "shared task " << i << " unlock_shared()\n";
         }
         co_return;
     };

     auto make_exclusive_task = [&]() -> coro::task<void>
     {
-        co_await tp.schedule();
+        co_await tp->schedule();

         std::cerr << "exclusive task lock()\n";
         auto scoped_lock = co_await mutex.lock();
@@ -713,7 +715,7 @@ The example provided here shows an i/o scheduler that spins up a basic `coro::ne
 int main()
 {
-    coro::io_scheduler scheduler{coro::io_scheduler::options{
+    auto scheduler = std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{
         // The scheduler will spawn a dedicated event processing thread. This is the default, but
         // it is possible to use 'manual' and call 'process_events()' to drive the scheduler yourself.
         .thread_strategy = coro::io_scheduler::thread_strategy_t::spawn,
@@ -733,7 +735,7 @@ int main()
             .on_thread_start_functor = [](size_t i) { std::cout << "io_scheduler::thread_pool worker " << i << " starting\n"; },
             .on_thread_stop_functor =
-                [](size_t i) { std::cout << "io_scheduler::thread_pool worker " << i << " stopping\n"; }}}};
+                [](size_t i) { std::cout << "io_scheduler::thread_pool worker " << i << " stopping\n"; }}});

     auto make_server_task = [&]() -> coro::task<void>
     {
         // Start by creating a tcp server, we'll do this before putting it into the scheduler so
         coro::net::tcp_server server{scheduler};

         // Now schedule this task onto the scheduler.
-        co_await scheduler.schedule();
+        co_await scheduler->schedule();

         // Wait for an incoming connection and accept it.
         auto poll_status = co_await server.poll();
@@ -824,7 +826,7 @@ int main()
     auto make_client_task = [&]() -> coro::task<void>
     {
         // Immediately schedule onto the scheduler.
-        co_await scheduler.schedule();
+        co_await scheduler->schedule();

         // Create the tcp_client with the default settings, see tcp_client for how to set the
         // ip address, port, and optionally enabling SSL/TLS.
@@ -878,7 +880,8 @@ All tasks that are stored within a `coro::task_container` must have a `void` ret
 int main()
 {
-    coro::io_scheduler scheduler{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
+    auto scheduler = std::make_shared<coro::io_scheduler>(
+        coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});

     auto make_server_task = [&]() -> coro::task<void>
     {
         // This is the task that will handle processing a client's requests.
@@ -911,7 +914,7 @@ int main()
         // Spin up the tcp_server and schedule it onto the io_scheduler.
         coro::net::tcp_server server{scheduler};
-        co_await scheduler.schedule();
+        co_await scheduler->schedule();

         // All incoming connections will be stored into the task container until they are completed.
         coro::task_container tc{scheduler};
@@ -929,7 +932,7 @@ int main()
     };

     auto make_client_task = [&](size_t request_count) -> coro::task<void>
     {
-        co_await scheduler.schedule();
+        co_await scheduler->schedule();
         coro::net::tcp_client client{scheduler};

         co_await client.connect();
diff --git a/examples/coro_io_scheduler.cpp b/examples/coro_io_scheduler.cpp
index 90e2fc0..4d45e2b 100644
--- a/examples/coro_io_scheduler.cpp
+++ b/examples/coro_io_scheduler.cpp
@@ -3,7 +3,7 @@
 int main()
 {
-    coro::io_scheduler scheduler{coro::io_scheduler::options{
+    auto scheduler = std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{
         // The scheduler will spawn a dedicated event processing thread. This is the default, but
         // it is possible to use 'manual' and call 'process_events()' to drive the scheduler yourself.
         .thread_strategy = coro::io_scheduler::thread_strategy_t::spawn,
@@ -23,7 +23,7 @@ int main()
             .on_thread_start_functor = [](size_t i) { std::cout << "io_scheduler::thread_pool worker " << i << " starting\n"; },
             .on_thread_stop_functor =
-                [](size_t i) { std::cout << "io_scheduler::thread_pool worker " << i << " stopping\n"; }}}};
+                [](size_t i) { std::cout << "io_scheduler::thread_pool worker " << i << " stopping\n"; }}});

     auto make_server_task = [&]() -> coro::task<void>
     {
         // Start by creating a tcp server, we'll do this before putting it into the scheduler so
         coro::net::tcp_server server{scheduler};

         // Now schedule this task onto the scheduler.
-        co_await scheduler.schedule();
+        co_await scheduler->schedule();

         // Wait for an incoming connection and accept it.
         auto poll_status = co_await server.poll();
@@ -114,7 +114,7 @@ int main()
     auto make_client_task = [&]() -> coro::task<void>
     {
         // Immediately schedule onto the scheduler.
-        co_await scheduler.schedule();
+        co_await scheduler->schedule();

         // Create the tcp_client with the default settings, see tcp_client for how to set the
         // ip address, port, and optionally enabling SSL/TLS.
diff --git a/examples/coro_shared_mutex.cpp b/examples/coro_shared_mutex.cpp
index c389516..e3eda66 100644
--- a/examples/coro_shared_mutex.cpp
+++ b/examples/coro_shared_mutex.cpp
@@ -3,30 +3,30 @@
 int main()
 {
-    // Shared mutexes require a thread pool to be able to wake up multiple shared waiters when
+    // Shared mutexes require an executor type to be able to wake up multiple shared waiters when
     // there is an exclusive lock holder releasing the lock. This example uses a single thread
     // to also show the interleaving of coroutines acquiring the shared lock in shared and
     // exclusive mode as they resume and suspend in a linear manner. Ideally the thread pool
-    // would have more than 1 thread to resume all shared waiters in parallel.
-    coro::thread_pool tp{coro::thread_pool::options{.thread_count = 1}};
+    // executor would have more than 1 thread to resume all shared waiters in parallel.
+    auto tp = std::make_shared<coro::thread_pool>(coro::thread_pool::options{.thread_count = 1});
     coro::shared_mutex<coro::thread_pool> mutex{tp};

     auto make_shared_task = [&](uint64_t i) -> coro::task<void>
     {
-        co_await tp.schedule();
+        co_await tp->schedule();
         {
             std::cerr << "shared task " << i << " lock_shared()\n";
             auto scoped_lock = co_await mutex.lock_shared();
             std::cerr << "shared task " << i << " lock_shared() acquired\n";
             /// Immediately yield so the other shared tasks also acquire in shared state
             /// while this task currently holds the mutex in shared state.
-            co_await tp.yield();
+            co_await tp->yield();
             std::cerr << "shared task " << i << " unlock_shared()\n";
         }
         co_return;
     };

     auto make_exclusive_task = [&]() -> coro::task<void>
     {
-        co_await tp.schedule();
+        co_await tp->schedule();

         std::cerr << "exclusive task lock()\n";
         auto scoped_lock = co_await mutex.lock();
diff --git a/examples/coro_task_container.cpp b/examples/coro_task_container.cpp
index fde1831..9085bc1 100644
--- a/examples/coro_task_container.cpp
+++ b/examples/coro_task_container.cpp
@@ -3,7 +3,8 @@
 int main()
 {
-    coro::io_scheduler scheduler{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
+    auto scheduler = std::make_shared<coro::io_scheduler>(
+        coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});

     auto make_server_task = [&]() -> coro::task<void>
     {
         // This is the task that will handle processing a client's requests.
@@ -36,7 +37,7 @@ int main()
         // Spin up the tcp_server and schedule it onto the io_scheduler.
         coro::net::tcp_server server{scheduler};
-        co_await scheduler.schedule();
+        co_await scheduler->schedule();

         // All incoming connections will be stored into the task container until they are completed.
         coro::task_container tc{scheduler};
@@ -54,7 +55,7 @@ int main()
     };

     auto make_client_task = [&](size_t request_count) -> coro::task<void>
     {
-        co_await scheduler.schedule();
+        co_await scheduler->schedule();
         coro::net::tcp_client client{scheduler};

         co_await client.connect();
diff --git a/inc/coro/event.hpp b/inc/coro/event.hpp
index 8c8c343..f02006b 100644
--- a/inc/coro/event.hpp
+++ b/inc/coro/event.hpp
@@ -7,8 +7,6 @@
 namespace coro
 {
-class thread_pool;
-
 /**
  * Event is a manually triggered thread safe signal that can be co_await()'ed by multiple awaiters.
  * Each awaiter should co_await the event and upon the event being set each awaiter will have their
@@ -87,8 +85,8 @@ public:
     auto set() noexcept -> void;

     /**
-     * Sets this event and resumes all awaiters onto the given thread pool. This will distribute
-     * the waiters across the thread pools threads.
+     * Sets this event and resumes all awaiters onto the given executor. This will distribute
+     * the waiters across the executor's threads.
      */
     template<concepts::executor executor_type>
     auto set(executor_type& e) noexcept -> void
diff --git a/inc/coro/io_scheduler.hpp b/inc/coro/io_scheduler.hpp
index 3145747..6e779f5 100644
--- a/inc/coro/io_scheduler.hpp
+++ b/inc/coro/io_scheduler.hpp
@@ -163,7 +163,8 @@ public:
      */
     auto schedule(coro::task<void>&& task) -> void
     {
-        static_cast<task_container<io_scheduler>*>(m_owned_tasks)->start(std::move(task));
+        auto* ptr = static_cast<task_container<io_scheduler>*>(m_owned_tasks);
+        ptr->start(std::move(task));
     }

     /**
diff --git a/inc/coro/net/dns_resolver.hpp b/inc/coro/net/dns_resolver.hpp
index 942fa7d..fc19f35 100644
--- a/inc/coro/net/dns_resolver.hpp
+++ b/inc/coro/net/dns_resolver.hpp
@@ -60,7 +60,7 @@ private:
 class dns_resolver
 {
 public:
-    explicit dns_resolver(io_scheduler& scheduler, std::chrono::milliseconds timeout);
+    explicit dns_resolver(std::shared_ptr<io_scheduler> scheduler, std::chrono::milliseconds timeout);
     dns_resolver(const dns_resolver&) = delete;
     dns_resolver(dns_resolver&&)      = delete;
     auto operator=(const dns_resolver&) noexcept -> dns_resolver& = delete;
@@ -74,7 +74,7 @@ public:
 private:
     /// The io scheduler to drive the events for dns lookups.
-    io_scheduler& m_io_scheduler;
+    std::shared_ptr<io_scheduler> m_io_scheduler;
     /// The global timeout per dns lookup request.
     std::chrono::milliseconds m_timeout{0};
diff --git a/inc/coro/net/tcp_client.hpp b/inc/coro/net/tcp_client.hpp
index b79c99d..e89e891 100644
--- a/inc/coro/net/tcp_client.hpp
+++ b/inc/coro/net/tcp_client.hpp
@@ -41,8 +41,8 @@ public:
      * @param opts See tcp_client::options for more information.
      */
     tcp_client(
-        io_scheduler& scheduler,
-        options       opts = options{
+        std::shared_ptr<io_scheduler> scheduler,
+        options                       opts = options{
             .address = {net::ip_address::from_string("127.0.0.1")}, .port = 8080, .ssl_ctx = nullptr});
     tcp_client(const tcp_client&) = delete;
     tcp_client(tcp_client&& other);
@@ -280,10 +280,10 @@ private:
     /// The tcp_server creates already connected clients and provides a tcp socket pre-built.
     friend tcp_server;
-    tcp_client(io_scheduler& scheduler, net::socket socket, options opts);
+    tcp_client(std::shared_ptr<io_scheduler> scheduler, net::socket socket, options opts);

     /// The scheduler that will drive this tcp client.
-    io_scheduler* m_io_scheduler{nullptr};
+    std::shared_ptr<io_scheduler> m_io_scheduler{nullptr};
     /// Options for what server to connect to.
     options m_options{};
     /// The tcp socket.
@@ -293,12 +293,11 @@ private:
     /// SSL/TLS specific information if m_options.ssl_ctx != nullptr.
     ssl_info m_ssl_info{};

-private:
     static auto ssl_shutdown_and_free(
-        io_scheduler& io_scheduler,
-        net::socket s,
-        ssl_unique_ptr ssl_ptr,
-        std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<void>;
+        std::shared_ptr<io_scheduler> io_scheduler,
+        net::socket                   s,
+        ssl_unique_ptr                ssl_ptr,
+        std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<void>;
 };

 } // namespace coro::net
diff --git a/inc/coro/net/tcp_server.hpp b/inc/coro/net/tcp_server.hpp
index 5cf0063..61b4049 100644
--- a/inc/coro/net/tcp_server.hpp
+++ b/inc/coro/net/tcp_server.hpp
@@ -34,8 +34,8 @@ public:
     };

     tcp_server(
-        io_scheduler& scheduler,
-        options       opts = options{
+        std::shared_ptr<io_scheduler> scheduler,
+        options                       opts = options{
             .address = net::ip_address::from_string("0.0.0.0"), .port = 8080, .backlog = 128, .ssl_ctx = nullptr});

     tcp_server(const tcp_server&) = delete;
@@ -64,7 +64,7 @@ public:
 private:
     /// The io scheduler for awaiting new connections.
-    io_scheduler* m_io_scheduler{nullptr};
+    std::shared_ptr<io_scheduler> m_io_scheduler{nullptr};
     /// The bind and listen options for this server.
     options m_options;
     /// The socket for accepting new tcp connections on.
diff --git a/inc/coro/net/udp_peer.hpp b/inc/coro/net/udp_peer.hpp
index c70fa27..9886253 100644
--- a/inc/coro/net/udp_peer.hpp
+++ b/inc/coro/net/udp_peer.hpp
@@ -36,12 +36,12 @@ public:
     /**
     * Creates a udp peer that can send packets but not receive them. This udp peer will not explicitly
     * bind to a local ip+port.
     */
-    explicit udp_peer(io_scheduler& scheduler, net::domain_t domain = net::domain_t::ipv4);
+    explicit udp_peer(std::shared_ptr<io_scheduler> scheduler, net::domain_t domain = net::domain_t::ipv4);

     /**
     * Creates a udp peer that can send and receive packets. This peer will bind to the given ip_port.
     */
-    explicit udp_peer(io_scheduler& scheduler, const info& bind_info);
+    explicit udp_peer(std::shared_ptr<io_scheduler> scheduler, const info& bind_info);

     udp_peer(const udp_peer&) = delete;
     udp_peer(udp_peer&&)      = default;
@@ -58,7 +58,7 @@ public:
     auto poll(poll_op op, std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<poll_status>
     {
-        co_return co_await m_io_scheduler.poll(m_socket, op, timeout);
+        co_return co_await m_io_scheduler->poll(m_socket, op, timeout);
     }

     /**
@@ -137,7 +137,7 @@ public:
 private:
     /// The scheduler that will drive this udp client.
-    io_scheduler& m_io_scheduler;
+    std::shared_ptr<io_scheduler> m_io_scheduler;
     /// The udp socket.
     net::socket m_socket{-1};
     /// Did the user request this udp socket is bound locally to receive packets?
diff --git a/inc/coro/shared_mutex.hpp b/inc/coro/shared_mutex.hpp
index 88f2f0a..2a86f81 100644
--- a/inc/coro/shared_mutex.hpp
+++ b/inc/coro/shared_mutex.hpp
@@ -75,11 +75,17 @@ class shared_mutex
 {
 public:
     /**
-     * @param e The thread pool for when multiple shared waiters can be woken up at the same time,
-     *          each shared waiter will be scheduled to immediately run on this thread pool in
+     * @param e The executor for when multiple shared waiters can be woken up at the same time,
+     *          each shared waiter will be scheduled to immediately run on this executor in
      *          parallel.
      */
-    explicit shared_mutex(executor_type& e) : m_executor(e) {}
+    explicit shared_mutex(std::shared_ptr<executor_type> e) : m_executor(std::move(e))
+    {
+        if (m_executor == nullptr)
+        {
+            throw std::runtime_error{"shared_mutex cannot have a nullptr executor"};
+        }
+    }

     ~shared_mutex() = default;

     shared_mutex(const shared_mutex&) = delete;
@@ -254,7 +260,7 @@ private:
     };

     /// This executor is for resuming multiple shared waiters.
-    executor_type& m_executor;
+    std::shared_ptr<executor_type> m_executor{nullptr};

     std::mutex m_mutex;
@@ -341,7 +347,7 @@ private:
             }

             ++m_shared_users;
-            m_executor.resume(to_resume->m_awaiting_coroutine);
+            m_executor->resume(to_resume->m_awaiting_coroutine);
         } while (m_head_waiter != nullptr && !m_head_waiter->m_exclusive);

         // Cannot unlock until the entire set of shared waiters has been traversed. I think this
diff --git a/inc/coro/task_container.hpp b/inc/coro/task_container.hpp
index ffc0102..ae1d255 100644
--- a/inc/coro/task_container.hpp
+++ b/inc/coro/task_container.hpp
@@ -6,11 +6,14 @@
 #include <atomic>
 #include <iostream>
 #include <list>
+#include <memory>
 #include <mutex>
 #include <vector>

 namespace coro
 {
+class io_scheduler;
+
 template<concepts::executor executor_type>
 class task_container
 {
@@ -30,16 +33,18 @@ public:
      *                 from a coro::io_scheduler, this would usually be that coro::io_scheduler instance.
      * @param opts Task container options.
      */
-    task_container(executor_type& e, const options opts = options{.reserve_size = 8, .growth_factor = 2})
+    task_container(
+        std::shared_ptr<executor_type> e, const options opts = options{.reserve_size = 8, .growth_factor = 2})
         : m_growth_factor(opts.growth_factor),
-          m_executor(e)
+          m_executor(std::move(e)),
+          m_executor_ptr(m_executor.get())
     {
-        m_tasks.resize(opts.reserve_size);
-        for (std::size_t i = 0; i < opts.reserve_size; ++i)
+        if (m_executor == nullptr)
         {
-            m_task_indexes.emplace_back(i);
+            throw std::runtime_error{"task_container cannot have a nullptr executor"};
         }
-        m_free_pos = m_task_indexes.begin();
+
+        init(opts.reserve_size);
     }
     task_container(const task_container&) = delete;
     task_container(task_container&&)      = delete;
@@ -157,7 +162,7 @@ public:
         while (!empty())
         {
             garbage_collect();
-            co_await m_executor.yield();
+            co_await m_executor_ptr->yield();
         }
     }

@@ -221,7 +226,7 @@ private:
     auto make_cleanup_task(task<void> user_task, task_position pos) -> coro::task<void>
     {
         // Immediately move the task onto the executor.
-        co_await m_executor.schedule();
+        co_await m_executor_ptr->schedule();

         try
         {
@@ -267,7 +272,31 @@ private:
     /// The amount to grow the containers by when all spaces are taken.
     double m_growth_factor{};
     /// The executor to schedule tasks that have just started.
-    executor_type& m_executor;
+    std::shared_ptr<executor_type> m_executor{nullptr};
+    /// This is used internally since io_scheduler cannot pass itself in as a shared_ptr.
+    executor_type* m_executor_ptr{nullptr};
+
+    /**
+     * Special constructor for internal types to create their embedded task containers.
+     */
+
+    friend io_scheduler;
+    task_container(executor_type& e, const options opts = options{.reserve_size = 8, .growth_factor = 2})
+        : m_growth_factor(opts.growth_factor),
+          m_executor_ptr(&e)
+    {
+        init(opts.reserve_size);
+    }
+
+    auto init(std::size_t reserve_size) -> void
+    {
+        m_tasks.resize(reserve_size);
+        for (std::size_t i = 0; i < reserve_size; ++i)
+        {
+            m_task_indexes.emplace_back(i);
+        }
+        m_free_pos = m_task_indexes.begin();
+    }
 };

 } // namespace coro
diff --git a/src/net/dns_resolver.cpp b/src/net/dns_resolver.cpp
index 6651fe9..0d4ac1f 100644
--- a/src/net/dns_resolver.cpp
+++ b/src/net/dns_resolver.cpp
@@ -46,11 +46,16 @@ dns_result::dns_result(coro::io_scheduler& scheduler, coro::event& resume, uint6
 {
 }

-dns_resolver::dns_resolver(io_scheduler& scheduler, std::chrono::milliseconds timeout)
-    : m_io_scheduler(scheduler),
+dns_resolver::dns_resolver(std::shared_ptr<io_scheduler> scheduler, std::chrono::milliseconds timeout)
+    : m_io_scheduler(std::move(scheduler)),
       m_timeout(timeout),
-      m_task_container(scheduler)
+      m_task_container(m_io_scheduler)
 {
+    if (m_io_scheduler == nullptr)
+    {
+        throw std::runtime_error{"dns_resolver cannot have nullptr scheduler"};
+    }
+
     {
         std::scoped_lock g{m_ares_mutex};
         if (m_ares_count == 0)
@@ -92,13 +97,11 @@ dns_resolver::~dns_resolver()
 auto dns_resolver::host_by_name(const net::hostname& hn) -> coro::task<std::unique_ptr<dns_result>>
 {
     coro::event resume_event{};
-    auto result_ptr = std::make_unique<dns_result>(m_io_scheduler, resume_event, 2);
+    auto result_ptr = std::make_unique<dns_result>(*m_io_scheduler.get(), resume_event, 2);

     ares_gethostbyname(m_ares_channel, hn.data().data(), AF_INET, ares_dns_callback, result_ptr.get());
     ares_gethostbyname(m_ares_channel, hn.data().data(), AF_INET6, ares_dns_callback, result_ptr.get());

-    std::vector<coro::task<void>> poll_tasks{};
-
     // Add all required poll calls for ares to kick off the dns requests.
     ares_poll();
@@ -157,7 +160,7 @@ auto dns_resolver::ares_poll() -> void

 auto dns_resolver::make_poll_task(fd_t fd, poll_op ops) -> coro::task<void>
 {
-    auto result = co_await m_io_scheduler.poll(fd, ops, m_timeout);
+    auto result = co_await m_io_scheduler->poll(fd, ops, m_timeout);
     switch (result)
     {
         case poll_status::event:
diff --git a/src/net/tcp_client.cpp b/src/net/tcp_client.cpp
index ec99ce7..1314020 100644
--- a/src/net/tcp_client.cpp
+++ b/src/net/tcp_client.cpp
@@ -4,27 +4,33 @@ namespace coro::net
 {
 using namespace std::chrono_literals;

-tcp_client::tcp_client(io_scheduler& scheduler, options opts)
-    : m_io_scheduler(&scheduler),
+tcp_client::tcp_client(std::shared_ptr<io_scheduler> scheduler, options opts)
+    : m_io_scheduler(std::move(scheduler)),
       m_options(std::move(opts)),
       m_socket(net::make_socket(
           net::socket::options{m_options.address.domain(), net::socket::type_t::tcp, net::socket::blocking_t::no}))
 {
+    if (m_io_scheduler == nullptr)
+    {
+        throw std::runtime_error{"tcp_client cannot have nullptr io_scheduler"};
+    }
 }

-tcp_client::tcp_client(io_scheduler& scheduler, net::socket socket, options opts)
-    : m_io_scheduler(&scheduler),
+tcp_client::tcp_client(std::shared_ptr<io_scheduler> scheduler, net::socket socket, options opts)
+    : m_io_scheduler(std::move(scheduler)),
       m_options(std::move(opts)),
       m_socket(std::move(socket)),
       m_connect_status(connect_status::connected),
       m_ssl_info(ssl_connection_type::accept)
 {
+    // The io_scheduler is assumed to be valid since it comes from a tcp_server.
+
+    // Force the socket to be non-blocking.
     m_socket.blocking(coro::net::socket::blocking_t::no);
 }

 tcp_client::tcp_client(tcp_client&& other)
-    : m_io_scheduler(std::exchange(other.m_io_scheduler, nullptr)),
+    : m_io_scheduler(std::move(other.m_io_scheduler)),
       m_options(std::move(other.m_options)),
       m_socket(std::move(other.m_socket)),
       m_connect_status(std::exchange(other.m_connect_status, std::nullopt)),
@@ -41,7 +47,7 @@ tcp_client::~tcp_client()
     {
         // Should the shutdown timeout be configurable?
         m_io_scheduler->schedule(ssl_shutdown_and_free(
-            *m_io_scheduler, std::move(m_socket), std::move(m_ssl_info.m_ssl_ptr), std::chrono::seconds{30}));
+            m_io_scheduler, std::move(m_socket), std::move(m_ssl_info.m_ssl_ptr), std::chrono::seconds{30}));
     }
 }

@@ -49,7 +55,7 @@ auto tcp_client::operator=(tcp_client&& other) noexcept -> tcp_client&
 {
     if (std::addressof(other) != this)
     {
-        m_io_scheduler   = std::exchange(other.m_io_scheduler, nullptr);
+        m_io_scheduler   = std::move(other.m_io_scheduler);
         m_options        = std::move(other.m_options);
         m_socket         = std::move(other.m_socket);
         m_connect_status = std::exchange(other.m_connect_status, std::nullopt);
@@ -202,8 +208,10 @@ auto tcp_client::ssl_handshake(std::chrono::milliseconds timeout) -> coro::task<
 }

 auto tcp_client::ssl_shutdown_and_free(
-    io_scheduler& io_scheduler, net::socket s, ssl_unique_ptr ssl_ptr, std::chrono::milliseconds timeout)
-    -> coro::task<void>
+    std::shared_ptr<io_scheduler> io_scheduler,
+    net::socket                   s,
+    ssl_unique_ptr                ssl_ptr,
+    std::chrono::milliseconds timeout) -> coro::task<void>
 {
     while (true)
     {
@@ -229,7 +237,7 @@ auto tcp_client::ssl_shutdown_and_free(
                 co_return;
         }

-        auto pstatus = co_await io_scheduler.poll(s, op, timeout);
+        auto pstatus = co_await io_scheduler->poll(s, op, timeout);
         switch (pstatus)
         {
             case poll_status::timeout:
diff --git a/src/net/tcp_server.cpp b/src/net/tcp_server.cpp
index e8109b4..44f74ce 100644
--- a/src/net/tcp_server.cpp
+++ b/src/net/tcp_server.cpp
@@ -4,8 +4,8 @@
 namespace coro::net
 {
-tcp_server::tcp_server(io_scheduler& scheduler, options opts)
-    : m_io_scheduler(&scheduler),
+tcp_server::tcp_server(std::shared_ptr<io_scheduler> scheduler, options opts)
+    : m_io_scheduler(std::move(scheduler)),
       m_options(std::move(opts)),
       m_accept_socket(net::make_accept_socket(
           net::socket::options{net::domain_t::ipv4, net::socket::type_t::tcp, net::socket::blocking_t::no},
@@ -13,10 +13,14 @@ tcp_server::tcp_server(io_scheduler& scheduler, options opts)
           m_options.port,
           m_options.backlog))
 {
+    if (m_io_scheduler == nullptr)
+    {
+        throw std::runtime_error{"tcp_server cannot have a nullptr io_scheduler"};
+    }
 }

 tcp_server::tcp_server(tcp_server&& other)
-    : m_io_scheduler(std::exchange(other.m_io_scheduler, nullptr)),
+    : m_io_scheduler(std::move(other.m_io_scheduler)),
       m_options(std::move(other.m_options)),
       m_accept_socket(std::move(other.m_accept_socket))
 {
@@ -26,7 +30,7 @@ auto tcp_server::operator=(tcp_server&& other) -> tcp_server&
 {
     if (std::addressof(other) != this)
     {
-        m_io_scheduler  = std::exchange(other.m_io_scheduler, nullptr);
+        m_io_scheduler  = std::move(other.m_io_scheduler);
         m_options       = std::move(other.m_options);
         m_accept_socket = std::move(other.m_accept_socket);
     }
@@ -45,7 +49,7 @@ auto tcp_server::accept() -> coro::net::tcp_client
     };

     return tcp_client{
-        *m_io_scheduler,
+        m_io_scheduler,
         std::move(s),
         tcp_client::options{
             .address = net::ip_address{ip_addr_view, static_cast<net::domain_t>(client.sin_family)},
diff --git a/src/net/udp_peer.cpp b/src/net/udp_peer.cpp
index 02353d4..5a4c423 100644
--- a/src/net/udp_peer.cpp
+++ b/src/net/udp_peer.cpp
@@ -2,14 +2,14 @@
 namespace coro::net
 {
-udp_peer::udp_peer(io_scheduler& scheduler, net::domain_t domain)
-    : m_io_scheduler(scheduler),
+udp_peer::udp_peer(std::shared_ptr<io_scheduler> scheduler, net::domain_t domain)
+    : m_io_scheduler(std::move(scheduler)),
       m_socket(net::make_socket(net::socket::options{domain, net::socket::type_t::udp, net::socket::blocking_t::no}))
 {
 }

-udp_peer::udp_peer(io_scheduler& scheduler, const info& bind_info)
-    : m_io_scheduler(scheduler),
+udp_peer::udp_peer(std::shared_ptr<io_scheduler> scheduler, const info& bind_info)
+    : m_io_scheduler(std::move(scheduler)),
       m_socket(net::make_accept_socket(
           net::socket::options{bind_info.address.domain(), net::socket::type_t::udp, net::socket::blocking_t::no},
           bind_info.address,
diff --git a/test/bench.cpp b/test/bench.cpp
index 5517a66..a9ed775 100644
--- a/test/bench.cpp
+++ b/test/bench.cpp
@@ -359,21 +359,21 @@ TEST_CASE("benchmark tcp_server echo server thread pool", "[benchmark]")

     struct server
     {
-        uint64_t id;
-        coro::io_scheduler scheduler{coro::io_scheduler::options{
+        uint64_t                            id;
+        std::shared_ptr<coro::io_scheduler> scheduler{std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{
             .pool               = coro::thread_pool::options{.thread_count = server_thread_count},
-            .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_on_thread_pool}};
-        coro::task_container task_container{scheduler};
-        uint64_t live_clients{0};
-        coro::event wait_for_clients{};
+            .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_on_thread_pool})};
+        // coro::task_container task_container{scheduler};
+        uint64_t    live_clients{0};
+        coro::event wait_for_clients{};
     };

     struct client
     {
-        coro::io_scheduler scheduler{coro::io_scheduler::options{
+        std::shared_ptr<coro::io_scheduler> scheduler{std::make_shared<coro::io_scheduler>(coro::io_scheduler::options{
             .pool               = coro::thread_pool::options{.thread_count = client_thread_count},
-            .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_on_thread_pool}};
-        std::vector<coro::task<void>> tasks{};
+            .execution_strategy = coro::io_scheduler::execution_strategy_t::process_tasks_on_thread_pool})};
+        std::vector<coro::task<void>> tasks{};
     };

     auto make_on_connection_task = [&](server& s, coro::net::tcp_client client) -> coro::task<void>
     {
@@ -409,7 +409,7 @@ TEST_CASE("benchmark tcp_server echo server thread pool", "[benchmark]")
     };

     auto make_server_task = [&](server& s) -> coro::task<void>
     {
-        co_await s.scheduler.schedule();
+        co_await s.scheduler->schedule();

         coro::net::tcp_server server{s.scheduler};

@@ -426,7 +426,8 @@ TEST_CASE("benchmark tcp_server echo server thread pool", "[benchmark]")
                     accepted.fetch_add(1, std::memory_order::release);

                     s.live_clients++;
-                    s.task_container.start(make_on_connection_task(s, std::move(c)));
+                    s.scheduler->schedule(make_on_connection_task(s, std::move(c)));
+                    // s.task_container.start(make_on_connection_task(s, std::move(c)));
                 }
             }
         }
@@ -439,7 +440,7 @@ TEST_CASE("benchmark tcp_server echo server thread pool", "[benchmark]")
     std::map<std::chrono::milliseconds, uint64_t> g_histogram;

     auto make_client_task = [&](client& c) -> coro::task<void>
     {
-        co_await c.scheduler.schedule();
+        co_await c.scheduler->schedule();
         std::map<std::chrono::milliseconds, uint64_t> histogram;
         coro::net::tcp_client client{c.scheduler};

@@ -490,7 +491,7 @@ TEST_CASE("benchmark tcp_server echo server thread pool", "[benchmark]")
             server s{};
             s.id = server_id++;
             coro::sync_wait(make_server_task(s));
-            s.scheduler.shutdown();
+            s.scheduler->shutdown();
         }});
     }

@@ -513,7 +514,7 @@ TEST_CASE("benchmark tcp_server echo server thread pool", "[benchmark]")
                 c.tasks.emplace_back(make_client_task(c));
             }
             coro::sync_wait(coro::when_all(std::move(c.tasks)));
-            c.scheduler.shutdown();
+            c.scheduler->shutdown();
         }});
     }

@@ -557,17 +558,19 @@ TEST_CASE("benchmark tcp_server echo server inline", "[benchmark]")

     struct server
     {
-        uint64_t id;
-        coro::io_scheduler scheduler{coro::io_scheduler::options{.execution_strategy = estrat::process_tasks_inline}};
-        coro::task_container task_container{scheduler};
-        uint64_t live_clients{0};
-        coro::event wait_for_clients{};
+        uint64_t                            id;
+        std::shared_ptr<coro::io_scheduler> scheduler{std::make_shared<coro::io_scheduler>(
+            coro::io_scheduler::options{.execution_strategy = estrat::process_tasks_inline})};
+        // coro::task_container task_container{scheduler};
+        uint64_t    live_clients{0};
+        coro::event wait_for_clients{};
     };

     struct client
     {
-        coro::io_scheduler scheduler{coro::io_scheduler::options{.execution_strategy = estrat::process_tasks_inline}};
-        std::vector<coro::task<void>> tasks{};
+        std::shared_ptr<coro::io_scheduler> scheduler{std::make_shared<coro::io_scheduler>(
+            coro::io_scheduler::options{.execution_strategy = estrat::process_tasks_inline})};
+        std::vector<coro::task<void>> tasks{};
     };

     auto make_on_connection_task = [&](server& s, coro::net::tcp_client client) -> coro::task<void>
     {
@@ -603,7 +606,7 @@ TEST_CASE("benchmark tcp_server echo server inline", "[benchmark]")
     };

     auto make_server_task = [&](server& s) -> coro::task<void>
     {
-        co_await s.scheduler.schedule();
+        co_await s.scheduler->schedule();

         coro::net::tcp_server server{s.scheduler};

@@ -620,7 +623,8 @@ TEST_CASE("benchmark tcp_server echo server inline", "[benchmark]")
                     accepted.fetch_add(1, std::memory_order::release);

                     s.live_clients++;
-                    s.task_container.start(make_on_connection_task(s, std::move(c)));
+                    s.scheduler->schedule(make_on_connection_task(s, std::move(c)));
+                    // s.task_container.start(make_on_connection_task(s, std::move(c)));
                 }
             }
         }
@@ -633,7 +637,7 @@ TEST_CASE("benchmark tcp_server echo server inline", "[benchmark]")
     std::map<std::chrono::milliseconds, uint64_t> g_histogram;

     auto make_client_task = [&](client& c) -> coro::task<void>
     {
-        co_await c.scheduler.schedule();
+        co_await c.scheduler->schedule();
         std::map<std::chrono::milliseconds, uint64_t> histogram;
         coro::net::tcp_client client{c.scheduler};

@@ -684,7 +688,7 @@ TEST_CASE("benchmark tcp_server echo server inline", "[benchmark]")
             server s{};
             s.id = server_id++;
             coro::sync_wait(make_server_task(s));
-            s.scheduler.shutdown();
+            s.scheduler->shutdown();
         }});
     }

@@ -707,7 +711,7 @@ TEST_CASE("benchmark tcp_server echo server inline", "[benchmark]")
                 c.tasks.emplace_back(make_client_task(c));
             }
             coro::sync_wait(coro::when_all(std::move(c.tasks)));
-            c.scheduler.shutdown();
+            c.scheduler->shutdown();
         }});
     }

diff --git a/test/net/test_dns_resolver.cpp b/test/net/test_dns_resolver.cpp
index 600f5c5..8a93c66 100644
--- a/test/net/test_dns_resolver.cpp
+++ b/test/net/test_dns_resolver.cpp
@@ -6,11 +6,12 @@
 TEST_CASE("dns_resolver basic", "[dns]")
 {
-    coro::io_scheduler scheduler{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
+    auto scheduler = std::make_shared<coro::io_scheduler>(
+        coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});
     coro::net::dns_resolver dns_resolver{scheduler, std::chrono::milliseconds{5000}};

     auto make_host_by_name_task = [&](coro::net::hostname hn) -> coro::task<void>
     {
-        co_await scheduler.schedule();
+        co_await scheduler->schedule();
         auto result_ptr = co_await std::move(dns_resolver.host_by_name(hn));

         if (result_ptr->status() == coro::net::dns_status::complete)
@@ -26,8 +27,8 @@ TEST_CASE("dns_resolver basic", "[dns]")

     coro::sync_wait(make_host_by_name_task(coro::net::hostname{"www.example.com"}));

-    std::cerr << "io_scheduler.size() before shutdown = " << scheduler.size() << "\n";
-    scheduler.shutdown();
-    std::cerr << "io_scheduler.size() after shutdown = " << scheduler.size() << "\n";
"io_scheduler.size() after shutdown = " << scheduler.size() << "\n"; - REQUIRE(scheduler.empty()); + std::cerr << "io_scheduler.size() before shutdown = " << scheduler->size() << "\n"; + scheduler->shutdown(); + std::cerr << "io_scheduler.size() after shutdown = " << scheduler->size() << "\n"; + REQUIRE(scheduler->empty()); } \ No newline at end of file diff --git a/test/net/test_tcp_server.cpp b/test/net/test_tcp_server.cpp index 8b1a7b3..f8b40fc 100644 --- a/test/net/test_tcp_server.cpp +++ b/test/net/test_tcp_server.cpp @@ -9,10 +9,11 @@ TEST_CASE("tcp_server ping server", "[tcp_server]") const std::string client_msg{"Hello from client"}; const std::string server_msg{"Reply from server!"}; - coro::io_scheduler scheduler{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto scheduler = std::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); auto make_client_task = [&]() -> coro::task { - co_await scheduler.schedule(); + co_await scheduler->schedule(); coro::net::tcp_client client{scheduler}; std::cerr << "client connect\n"; @@ -44,7 +45,7 @@ TEST_CASE("tcp_server ping server", "[tcp_server]") }; auto make_server_task = [&]() -> coro::task { - co_await scheduler.schedule(); + co_await scheduler->schedule(); coro::net::tcp_server server{scheduler}; // Poll for client connection. @@ -83,7 +84,8 @@ TEST_CASE("tcp_server ping server", "[tcp_server]") TEST_CASE("tcp_server with ssl", "[tcp_server]") { - coro::io_scheduler scheduler{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto scheduler = std::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); coro::net::ssl_context client_ssl_context{}; @@ -94,7 +96,7 @@ TEST_CASE("tcp_server with ssl", "[tcp_server]") std::string server_msg = "Hello world from SSL server!!"; auto make_client_task = [&]() -> coro::task { - co_await scheduler.schedule(); + co_await scheduler->schedule(); coro::net::tcp_client client{scheduler, coro::net::tcp_client::options{.ssl_ctx = &client_ssl_context}}; @@ -150,7 +152,7 @@ TEST_CASE("tcp_server with ssl", "[tcp_server]") }; auto make_server_task = [&]() -> coro::task { - co_await scheduler.schedule(); + co_await scheduler->schedule(); coro::net::tcp_server server{scheduler, coro::net::tcp_server::options{.ssl_ctx = &server_ssl_context}}; diff --git a/test/net/test_udp_peers.cpp b/test/net/test_udp_peers.cpp index 62f0014..79c41c1 100644 --- a/test/net/test_udp_peers.cpp +++ b/test/net/test_udp_peers.cpp @@ -6,10 +6,11 @@ TEST_CASE("udp one way") { const std::string msg{"aaaaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbbbbcccccccccccccccccc"}; - coro::io_scheduler scheduler{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}}; + auto scheduler = std::make_shared( + coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}); auto make_send_task = [&]() -> coro::task { - co_await scheduler.schedule(); + co_await scheduler->schedule(); coro::net::udp_peer peer{scheduler}; coro::net::udp_peer::info peer_info{}; @@ -21,7 +22,7 @@ TEST_CASE("udp one way") }; auto make_recv_task = [&]() -> coro::task { - co_await scheduler.schedule(); + co_await scheduler->schedule(); coro::net::udp_peer::info self_info{.address = coro::net::ip_address::from_string("0.0.0.0")}; coro::net::udp_peer self{scheduler, self_info}; @@ -49,7 +50,8 @@ TEST_CASE("udp echo peers") const std::string peer1_msg{"Hello from peer1!"}; const 
     const std::string peer2_msg{"Hello from peer2!!"};

-    coro::io_scheduler scheduler{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}}};
+    auto scheduler = std::make_shared<coro::io_scheduler>(
+        coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});

     auto make_peer_task = [&scheduler](
                               uint16_t my_port,
                               uint16_t peer_port,
                               bool send_first,
                               const std::string my_msg,
                               const std::string peer_msg) -> coro::task<void>
     {
-        co_await scheduler.schedule();
+        co_await scheduler->schedule();
         coro::net::udp_peer::info my_info{.address = coro::net::ip_address::from_string("0.0.0.0"), .port = my_port};
         coro::net::udp_peer::info peer_info{
             .address = coro::net::ip_address::from_string("127.0.0.1"), .port = peer_port};
diff --git a/test/test_shared_mutex.cpp b/test/test_shared_mutex.cpp
index e087eca..7dbbd73 100644
--- a/test/test_shared_mutex.cpp
+++ b/test/test_shared_mutex.cpp
@@ -7,7 +7,7 @@
 TEST_CASE("mutex single waiter not locked exclusive", "[shared_mutex]")
 {
-    coro::thread_pool tp{coro::thread_pool::options{.thread_count = 1}};
+    auto tp = std::make_shared<coro::thread_pool>(coro::thread_pool::options{.thread_count = 1});
     std::vector<uint64_t> output;

     coro::shared_mutex<coro::thread_pool> m{tp};
@@ -41,7 +41,7 @@ TEST_CASE("mutex single waiter not locked exclusive", "[shared_mutex]")

 TEST_CASE("mutex single waiter not locked shared", "[shared_mutex]")
 {
-    coro::thread_pool tp{coro::thread_pool::options{.thread_count = 1}};
+    auto tp = std::make_shared<coro::thread_pool>(coro::thread_pool::options{.thread_count = 1});
     std::vector<uint64_t> values{1, 2, 3};

     coro::shared_mutex<coro::thread_pool> m{tp};
@@ -80,13 +80,14 @@ TEST_CASE("mutex single waiter not locked shared", "[shared_mutex]")

 TEST_CASE("mutex many shared and exclusive waiters interleaved", "[shared_mutex]")
 {
-    coro::io_scheduler tp{coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 8}}};
+    auto tp = std::make_shared<coro::io_scheduler>(
+        coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 8}});
     coro::shared_mutex<coro::io_scheduler> m{tp};
     std::atomic<bool> read_value{false};

     auto make_shared_task = [&]() -> coro::task<void>
     {
-        co_await tp.schedule();
+        co_await tp->schedule();
         std::cerr << "make_shared_task shared lock acquiring\n";
         auto scoped_lock = co_await m.lock_shared();
         std::cerr << "make_shared_task shared lock acquired\n";
@@ -97,14 +98,14 @@ TEST_CASE("mutex many shared and exclusive waiters interleaved", "[shared_mutex]
     auto make_exclusive_task = [&]() -> coro::task<void>
     {
         // Let some readers get through.
-        co_await tp.yield_for(std::chrono::milliseconds{50});
+        co_await tp->yield_for(std::chrono::milliseconds{50});

         {
             std::cerr << "make_shared_task exclusive lock acquiring\n";
             auto scoped_lock = co_await m.lock();
             std::cerr << "make_shared_task exclusive lock acquired\n";

             // Stack readers on the mutex
-            co_await tp.yield_for(std::chrono::milliseconds{50});
+            co_await tp->yield_for(std::chrono::milliseconds{50});

             read_value.exchange(true, std::memory_order::release);
             std::cerr << "make_shared_task exclusive lock releasing\n";
         }
@@ -113,7 +114,7 @@ TEST_CASE("mutex many shared and exclusive waiters interleaved", "[shared_mutex]
     };

     auto make_shared_tasks_task = [&]() -> coro::task<void>
     {
-        co_await tp.schedule();
+        co_await tp->schedule();

         std::vector<coro::task<void>> shared_tasks{};

@@ -123,7 +124,7 @@ TEST_CASE("mutex many shared and exclusive waiters interleaved", "[shared_mutex]
             shared_tasks.emplace_back(make_shared_task());
             shared_tasks.back().resume();

-            co_await tp.yield_for(std::chrono::milliseconds{1});
+            co_await tp->yield_for(std::chrono::milliseconds{1});

             for (const auto& st : shared_tasks)
             {
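
For reference, a minimal sketch (not part of the patch) of how the `std::shared_ptr` executor API introduced above fits together. It only uses calls that appear in the hunks (`std::make_shared<coro::io_scheduler>`, `schedule()`, `tcp_client{scheduler}`, `connect()`, `sync_wait`, `shutdown()`); everything else, including the lambda name, is illustrative.

```C++
#include <coro/coro.hpp>

int main()
{
    // Executors are now heap allocated and passed around as std::shared_ptr so
    // long-lived objects such as tcp_client can share ownership of their
    // scheduler instead of holding a raw reference or pointer.
    auto scheduler = std::make_shared<coro::io_scheduler>(
        coro::io_scheduler::options{.pool = coro::thread_pool::options{.thread_count = 1}});

    auto make_task = [&]() -> coro::task<void>
    {
        // All scheduling now goes through operator-> on the shared_ptr.
        co_await scheduler->schedule();

        // The tcp_client copies the shared_ptr, so moving or destroying other
        // handles to the scheduler cannot leave it dangling; per the patch, a
        // nullptr scheduler is rejected at construction with std::runtime_error.
        coro::net::tcp_client client{scheduler};
        co_await client.connect();
        co_return;
    };

    coro::sync_wait(make_task());
    scheduler->shutdown();
}
```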