1
0
Fork 0
mirror of https://gitlab.com/niansa/libcrosscoro.git synced 2025-03-06 20:53:32 +01:00

Enable -Wall and -Wextra

Renamed some scheduler internals, made scheduler
bench tests use std::memory_order_relaxed for counters.
This commit is contained in:
jbaldwin 2020-09-30 22:57:54 -06:00
parent cbd1046161
commit 6c593cafad
4 changed files with 91 additions and 69 deletions

View file

@ -26,7 +26,7 @@ if(${CMAKE_CXX_COMPILER_ID} MATCHES "GNU")
message(FATAL_ERROR "gcc version ${CMAKE_CXX_COMPILER_VERSION} is unsupported, please upgrade to at least 10.2.0") message(FATAL_ERROR "gcc version ${CMAKE_CXX_COMPILER_VERSION} is unsupported, please upgrade to at least 10.2.0")
endif() endif()
target_compile_options(${PROJECT_NAME} PUBLIC -fcoroutines) target_compile_options(${PROJECT_NAME} PUBLIC -fcoroutines -Wall -Wextra -pipe)
elseif(${CMAKE_CXX_COMPILER_ID} MATCHES "Clang") elseif(${CMAKE_CXX_COMPILER_ID} MATCHES "Clang")
message(FATAL_ERROR "Clang is currently not supported.") message(FATAL_ERROR "Clang is currently not supported.")
endif() endif()

View file

