
Rename scheduler to io_scheduler (#16)

Closes #15
Josh Baldwin 2020-11-01 12:08:09 -07:00 committed by GitHub
parent 76b41a6ca0
commit ddd3c76c53
6 changed files with 88 additions and 91 deletions
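For orientation, a minimal sketch (not part of this commit) of what the rename means for calling code. It reuses only names visible in the hunks below (coro::io_scheduler, its nested fd_t type, coro::task, and the read member); the helper name read_some is illustrative, and the exact read signature is assumed from the standalone_read helper in the test diff.

    #include <coro/coro.hpp>
    #include <span>
    #include <sys/types.h>

    // Before this commit the scheduler type was spelled coro::scheduler;
    // after it, only the type names change:
    static auto read_some(coro::io_scheduler& s, coro::io_scheduler::fd_t socket, std::span<char> buffer)
        -> coro::task<ssize_t>
    {
        // read() is assumed to suspend via poll() internally, mirroring the
        // write() implementation visible in the io_scheduler hunk below.
        co_return co_await s.read(socket, buffer);
    }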

View file

@@ -14,9 +14,9 @@ set(LIBCORO_SOURCE_FILES
inc/coro/coro.hpp
inc/coro/event.hpp src/event.cpp
inc/coro/generator.hpp
inc/coro/io_scheduler.hpp
inc/coro/latch.hpp
inc/coro/promise.hpp
inc/coro/scheduler.hpp
inc/coro/shutdown.hpp
inc/coro/sync_wait.hpp src/sync_wait.cpp
inc/coro/task.hpp

View file

@@ -3,9 +3,9 @@
#include "coro/awaitable.hpp"
#include "coro/event.hpp"
#include "coro/generator.hpp"
#include "coro/io_scheduler.hpp"
#include "coro/latch.hpp"
#include "coro/promise.hpp"
#include "coro/scheduler.hpp"
#include "coro/sync_wait.hpp"
#include "coro/task.hpp"
#include "coro/thread_pool.hpp"

View file

@@ -28,14 +28,14 @@
namespace coro
{
class scheduler;
class io_scheduler;
namespace detail
{
class resume_token_base
{
public:
resume_token_base(scheduler* eng) noexcept : m_scheduler(eng), m_state(nullptr) {}
resume_token_base(io_scheduler* s) noexcept : m_scheduler(s), m_state(nullptr) {}
virtual ~resume_token_base() = default;
@@ -113,7 +113,7 @@ public:
protected:
friend struct awaiter;
scheduler* m_scheduler{nullptr};
io_scheduler* m_scheduler{nullptr};
mutable std::atomic<void*> m_state;
};
@@ -122,9 +122,9 @@ protected:
template<typename return_type>
class resume_token final : public detail::resume_token_base
{
friend scheduler;
friend io_scheduler;
resume_token() : detail::resume_token_base(nullptr) {}
resume_token(scheduler& s) : detail::resume_token_base(&s) {}
resume_token(io_scheduler& s) : detail::resume_token_base(&s) {}
public:
~resume_token() override = default;
@@ -147,9 +147,9 @@ private:
template<>
class resume_token<void> final : public detail::resume_token_base
{
friend scheduler;
friend io_scheduler;
resume_token() : detail::resume_token_base(nullptr) {}
resume_token(scheduler& s) : detail::resume_token_base(&s) {}
resume_token(io_scheduler& s) : detail::resume_token_base(&s) {}
public:
~resume_token() override = default;
@@ -172,7 +172,7 @@ enum class poll_op
read_write = EPOLLIN | EPOLLOUT
};
class scheduler
class io_scheduler
{
private:
using task_variant = std::variant<coro::task<void>, std::coroutine_handle<>>;
@@ -352,7 +352,7 @@ public:
/**
* @param options Various scheduler options to tune how it behaves.
*/
scheduler(const options opts = options{8, 2, thread_strategy_t::spawn})
io_scheduler(const options opts = options{8, 2, thread_strategy_t::spawn})
: m_epoll_fd(epoll_create1(EPOLL_CLOEXEC)),
m_accept_fd(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)),
m_thread_strategy(opts.thread_strategy),
@@ -377,12 +377,12 @@ public:
// else manual mode, the user must call process_events.
}
scheduler(const scheduler&) = delete;
scheduler(scheduler&&) = delete;
auto operator=(const scheduler&) -> scheduler& = delete;
auto operator=(scheduler &&) -> scheduler& = delete;
io_scheduler(const io_scheduler&) = delete;
io_scheduler(io_scheduler&&) = delete;
auto operator=(const io_scheduler&) -> io_scheduler& = delete;
auto operator=(io_scheduler &&) -> io_scheduler& = delete;
~scheduler()
~io_scheduler()
{
shutdown();
if (m_epoll_fd != -1)
@@ -458,9 +458,7 @@ public:
auto poll(fd_t fd, poll_op op) -> coro::task<void>
{
co_await unsafe_yield<void>([&](resume_token<void>& token) {
struct epoll_event e
{
};
epoll_event e{};
e.events = static_cast<uint32_t>(op) | EPOLLONESHOT | EPOLLET;
e.data.ptr = &token;
epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, fd, &e);
@@ -494,7 +492,6 @@ public:
{
co_await poll(fd, poll_op::write);
co_return ::write(fd, buffer.data(), buffer.size());
;
}
/**

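As a quick reference (again, not part of the diff), the construction styles exercised by the tests below look like this after the rename; reserve_size, thread_strategy and process_events are all named in the hunks above and below, but the exact process_events signature is an assumption.

    #include <coro/coro.hpp>

    coro::io_scheduler s1{};  // default options: a dedicated event-processing thread is spawned

    // Pre-reserve task storage, as in the reserve/growth tests below.
    coro::io_scheduler s2{coro::io_scheduler::options{.reserve_size = 512}};

    // Manual mode: the caller drives the event loop (see the constructor comment above);
    // the no-argument process_events() call is assumed, not shown in this diff.
    coro::io_scheduler s3{coro::io_scheduler::options{
        .thread_strategy = coro::io_scheduler::thread_strategy_t::manual}};
    while (!s3.empty()) { s3.process_events(); }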
View file

@@ -5,8 +5,8 @@ set(LIBCORO_TEST_SOURCE_FILES
bench.cpp
test_event.cpp
test_generator.cpp
test_io_scheduler.cpp
test_latch.cpp
test_scheduler.cpp
test_sync_wait.cpp
test_task.cpp
test_thread_pool.cpp

View file

@@ -146,11 +146,11 @@ TEST_CASE("benchmark thread_pool{2} counter task")
REQUIRE(tp.empty());
}
TEST_CASE("benchmark counter task scheduler")
TEST_CASE("benchmark counter task io_scheduler")
{
constexpr std::size_t iterations = default_iterations;
coro::scheduler s1{};
coro::io_scheduler s1{};
std::atomic<uint64_t> counter{0};
auto func = [&]() -> coro::task<void> {
counter.fetch_add(1, std::memory_order::relaxed);
@@ -165,17 +165,17 @@ TEST_CASE("benchmark counter task scheduler")
}
s1.shutdown();
print_stats("benchmark counter task through scheduler", iterations, start, sc::now());
print_stats("benchmark counter task through io_scheduler", iterations, start, sc::now());
REQUIRE(s1.empty());
REQUIRE(counter == iterations);
}
TEST_CASE("benchmark counter task scheduler yield -> resume from main")
TEST_CASE("benchmark counter task io_scheduler yield -> resume from main")
{
constexpr std::size_t iterations = default_iterations;
constexpr std::size_t ops = iterations * 2; // the external resume is still a resume op
coro::scheduler s{};
coro::io_scheduler s{};
std::vector<coro::resume_token<void>> tokens{};
for (std::size_t i = 0; i < iterations; ++i)
{
@@ -205,17 +205,17 @@ TEST_CASE("benchmark counter task scheduler yield -> resume from main")
s.shutdown();
auto stop = sc::now();
print_stats("benchmark counter task scheduler yield -> resume from main", ops, start, stop);
print_stats("benchmark counter task io_scheduler yield -> resume from main", ops, start, stop);
REQUIRE(s.empty());
REQUIRE(counter == iterations);
}
TEST_CASE("benchmark counter task scheduler yield -> resume from coroutine")
TEST_CASE("benchmark counter task io_scheduler yield -> resume from coroutine")
{
constexpr std::size_t iterations = default_iterations;
constexpr std::size_t ops = iterations * 2; // each iteration executes 2 coroutines.
coro::scheduler s{};
coro::io_scheduler s{};
std::vector<coro::resume_token<void>> tokens{};
for (std::size_t i = 0; i < iterations; ++i)
{
@@ -246,17 +246,17 @@ TEST_CASE("benchmark counter task scheduler yield -> resume from coroutine")
s.shutdown();
auto stop = sc::now();
print_stats("benchmark counter task scheduler yield -> resume from coroutine", ops, start, stop);
print_stats("benchmark counter task io_scheduler yield -> resume from coroutine", ops, start, stop);
REQUIRE(s.empty());
REQUIRE(counter == iterations);
}
TEST_CASE("benchmark counter task scheduler resume from coroutine -> yield")
TEST_CASE("benchmark counter task io_scheduler resume from coroutine -> yield")
{
constexpr std::size_t iterations = default_iterations;
constexpr std::size_t ops = iterations * 2; // each iteration executes 2 coroutines.
coro::scheduler s{};
coro::io_scheduler s{};
std::vector<coro::resume_token<void>> tokens{};
for (std::size_t i = 0; i < iterations; ++i)
{
@@ -287,17 +287,17 @@ TEST_CASE("benchmark counter task scheduler resume from coroutine -> yield")
s.shutdown();
auto stop = sc::now();
print_stats("benchmark counter task scheduler resume from coroutine -> yield", ops, start, stop);
print_stats("benchmark counter task io_scheduler resume from coroutine -> yield", ops, start, stop);
REQUIRE(s.empty());
REQUIRE(counter == iterations);
}
TEST_CASE("benchmark counter task scheduler yield (all) -> resume (all) from coroutine with reserve")
TEST_CASE("benchmark counter task io_scheduler yield (all) -> resume (all) from coroutine with reserve")
{
constexpr std::size_t iterations = default_iterations;
constexpr std::size_t ops = iterations * 2; // each iteration executes 2 coroutines.
coro::scheduler s{coro::scheduler::options{.reserve_size = iterations}};
coro::io_scheduler s{coro::io_scheduler::options{.reserve_size = iterations}};
std::vector<coro::resume_token<void>> tokens{};
for (std::size_t i = 0; i < iterations; ++i)
{
@@ -332,7 +332,7 @@ TEST_CASE("benchmark counter task scheduler yield (all) -> resume (all) from coroutine with reserve")
s.shutdown();
auto stop = sc::now();
print_stats("benchmark counter task scheduler yield -> resume from coroutine with reserve", ops, start, stop);
print_stats("benchmark counter task io_scheduler yield -> resume from coroutine with reserve", ops, start, stop);
REQUIRE(s.empty());
REQUIRE(counter == iterations);
}

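The benchmarks above and several tests below share one handshake pattern: a coroutine awaits a resume_token and is woken by resume(). A compact sketch, using only calls shown in these hunks (generate_resume_token, co_await on the token, token.resume, shutdown); the call that hands the task to the scheduler is omitted because its name does not appear in this diff.

    coro::io_scheduler s{};
    auto token = s.generate_resume_token<void>();

    auto waiter = [&]() -> coro::task<void> {
        co_await token;   // suspends until token.resume() is called
        co_return;
    };

    // ... hand the waiter task to the scheduler here (submission API not shown in this diff) ...

    token.resume();       // wakes every coroutine currently awaiting the token
    s.shutdown();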
View file

@@ -10,9 +10,9 @@
using namespace std::chrono_literals;
TEST_CASE("scheduler sizeof()")
TEST_CASE("io_scheduler sizeof()")
{
std::cerr << "sizeof(coro::scheduler)=[" << sizeof(coro::scheduler) << "]\n";
std::cerr << "sizeof(coro::io_scheduler)=[" << sizeof(coro::io_scheduler) << "]\n";
std::cerr << "sizeof(coro:task<void>)=[" << sizeof(coro::task<void>) << "]\n";
std::cerr << "sizeof(std::coroutine_handle<>)=[" << sizeof(std::coroutine_handle<>) << "]\n";
@@ -22,10 +22,10 @@ TEST_CASE("scheduler sizeof()")
REQUIRE(true);
}
TEST_CASE("scheduler submit single task")
TEST_CASE("io_scheduler submit single task")
{
std::atomic<uint64_t> counter{0};
coro::scheduler s{};
coro::io_scheduler s{};
// Note that captures are only safe as long as the lambda object outlives the execution
// of the coroutine. In all of these tests the lambda is created on the root test function
@@ -46,17 +46,17 @@ TEST_CASE("scheduler submit single task")
REQUIRE(counter == 1);
}
TEST_CASE("scheduler submit single task with move and auto initializing lambda")
TEST_CASE("io_scheduler submit single task with move and auto initializing lambda")
{
// This example test will auto invoke the lambda object, return the task and then destruct.
// Because the lambda immediately goes out of scope the task must capture all variables
// through its parameters directly.
std::atomic<uint64_t> counter{0};
coro::scheduler s{};
coro::io_scheduler s{};
auto task = [](std::atomic<uint64_t>& counter) -> coro::task<void> {
std::cerr << "Hello world from scheduler task!\n";
std::cerr << "Hello world from io_scheduler task!\n";
counter++;
co_return;
}(counter);
@@ -68,11 +68,11 @@ TEST_CASE("scheduler submit single task with move and auto initializing lambda")
REQUIRE(counter == 1);
}
TEST_CASE("scheduler submit mutiple tasks")
TEST_CASE("io_scheduler submit mutiple tasks")
{
constexpr std::size_t n = 1000;
std::atomic<uint64_t> counter{0};
coro::scheduler s{};
coro::io_scheduler s{};
auto func = [&]() -> coro::task<void> {
counter++;
@@ -87,10 +87,10 @@ TEST_CASE("scheduler submit mutiple tasks")
REQUIRE(counter == n);
}
TEST_CASE("scheduler task with multiple yields on event")
TEST_CASE("io_scheduler task with multiple yields on event")
{
std::atomic<uint64_t> counter{0};
coro::scheduler s{};
coro::io_scheduler s{};
auto token = s.generate_resume_token<uint64_t>();
// coro::resume_token<uint64_t> token{s};
@@ -134,10 +134,10 @@ TEST_CASE("scheduler task with multiple yields on event")
REQUIRE(s.empty());
}
TEST_CASE("scheduler task with read poll")
TEST_CASE("io_scheduler task with read poll")
{
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::scheduler s{};
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::io_scheduler s{};
auto func = [&]() -> coro::task<void> {
// Poll will block until there is data to read.
@@ -155,11 +155,11 @@ TEST_CASE("scheduler task with read poll")
close(trigger_fd);
}
TEST_CASE("scheduler task with read")
TEST_CASE("io_scheduler task with read")
{
constexpr uint64_t expected_value{42};
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::scheduler s{};
coro::io_scheduler s{};
auto func = [&]() -> coro::task<void> {
uint64_t val{0};
@@ -178,7 +178,7 @@ TEST_CASE("scheduler task with read")
close(trigger_fd);
}
TEST_CASE("scheduler task with read and write same fd")
TEST_CASE("io_scheduler task with read and write same fd")
{
// Since this test uses an eventfd, only 1 task at a time can poll/read/write to that
// event descriptor through the scheduler. It could be possible to modify the scheduler
@@ -188,7 +188,7 @@ TEST_CASE("scheduler task with read and write same fd")
constexpr uint64_t expected_value{9001};
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::scheduler s{};
coro::io_scheduler s{};
auto func = [&]() -> coro::task<void> {
auto bytes_written = co_await s.write(
@@ -210,13 +210,13 @@ TEST_CASE("scheduler task with read and write same fd")
close(trigger_fd);
}
TEST_CASE("scheduler task with read and write pipe")
TEST_CASE("io_scheduler task with read and write pipe")
{
const std::string msg{"coroutines are really cool but not that EASY!"};
int pipe_fd[2];
pipe2(pipe_fd, O_NONBLOCK);
coro::scheduler s{};
coro::io_scheduler s{};
auto read_func = [&]() -> coro::task<void> {
std::string buffer(4096, '0');
@@ -241,7 +241,7 @@ TEST_CASE("scheduler task with read and write pipe")
close(pipe_fd[1]);
}
static auto standalone_read(coro::scheduler& s, coro::scheduler::fd_t socket, std::span<char> buffer)
static auto standalone_read(coro::io_scheduler& s, coro::io_scheduler::fd_t socket, std::span<char> buffer)
-> coro::task<ssize_t>
{
// do other stuff in larger function
@@ -249,11 +249,11 @@ static auto standalone_read(coro::scheduler& s, coro::scheduler::fd_t socket, std::span<char> buffer)
// do more stuff in larger function
}
TEST_CASE("scheduler standalone read task")
TEST_CASE("io_scheduler standalone read task")
{
constexpr ssize_t expected_value{1111};
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::scheduler s{};
constexpr ssize_t expected_value{1111};
auto trigger_fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
coro::io_scheduler s{};
auto func = [&]() -> coro::task<void> {
ssize_t v{0};
@@ -273,9 +273,9 @@ TEST_CASE("scheduler standalone read task")
close(trigger_fd);
}
TEST_CASE("scheduler separate thread resume")
TEST_CASE("io_scheduler separate thread resume")
{
coro::scheduler s{};
coro::io_scheduler s{};
auto func = [&]() -> coro::task<void> {
// User manual resume token, create one specifically for each task being generated
@@ -302,10 +302,10 @@ TEST_CASE("scheduler separate thread resume")
s.shutdown();
}
TEST_CASE("scheduler separate thread resume with return")
TEST_CASE("io_scheduler separate thread resume with return")
{
constexpr uint64_t expected_value{1337};
coro::scheduler s{};
coro::io_scheduler s{};
std::atomic<coro::resume_token<uint64_t>*> token{};
@@ -335,11 +335,11 @@ TEST_CASE("scheduler separate thread resume with return")
s.shutdown();
}
TEST_CASE("scheduler with basic task")
TEST_CASE("io_scheduler with basic task")
{
constexpr std::size_t expected_value{5};
std::atomic<uint64_t> counter{0};
coro::scheduler s{};
coro::io_scheduler s{};
auto add_data = [&](uint64_t val) -> coro::task<int> { co_return val; };
@@ -354,11 +354,11 @@ TEST_CASE("scheduler with basic task")
REQUIRE(counter == expected_value);
}
TEST_CASE("schedule yield for")
TEST_CASE("io_scheduler yield for")
{
constexpr std::chrono::milliseconds wait_for{50};
std::atomic<uint64_t> counter{0};
coro::scheduler s{};
coro::io_scheduler s{};
auto func = [&]() -> coro::task<void> {
++counter;
@@ -375,11 +375,11 @@ TEST_CASE("schedule yield for")
REQUIRE(duration >= wait_for);
}
TEST_CASE("scheduler trigger growth of internal tasks storage")
TEST_CASE("io_scheduler trigger growth of internal tasks storage")
{
std::atomic<uint64_t> counter{0};
constexpr std::size_t iterations{512};
coro::scheduler s{coro::scheduler::options{.reserve_size = 1}};
coro::io_scheduler s{coro::io_scheduler::options{.reserve_size = 1}};
auto wait_func = [&](std::chrono::milliseconds wait_time) -> coro::task<void> {
co_await s.yield_for(wait_time);
@@ -397,10 +397,10 @@ TEST_CASE("scheduler trigger growth of internal tasks storage")
REQUIRE(counter == iterations);
}
TEST_CASE("scheduler yield with scheduler event void")
TEST_CASE("io_scheduler yield with scheduler event void")
{
std::atomic<uint64_t> counter{0};
coro::scheduler s{};
coro::io_scheduler s{};
auto func = [&]() -> coro::task<void> {
co_await s.yield<void>([&](coro::resume_token<void>& token) -> void { token.resume(); });
@@ -416,10 +416,10 @@ TEST_CASE("scheduler yield with scheduler event void")
REQUIRE(counter == 42);
}
TEST_CASE("scheduler yield with scheduler event uint64_t")
TEST_CASE("io_scheduler yield with scheduler event uint64_t")
{
std::atomic<uint64_t> counter{0};
coro::scheduler s{};
coro::io_scheduler s{};
auto func = [&]() -> coro::task<void> {
counter += co_await s.yield<uint64_t>([&](coro::resume_token<uint64_t>& token) -> void { token.resume(42); });
@@ -434,11 +434,11 @@ TEST_CASE("scheduler yield with scheduler event uint64_t")
REQUIRE(counter == 42);
}
TEST_CASE("scheduler yield user event")
TEST_CASE("io_scheduler yield user event")
{
std::string expected_result = "Here I am!";
coro::scheduler s{};
auto token = s.generate_resume_token<std::string>();
std::string expected_result = "Here I am!";
coro::io_scheduler s{};
auto token = s.generate_resume_token<std::string>();
// coro::resume_token<std::string> token{s};
auto func = [&]() -> coro::task<void> {
@@ -454,11 +454,11 @@ TEST_CASE("scheduler yield user event")
s.shutdown();
}
TEST_CASE("scheduler yield user event multiple waiters")
TEST_CASE("io_scheduler yield user event multiple waiters")
{
std::atomic<int> counter{0};
coro::scheduler s{};
auto token = s.generate_resume_token<void>();
std::atomic<int> counter{0};
coro::io_scheduler s{};
auto token = s.generate_resume_token<void>();
auto func = [&](int amount) -> coro::task<void> {
co_await token;
@@ -484,10 +484,10 @@ TEST_CASE("scheduler yield user event multiple waiters")
REQUIRE(counter == 25);
}
TEST_CASE("scheduler manual process events with self generating coroutine (stack overflow check)")
TEST_CASE("io_scheduler manual process events with self generating coroutine (stack overflow check)")
{
uint64_t counter{0};
coro::scheduler s{coro::scheduler::options{.thread_strategy = coro::scheduler::thread_strategy_t::manual}};
uint64_t counter{0};
coro::io_scheduler s{coro::io_scheduler::options{.thread_strategy = coro::io_scheduler::thread_strategy_t::manual}};
auto func = [&](auto f) -> coro::task<void> {
++counter;
@@ -510,9 +510,9 @@ TEST_CASE("scheduler manual process events with self generating coroutine (stack overflow check)")
std::cerr << "Recursive test done.\n";
}
TEST_CASE("scheduler task throws")
TEST_CASE("io_scheduler task throws")
{
coro::scheduler s{};
coro::io_scheduler s{};
auto func = []() -> coro::task<void> {
// Is it possible to actually notify the user when running a task in a scheduler?
@@ -527,10 +527,10 @@ TEST_CASE("scheduler task throws")
REQUIRE(s.empty());
}
TEST_CASE("scheduler task throws after resume")
TEST_CASE("io_scheduler task throws after resume")
{
coro::scheduler s{};
auto token = s.generate_resume_token<void>();
coro::io_scheduler s{};
auto token = s.generate_resume_token<void>();
auto func = [&]() -> coro::task<void> {
co_await token;