1
0
Fork 0
mirror of https://gitlab.com/niansa/libasync.git synced 2025-03-06 20:53:29 +01:00
libasync/include/async/oneshot-event.hpp
2021-05-20 17:15:03 +02:00

155 lines
3 KiB
C++

#pragma once
#include <async/algorithm.hpp>
#include <async/cancellation.hpp>
#include <frg/functional.hpp>
#include <frg/list.hpp>
namespace async {
struct oneshot_event {
private:
struct node {
friend struct oneshot_event;
node() = default;
node(const node &) = delete;
node &operator= (const node &) = delete;
virtual void complete() = 0;
protected:
~node() = default;
private:
// Protected by mutex_.
frg::default_list_hook<node> _hook;
};
public:
void raise() {
// Grab all items and mark them as retired while we hold the lock.
frg::intrusive_list<
node,
frg::locate_member<
node,
frg::default_list_hook<node>,
&node::_hook
>
> items;
{
frg::unique_lock lock(mutex_);
assert(!raised_);
items.splice(items.end(), queue_);
raised_ = true;
}
// Now invoke the individual callbacks.
while(!items.empty()) {
auto item = items.front();
items.pop_front();
item->complete();
}
}
// ----------------------------------------------------------------------------------
// wait() and its boilerplate.
// ----------------------------------------------------------------------------------
template<typename Receiver>
struct wait_operation final : private node {
wait_operation(oneshot_event *evt, cancellation_token ct, Receiver r)
: evt_{evt}, ct_{std::move(ct)}, r_{std::move(r)}, cobs_{this} { }
bool start_inline() {
bool cancelled = false;
{
frg::unique_lock lock(evt_->mutex_);
if(!evt_->raised_) {
if(!cobs_.try_set(ct_)) {
cancelled = true;
}else{
evt_->queue_.push_back(this);
return false;
}
}
}
execution::set_value_inline(r_, !cancelled);
return true;
}
private:
void cancel() {
bool cancelled = false;
{
frg::unique_lock lock(evt_->mutex_);
if(!evt_->raised_) {
cancelled = true;
auto it = evt_->queue_.iterator_to(this);
evt_->queue_.erase(it);
}
}
execution::set_value_noinline(r_, !cancelled_);
}
void complete() override {
if(cobs_.try_reset())
execution::set_value_noinline(r_, true);
}
oneshot_event *evt_;
cancellation_token ct_;
Receiver r_;
cancellation_observer<frg::bound_mem_fn<&wait_operation::cancel>> cobs_;
bool cancelled_ = false;
};
struct [[nodiscard]] wait_sender {
using value_type = bool;
template<typename Receiver>
friend wait_operation<Receiver> connect(wait_sender s, Receiver r) {
return {s.evt, s.ct, std::move(r)};
}
sender_awaiter<wait_sender, bool> operator co_await () {
return {*this};
}
oneshot_event *evt;
cancellation_token ct;
};
wait_sender wait(cancellation_token ct) {
return {this, ct};
}
auto wait() {
return async::transform(wait(cancellation_token{}), [] (bool waitSuccess) {
assert(waitSuccess);
});
}
private:
platform::mutex mutex_;
bool raised_ = false;
frg::intrusive_list<
node,
frg::locate_member<
node,
frg::default_list_hook<node>,
&node::_hook
>
> queue_;
};
} // namespace async