mirror of
https://gitlab.com/niansa/libcrosscoro.git
synced 2025-03-06 20:53:32 +01:00
addd latch, renamed "amre" to event
Remove the event return type, this should just be a task<T>
This commit is contained in:
parent
6c593cafad
commit
771e52e985
8 changed files with 263 additions and 95 deletions
|
@ -8,8 +8,9 @@ message("${PROJECT_NAME} CORO_BUILD_TESTS = ${CORO_BUILD_TESTS}")
|
|||
message("${PROJECT_NAME} CORO_CODE_COVERAGE = ${CORO_CODE_COVERAGE}")
|
||||
|
||||
set(LIBCORO_SOURCE_FILES
|
||||
src/coro/async_manual_reset_event.hpp
|
||||
src/coro/coro.hpp
|
||||
src/coro/event.hpp
|
||||
src/coro/latch.hpp
|
||||
src/coro/scheduler.hpp
|
||||
src/coro/sync_wait.hpp
|
||||
src/coro/task.hpp
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#pragma once
|
||||
|
||||
#include "coro/async_manual_reset_event.hpp"
|
||||
#include "coro/event.hpp"
|
||||
#include "coro/latch.hpp"
|
||||
#include "coro/scheduler.hpp"
|
||||
#include "coro/sync_wait.hpp"
|
||||
#include "coro/task.hpp"
|
||||
|
|
|
@ -7,28 +7,43 @@
|
|||
namespace coro
|
||||
{
|
||||
|
||||
template<typename return_type>
|
||||
class async_manual_reset_event
|
||||
class event
|
||||
{
|
||||
public:
|
||||
async_manual_reset_event() noexcept
|
||||
: m_state(nullptr)
|
||||
event(bool initially_set = false) noexcept
|
||||
: m_state((initially_set) ? static_cast<void*>(this) : nullptr)
|
||||
{
|
||||
}
|
||||
virtual ~event() = default;
|
||||
|
||||
async_manual_reset_event(const async_manual_reset_event&) = delete;
|
||||
async_manual_reset_event(async_manual_reset_event&&) = delete;
|
||||
auto operator=(const async_manual_reset_event&) -> async_manual_reset_event& = delete;
|
||||
auto operator=(async_manual_reset_event&&) -> async_manual_reset_event& = delete;
|
||||
event(const event&) = delete;
|
||||
event(event&&) = delete;
|
||||
auto operator=(const event&) -> event& = delete;
|
||||
auto operator=(event&&) -> event& = delete;
|
||||
|
||||
bool is_set() const noexcept
|
||||
{
|
||||
return m_state.load(std::memory_order_acquire) == this;
|
||||
}
|
||||
|
||||
auto set() noexcept -> void
|
||||
{
|
||||
void* old_value = m_state.exchange(this, std::memory_order_acq_rel);
|
||||
if(old_value != this)
|
||||
{
|
||||
auto* waiters = static_cast<awaiter*>(old_value);
|
||||
while(waiters != nullptr)
|
||||
{
|
||||
auto* next = waiters->m_next;
|
||||
waiters->m_awaiting_coroutine.resume();
|
||||
waiters = next;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct awaiter
|
||||
{
|
||||
awaiter(const async_manual_reset_event& event) noexcept
|
||||
awaiter(const event& event) noexcept
|
||||
: m_event(event)
|
||||
{
|
||||
|
||||
|
@ -70,12 +85,7 @@ public:
|
|||
|
||||
}
|
||||
|
||||
auto return_value() const & -> const return_type&
|
||||
{
|
||||
return m_event.m_return_value;
|
||||
}
|
||||
|
||||
const async_manual_reset_event& m_event;
|
||||
const event& m_event;
|
||||
std::coroutine_handle<> m_awaiting_coroutine;
|
||||
awaiter* m_next{nullptr};
|
||||
};
|
||||
|
@ -85,37 +95,14 @@ public:
|
|||
return awaiter(*this);
|
||||
}
|
||||
|
||||
auto set(return_type return_value) noexcept -> void
|
||||
{
|
||||
void* old_value = m_state.exchange(this, std::memory_order_acq_rel);
|
||||
if(old_value != this)
|
||||
{
|
||||
m_return_value = std::move(return_value);
|
||||
|
||||
auto* waiters = static_cast<awaiter*>(old_value);
|
||||
while(waiters != nullptr)
|
||||
{
|
||||
auto* next = waiters->m_next;
|
||||
waiters->m_awaiting_coroutine.resume();
|
||||
waiters = next;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto reset() noexcept -> void
|
||||
{
|
||||
void* old_value = this;
|
||||
m_state.compare_exchange_strong(old_value, nullptr, std::memory_order_acquire);
|
||||
}
|
||||
|
||||
auto return_value() const -> const return_type&
|
||||
{
|
||||
return m_return_value;
|
||||
}
|
||||
|
||||
private:
|
||||
protected:
|
||||
friend struct awaiter;
|
||||
return_type m_return_value;
|
||||
mutable std::atomic<void*> m_state;
|
||||
};
|
||||
|
53
src/coro/latch.hpp
Normal file
53
src/coro/latch.hpp
Normal file
|
@ -0,0 +1,53 @@
|
|||
#pragma once
|
||||
|
||||
#include "coro/event.hpp"
|
||||
|
||||
#include <atomic>
|
||||
|
||||
namespace coro
|
||||
{
|
||||
|
||||
class latch
|
||||
{
|
||||
public:
|
||||
latch(std::ptrdiff_t count) noexcept
|
||||
: m_count(count),
|
||||
m_event(count <= 0)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
latch(const latch&) = delete;
|
||||
latch(latch&&) = delete;
|
||||
auto operator=(const latch&) -> latch& = delete;
|
||||
auto operator=(latch&&) -> latch& = delete;
|
||||
|
||||
auto is_ready() const noexcept -> bool
|
||||
{
|
||||
return m_event.is_set();
|
||||
}
|
||||
|
||||
auto remaining() const noexcept -> std::size_t
|
||||
{
|
||||
return m_count.load(std::memory_order::acquire);
|
||||
}
|
||||
|
||||
auto count_down(std::ptrdiff_t n = 1) noexcept -> void
|
||||
{
|
||||
if(m_count.fetch_sub(n, std::memory_order::acq_rel) <= n)
|
||||
{
|
||||
m_event.set();
|
||||
}
|
||||
}
|
||||
|
||||
auto operator co_await() const noexcept -> event::awaiter
|
||||
{
|
||||
return m_event.operator co_await();
|
||||
}
|
||||
|
||||
private:
|
||||
std::atomic<std::ptrdiff_t> m_count;
|
||||
event m_event;
|
||||
};
|
||||
|
||||
} // namespace coro
|
|
@ -3,7 +3,8 @@ project(libcoro_test)
|
|||
|
||||
set(LIBCORO_TEST_SOURCE_FILES
|
||||
bench.cpp
|
||||
test_async_manual_reset_event.cpp
|
||||
test_event.cpp
|
||||
test_latch.cpp
|
||||
test_scheduler.cpp
|
||||
test_sync_wait.cpp
|
||||
test_task.cpp
|
||||
|
|
|
@ -1,53 +0,0 @@
|
|||
#include "catch.hpp"
|
||||
|
||||
#include <coro/coro.hpp>
|
||||
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
|
||||
template<typename return_type>
|
||||
auto mre_producer(coro::async_manual_reset_event<return_type>& event, return_type produced_value) -> void
|
||||
{
|
||||
// simulate complicated background task
|
||||
using namespace std::chrono_literals;
|
||||
std::this_thread::sleep_for(10ms);
|
||||
event.set(std::move(produced_value));
|
||||
}
|
||||
|
||||
template<typename return_type>
|
||||
auto mre_consumer(
|
||||
const coro::async_manual_reset_event<return_type>& event
|
||||
) -> coro::task<return_type>
|
||||
{
|
||||
co_await event;
|
||||
co_return event.return_value();
|
||||
}
|
||||
|
||||
TEST_CASE("manual reset event one watcher")
|
||||
{
|
||||
coro::async_manual_reset_event<uint64_t> event{};
|
||||
|
||||
auto value = mre_consumer(event);
|
||||
mre_producer<uint64_t>(event, 42);
|
||||
value.resume();
|
||||
|
||||
REQUIRE(value.promise().result() == 42);
|
||||
}
|
||||
|
||||
TEST_CASE("manual reset event multiple watcher")
|
||||
{
|
||||
std::string expected_value = "Hello World!";
|
||||
coro::async_manual_reset_event<std::string> event{};
|
||||
|
||||
auto value1 = mre_consumer(event);
|
||||
auto value2 = mre_consumer(event);
|
||||
auto value3 = mre_consumer(event);
|
||||
mre_producer<std::string>(event, expected_value);
|
||||
value1.resume();
|
||||
value2.resume();
|
||||
value3.resume();
|
||||
|
||||
REQUIRE(value1.promise().result() == expected_value);
|
||||
REQUIRE(value2.promise().result() == expected_value);
|
||||
REQUIRE(value3.promise().result() == expected_value);
|
||||
}
|
72
test/test_event.cpp
Normal file
72
test/test_event.cpp
Normal file
|
@ -0,0 +1,72 @@
|
|||
#include "catch.hpp"
|
||||
|
||||
#include <coro/coro.hpp>
|
||||
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
|
||||
TEST_CASE("event single awaiter")
|
||||
{
|
||||
coro::event e{};
|
||||
|
||||
auto func = [&]() -> coro::task<uint64_t> {
|
||||
co_await e;
|
||||
co_return 42;
|
||||
};
|
||||
|
||||
auto task = func();
|
||||
|
||||
task.resume();
|
||||
REQUIRE_FALSE(task.is_ready());
|
||||
e.set(); // this will automaticaly resume the task that is awaiting the event.
|
||||
REQUIRE(task.is_ready());
|
||||
REQUIRE(task.promise().result() == 42);
|
||||
}
|
||||
|
||||
|
||||
auto producer(coro::event& event) -> void
|
||||
{
|
||||
// Long running task that consumers are waiting for goes here...
|
||||
event.set();
|
||||
}
|
||||
|
||||
auto consumer(const coro::event& event) -> coro::task<uint64_t>
|
||||
{
|
||||
co_await event;
|
||||
// Normally consume from some object which has the stored result from the producer
|
||||
co_return 42;
|
||||
}
|
||||
|
||||
TEST_CASE("event one watcher")
|
||||
{
|
||||
coro::event e{};
|
||||
|
||||
auto value = consumer(e);
|
||||
value.resume(); // start co_awaiting event
|
||||
REQUIRE_FALSE(value.is_ready());
|
||||
|
||||
producer(e);
|
||||
|
||||
REQUIRE(value.promise().result() == 42);
|
||||
}
|
||||
|
||||
TEST_CASE("event multiple watchers")
|
||||
{
|
||||
coro::event e{};
|
||||
|
||||
auto value1 = consumer(e);
|
||||
auto value2 = consumer(e);
|
||||
auto value3 = consumer(e);
|
||||
value1.resume(); // start co_awaiting event
|
||||
value2.resume();
|
||||
value3.resume();
|
||||
REQUIRE_FALSE(value1.is_ready());
|
||||
REQUIRE_FALSE(value2.is_ready());
|
||||
REQUIRE_FALSE(value3.is_ready());
|
||||
|
||||
producer(e);
|
||||
|
||||
REQUIRE(value1.promise().result() == 42);
|
||||
REQUIRE(value2.promise().result() == 42);
|
||||
REQUIRE(value3.promise().result() == 42);
|
||||
}
|
106
test/test_latch.cpp
Normal file
106
test/test_latch.cpp
Normal file
|
@ -0,0 +1,106 @@
|
|||
#include "catch.hpp"
|
||||
|
||||
#include <coro/coro.hpp>
|
||||
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
|
||||
|
||||
TEST_CASE("latch count=0")
|
||||
{
|
||||
coro::latch l{0};
|
||||
|
||||
auto task = [&]() -> coro::task<uint64_t>
|
||||
{
|
||||
co_await l;
|
||||
co_return 42;
|
||||
}();
|
||||
|
||||
task.resume();
|
||||
REQUIRE(task.is_ready()); // The latch never waits due to zero count.
|
||||
REQUIRE(task.promise().result() == 42);
|
||||
}
|
||||
|
||||
TEST_CASE("latch count=1")
|
||||
{
|
||||
coro::latch l{1};
|
||||
|
||||
auto task = [&]() -> coro::task<uint64_t>
|
||||
{
|
||||
auto workers = l.remaining();
|
||||
co_await l;
|
||||
co_return workers;
|
||||
}();
|
||||
|
||||
task.resume();
|
||||
REQUIRE_FALSE(task.is_ready());
|
||||
|
||||
l.count_down();
|
||||
REQUIRE(task.is_ready());
|
||||
REQUIRE(task.promise().result() == 1);
|
||||
}
|
||||
|
||||
TEST_CASE("latch count=1 count_down=5")
|
||||
{
|
||||
coro::latch l{1};
|
||||
|
||||
auto task = [&]() -> coro::task<uint64_t>
|
||||
{
|
||||
auto workers = l.remaining();
|
||||
co_await l;
|
||||
co_return workers;
|
||||
}();
|
||||
|
||||
task.resume();
|
||||
REQUIRE_FALSE(task.is_ready());
|
||||
|
||||
l.count_down(5);
|
||||
REQUIRE(task.is_ready());
|
||||
REQUIRE(task.promise().result() == 1);
|
||||
}
|
||||
|
||||
TEST_CASE("latch count=5 count_down=1 x5")
|
||||
{
|
||||
coro::latch l{5};
|
||||
|
||||
auto task = [&]() -> coro::task<uint64_t>
|
||||
{
|
||||
auto workers = l.remaining();
|
||||
co_await l;
|
||||
co_return workers;
|
||||
}();
|
||||
|
||||
task.resume();
|
||||
REQUIRE_FALSE(task.is_ready());
|
||||
|
||||
l.count_down(1);
|
||||
REQUIRE_FALSE(task.is_ready());
|
||||
l.count_down(1);
|
||||
REQUIRE_FALSE(task.is_ready());
|
||||
l.count_down(1);
|
||||
REQUIRE_FALSE(task.is_ready());
|
||||
l.count_down(1);
|
||||
REQUIRE_FALSE(task.is_ready());
|
||||
l.count_down(1);
|
||||
REQUIRE(task.is_ready());
|
||||
REQUIRE(task.promise().result() == 5);
|
||||
}
|
||||
|
||||
TEST_CASE("latch count=5 count_down=5")
|
||||
{
|
||||
coro::latch l{5};
|
||||
|
||||
auto task = [&]() -> coro::task<uint64_t>
|
||||
{
|
||||
auto workers = l.remaining();
|
||||
co_await l;
|
||||
co_return workers;
|
||||
}();
|
||||
|
||||
task.resume();
|
||||
REQUIRE_FALSE(task.is_ready());
|
||||
|
||||
l.count_down(5);
|
||||
REQUIRE(task.is_ready());
|
||||
REQUIRE(task.promise().result() == 5);
|
||||
}
|
Loading…
Add table
Reference in a new issue