@ -68,7 +68,7 @@ public:
bool is_set() const noexcept bool is_set() const noexcept
{ {
return m_state.load(std::memory_order_acquire) == this; return m_state.load(std::memory_order::acquire) == this;
} }
struct awaiter struct awaiter
@ -91,7 +91,7 @@ public:
m_awaiting_coroutine = awaiting_coroutine; m_awaiting_coroutine = awaiting_coroutine;
// This value will update if other threads write to it via acquire. // This value will update if other threads write to it via acquire.
void* old_value = m_token.m_state.load(std::memory_order_acquire); void* old_value = m_token.m_state.load(std::memory_order::acquire);
do do
{ {
// Resume immediately if already in the set state. // Resume immediately if already in the set state.
@ -104,8 +104,8 @@ public:
} while(!m_token.m_state.compare_exchange_weak( } while(!m_token.m_state.compare_exchange_weak(
old_value, old_value,
this, this,
std::memory_order_release, std::memory_order::release,
std::memory_order_acquire)); std::memory_order::acquire));
return true; return true;
} }
@ -128,7 +128,7 @@ public:
auto reset() noexcept -> void auto reset() noexcept -> void
{ {
void* old_value = this; void* old_value = this;
m_state.compare_exchange_strong(old_value, nullptr, std::memory_order_acquire); m_state.compare_exchange_strong(old_value, nullptr, std::memory_order::acquire);
} }
protected: protected:
@ -413,11 +413,11 @@ public:
if(m_thread_strategy == thread_strategy_t::spawn) if(m_thread_strategy == thread_strategy_t::spawn)
{ {
m_scheduler_thread = std::thread([this] { this->run(); }); m_scheduler_thread = std::thread([this] { process_events_dedicated_thread(); });
} }
else if(m_thread_strategy == thread_strategy_t::adopt) else if(m_thread_strategy == thread_strategy_t::adopt)
{ {
run(); process_events_dedicated_thread();
} }
// else manual mode, the user must call process_events. // else manual mode, the user must call process_events.
} }
@ -450,7 +450,7 @@ public:
*/ */
auto schedule(coro::task<void> task) -> bool auto schedule(coro::task<void> task) -> bool
{ {
if(m_shutdown_requested.load(std::memory_order_relaxed)) if(m_shutdown_requested.load(std::memory_order::relaxed))
{ {
return false; return false;
} }
@ -461,15 +461,20 @@ public:
// queue and processing through the normal pipeline. This simplifies the code and also makes // queue and processing through the normal pipeline. This simplifies the code and also makes
// the order in which newly submitted tasks are more fair in regards to FIFO. // the order in which newly submitted tasks are more fair in regards to FIFO.
m_size.fetch_add(1, std::memory_order_relaxed); m_size.fetch_add(1, std::memory_order::relaxed);
{ {
std::lock_guard<std::mutex> lk{m_accept_mutex}; std::lock_guard<std::mutex> lk{m_accept_mutex};
m_accept_queue.emplace_back(std::move(task)); m_accept_queue.emplace_back(std::move(task));
} }
// Send an event if one isn't already set. // Send an event if one isn't already set. We use strong here to avoid spurious failures
// but if it fails due to it actually being set we don't want to retry.
bool expected{false}; bool expected{false};
if(m_event_set.compare_exchange_strong(expected, true, std::memory_order_release, std::memory_order_relaxed)) if(m_event_set.compare_exchange_strong(
expected,
true,
std::memory_order::release,
std::memory_order::relaxed))
{ {
uint64_t value{1}; uint64_t value{1};
::write(m_accept_fd, &value, sizeof(value)); ::write(m_accept_fd, &value, sizeof(value));
@ -486,7 +491,7 @@ public:
*/ */
auto schedule_after(coro::task<void> task, std::chrono::milliseconds after) -> bool auto schedule_after(coro::task<void> task, std::chrono::milliseconds after) -> bool
{ {
if(m_shutdown_requested.load(std::memory_order_relaxed)) if(m_shutdown_requested.load(std::memory_order::relaxed))
{ {
return false; return false;
} }
@ -638,14 +643,14 @@ public:
auto process_events(std::chrono::milliseconds timeout = std::chrono::milliseconds{1000}) -> std::size_t auto process_events(std::chrono::milliseconds timeout = std::chrono::milliseconds{1000}) -> std::size_t
{ {
process_events_internal_set_thread(timeout); process_events_external_thread(timeout);
return m_size.load(std::memory_order_relaxed); return m_size.load(std::memory_order::relaxed);
} }
/** /**
* @return The number of active tasks still executing and unprocessed submitted tasks. * @return The number of active tasks still executing and unprocessed submitted tasks.
*/ */
auto size() const -> std::size_t { return m_size.load(std::memory_order_relaxed); } auto size() const -> std::size_t { return m_size.load(std::memory_order::relaxed); }
/** /**
* @return True if there are no tasks executing or waiting to be executed in this scheduler. * @return True if there are no tasks executing or waiting to be executed in this scheduler.
@ -661,12 +666,12 @@ public:
* Is there a thread processing this schedulers events? * Is there a thread processing this schedulers events?
* If this is in thread strategy spawn or adopt this will always be true until shutdown. * If this is in thread strategy spawn or adopt this will always be true until shutdown.
*/ */
auto is_running() const noexcept -> bool { return m_running.load(std::memory_order_relaxed); } auto is_running() const noexcept -> bool { return m_running.load(std::memory_order::relaxed); }
/** /**
* @return True if this scheduler has been requested to shutdown. * @return True if this scheduler has been requested to shutdown.
*/ */
auto is_shutdown() const noexcept -> bool { return m_shutdown_requested.load(std::memory_order_relaxed); } auto is_shutdown() const noexcept -> bool { return m_shutdown_requested.load(std::memory_order::relaxed); }
/** /**
* Requests the scheduler to finish processing all of its current tasks and shutdown. * Requests the scheduler to finish processing all of its current tasks and shutdown.
@ -678,7 +683,7 @@ public:
*/ */
auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) -> void auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) -> void
{ {
if(!m_shutdown_requested.exchange(true, std::memory_order_release)) if(!m_shutdown_requested.exchange(true, std::memory_order::release))
{ {
// Signal the event loop to stop asap. // Signal the event loop to stop asap.
uint64_t value{1}; uint64_t value{1};
@ -717,7 +722,7 @@ private:
std::atomic<std::size_t> m_size{0}; std::atomic<std::size_t> m_size{0};
/// The maximum number of tasks to process inline before polling for more tasks. /// The maximum number of tasks to process inline before polling for more tasks.
static constexpr const std::size_t task_inline_process_amount{128}; static constexpr const std::size_t task_inline_process_amount{64};
/// Pre-allocated memory area for tasks to process. /// Pre-allocated memory area for tasks to process.
std::array<task_variant, task_inline_process_amount> m_processing_tasks; std::array<task_variant, task_inline_process_amount> m_processing_tasks;
@ -766,7 +771,7 @@ private:
// Signal to the event loop there is a task to resume if one hasn't already been sent. // Signal to the event loop there is a task to resume if one hasn't already been sent.
bool expected{false}; bool expected{false};
if(m_event_set.compare_exchange_strong(expected, true, std::memory_order_release, std::memory_order_relaxed)) if(m_event_set.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed))
{ {
uint64_t value{1}; uint64_t value{1};
::write(m_accept_fd, &value, sizeof(value)); ::write(m_accept_fd, &value, sizeof(value));
@ -793,12 +798,30 @@ private:
else else
{ {
// This task completed synchronously. // This task completed synchronously.
m_size.fetch_sub(1, std::memory_order_relaxed); m_size.fetch_sub(1, std::memory_order::relaxed);
} }
} }
else else
{ {
m_size.fetch_sub(1, std::memory_order_relaxed); m_size.fetch_sub(1, std::memory_order::relaxed);
}
}
inline auto process_task_variant(task_variant& tv) -> void
{
if(std::holds_alternative<coro::task<void>>(tv))
{
auto& task = std::get<coro::task<void>>(tv);
task_start(task);
}
else
{
auto handle = std::get<std::coroutine_handle<>>(tv);
if(!handle.done())
{
handle.resume();
}
} }
} }
@ -818,34 +841,22 @@ private:
// The queue is empty, we are done here. // The queue is empty, we are done here.
if(amount == 0) if(amount == 0)
{ {
return; // no more pending tasks return;
} }
for(std::size_t i = 0; i < amount; ++i) for(std::size_t i = 0 ; i < amount; ++i)
{ {
auto& task_v = m_processing_tasks[i]; process_task_variant(m_processing_tasks[i]);
if(std::holds_alternative<coro::task<void>>(task_v))
{
auto& task = std::get<coro::task<void>>(task_v);
task_start(task);
}
else
{
auto handle = std::get<std::coroutine_handle<>>(task_v);
if(!handle.done())
{
handle.resume();
}
}
} }
} }
auto process_events_internal(std::chrono::milliseconds user_timeout) -> void auto process_events_poll_execute(std::chrono::milliseconds user_timeout) -> void
{ {
std::atomic_thread_fence(std::memory_order_acquire); // Need to acquire m_accept_queue size to determine if there are any pending tasks.
std::atomic_thread_fence(std::memory_order::acquire);
bool tasks_ready = !m_accept_queue.empty(); bool tasks_ready = !m_accept_queue.empty();
// bool tasks_ready = m_event_set.load(std::memory_order_acquire); // bool tasks_ready = m_event_set.load(std::memory_order::acquire);
auto timeout = (tasks_ready) ? m_no_timeout : user_timeout; auto timeout = (tasks_ready) ? m_no_timeout : user_timeout;
// Poll is run every iteration to make sure 'waiting' events are properly put into // Poll is run every iteration to make sure 'waiting' events are properly put into
@ -853,7 +864,7 @@ private:
auto event_count = epoll_wait(m_epoll_fd, m_events.data(), m_max_events, timeout.count()); auto event_count = epoll_wait(m_epoll_fd, m_events.data(), m_max_events, timeout.count());
if(event_count > 0) if(event_count > 0)
{ {
for(std::size_t i = 0; i < event_count; ++i) for(std::size_t i = 0; i < static_cast<std::size_t>(event_count); ++i)
{ {
void* handle_ptr = m_events[i].data.ptr; void* handle_ptr = m_events[i].data.ptr;
@ -870,8 +881,8 @@ private:
while(!m_event_set.compare_exchange_weak( while(!m_event_set.compare_exchange_weak(
expected, expected,
false, false,
std::memory_order_release, std::memory_order::release,
std::memory_order_relaxed)) { } std::memory_order::relaxed)) { }
tasks_ready = true; tasks_ready = true;
} }
@ -892,41 +903,41 @@ private:
if(!m_task_manager.delete_tasks_empty()) if(!m_task_manager.delete_tasks_empty())
{ {
m_size.fetch_sub(m_task_manager.gc(), std::memory_order_relaxed); m_size.fetch_sub(m_task_manager.gc(), std::memory_order::relaxed);
} }
} }
auto process_events_internal_set_thread(std::chrono::milliseconds user_timeout) -> void auto process_events_external_thread(std::chrono::milliseconds user_timeout) -> void
{ {
// Do not allow two threads to process events at the same time. // Do not allow two threads to process events at the same time.
bool expected{false}; bool expected{false};
if(m_running.compare_exchange_strong( if(m_running.compare_exchange_strong(
expected, expected,
true, true,
std::memory_order_release, std::memory_order::release,
std::memory_order_relaxed)) std::memory_order::relaxed))
{ {
process_events_internal(user_timeout); process_events_poll_execute(user_timeout);
m_running.exchange(false, std::memory_order_release); m_running.exchange(false, std::memory_order::release);
} }
} }
auto run() -> void auto process_events_dedicated_thread() -> void
{ {
m_running.exchange(true, std::memory_order_release); m_running.exchange(true, std::memory_order::release);
// Execute tasks until stopped or there are more tasks to complete. // Execute tasks until stopped or there are more tasks to complete.
while(!m_shutdown_requested.load(std::memory_order_relaxed) || m_size.load(std::memory_order_relaxed) > 0) while(!m_shutdown_requested.load(std::memory_order::relaxed) || m_size.load(std::memory_order::relaxed) > 0)
{ {
process_events_internal(m_default_timeout); process_events_poll_execute(m_default_timeout);
} }
m_running.exchange(false, std::memory_order_release); m_running.exchange(false, std::memory_order::release);
} }
}; };
template<typename return_type> template<typename return_type>
inline auto resume_token<return_type>::resume(return_type result) noexcept -> void inline auto resume_token<return_type>::resume(return_type result) noexcept -> void
{ {
void* old_value = m_state.exchange(this, std::memory_order_acq_rel); void* old_value = m_state.exchange(this, std::memory_order::acq_rel);
if(old_value != this) if(old_value != this)
{ {
m_result = std::move(result); m_result = std::move(result);
@ -957,7 +968,7 @@ inline auto resume_token<return_type>::resume(return_type result) noexcept -> vo
inline auto resume_token<void>::resume() noexcept -> void inline auto resume_token<void>::resume() noexcept -> void
{ {
void* old_value = m_state.exchange(this, std::memory_order_acq_rel); void* old_value = m_state.exchange(this, std::memory_order::acq_rel);
if(old_value != this) if(old_value != this)
{ {
auto* waiters = static_cast<awaiter*>(old_value); auto* waiters = static_cast<awaiter*>(old_value);

View file

@ -37,7 +37,7 @@ TEST_CASE("benchmark counter func direct call")
std::atomic<uint64_t> counter{0}; std::atomic<uint64_t> counter{0};
auto func = [&]() -> void auto func = [&]() -> void
{ {
++counter; counter.fetch_add(1, std::memory_order::relaxed);
return; return;
}; };
@ -58,7 +58,7 @@ TEST_CASE("benchmark counter func coro::sync_wait(awaitable)")
std::atomic<uint64_t> counter{0}; std::atomic<uint64_t> counter{0};
auto func = [&]() -> coro::task<void> auto func = [&]() -> coro::task<void>
{ {
++counter; counter.fetch_add(1, std::memory_order::relaxed);
co_return; co_return;
}; };
@ -80,7 +80,7 @@ TEST_CASE("benchmark counter func coro::sync_wait_all(awaitable)")
std::atomic<uint64_t> counter{0}; std::atomic<uint64_t> counter{0};
auto func = [&]() -> coro::task<void> auto func = [&]() -> coro::task<void>
{ {
++counter; counter.fetch_add(1, std::memory_order::relaxed);
co_return; co_return;
}; };
@ -103,7 +103,7 @@ TEST_CASE("benchmark counter task scheduler")
std::atomic<uint64_t> counter{0}; std::atomic<uint64_t> counter{0};
auto func = [&]() -> coro::task<void> auto func = [&]() -> coro::task<void>
{ {
++counter; counter.fetch_add(1, std::memory_order::relaxed);
co_return; co_return;
}; };
@ -137,7 +137,7 @@ TEST_CASE("benchmark counter task scheduler yield -> resume from main")
auto wait_func = [&](std::size_t index) -> coro::task<void> auto wait_func = [&](std::size_t index) -> coro::task<void>
{ {
co_await s.yield<void>(tokens[index]); co_await s.yield<void>(tokens[index]);
++counter; counter.fetch_add(1, std::memory_order::relaxed);
co_return; co_return;
}; };
@ -178,7 +178,7 @@ TEST_CASE("benchmark counter task scheduler yield -> resume from coroutine")
auto wait_func = [&](std::size_t index) -> coro::task<void> auto wait_func = [&](std::size_t index) -> coro::task<void>
{ {
co_await s.yield<void>(tokens[index]); co_await s.yield<void>(tokens[index]);
++counter; counter.fetch_add(1, std::memory_order::relaxed);
co_return; co_return;
}; };
@ -221,7 +221,7 @@ TEST_CASE("benchmark counter task scheduler resume from coroutine -> yield")
auto wait_func = [&](std::size_t index) -> coro::task<void> auto wait_func = [&](std::size_t index) -> coro::task<void>
{ {
co_await s.yield<void>(tokens[index]); co_await s.yield<void>(tokens[index]);
++counter; counter.fetch_add(1, std::memory_order::relaxed);
co_return; co_return;
}; };
@ -264,7 +264,7 @@ TEST_CASE("benchmark counter task scheduler yield (all) -> resume (all) from cor
auto wait_func = [&](std::size_t index) -> coro::task<void> auto wait_func = [&](std::size_t index) -> coro::task<void>
{ {
co_await s.yield<void>(tokens[index]); co_await s.yield<void>(tokens[index]);
++counter; counter.fetch_add(1, std::memory_order::relaxed);
co_return; co_return;
}; };

View file

@ -10,6 +10,17 @@
using namespace std::chrono_literals; using namespace std::chrono_literals;
TEST_CASE("scheduler sizeof()")
{
std::cerr << "sizeof(coro::scheduler)=[" << sizeof(coro::scheduler) << "]\n";
std::cerr << "sizeof(coro:task<void>)=[" << sizeof(coro::task<void>) << "]\n";
std::cerr << "sizeof(std::coroutine_handle<>)=[" << sizeof(std::coroutine_handle<>) << "]\n";
std::cerr << "sizeof(std::variant<std::coroutine_handle<>>)=[" << sizeof(std::variant<std::coroutine_handle<>>) << "]\n";
REQUIRE(true);
}
TEST_CASE("scheduler submit single task") TEST_CASE("scheduler submit single task")
{ {
std::atomic<uint64_t> counter{0}; std::atomic<uint64_t> counter{0};
@ -103,7 +114,7 @@ TEST_CASE("scheduler task with multiple yields on event")
co_return; co_return;
}; };
auto resume_task = [&](coro::resume_token<uint64_t>& token, int expected) { auto resume_task = [&](coro::resume_token<uint64_t>& token, uint64_t expected) {
token.resume(1); token.resume(1);
while(counter != expected) while(counter != expected)
{ {
@ -398,7 +409,7 @@ TEST_CASE("scheduler trigger growth of internal tasks storage")
constexpr std::size_t iterations{512}; constexpr std::size_t iterations{512};
coro::scheduler s{coro::scheduler::options{.reserve_size = 1}}; coro::scheduler s{coro::scheduler::options{.reserve_size = 1}};
auto wait_func = [&](uint64_t id, std::chrono::milliseconds wait_time) -> coro::task<void> auto wait_func = [&](std::chrono::milliseconds wait_time) -> coro::task<void>
{ {
co_await s.yield_for(wait_time); co_await s.yield_for(wait_time);
++counter; ++counter;
@ -407,7 +418,7 @@ TEST_CASE("scheduler trigger growth of internal tasks storage")
for(std::size_t i = 0; i < iterations; ++i) for(std::size_t i = 0; i < iterations; ++i)
{ {
s.schedule(wait_func(i, std::chrono::milliseconds{50})); s.schedule(wait_func(std::chrono::milliseconds{50}));
} }
s.shutdown(); s.shutdown();