#pragma once #include #include #include #include #include #include #include #ifndef LIBASYNC_CUSTOM_PLATFORM #include #include #include namespace async::platform { using mutex = std::mutex; [[noreturn]] inline void panic(const char *str) { std::cerr << str << std::endl; std::terminate(); } } // namespace async::platform #else #include #endif #if __has_include() && !defined(LIBASYNC_FORCE_USE_EXPERIMENTAL) #include namespace corons = std; #else #include namespace corons = std::experimental; #endif namespace async { template requires requires(E &&e) { operator co_await(std::forward(e)); } auto make_awaiter(E &&e) { return operator co_await(std::forward(e)); } template requires requires(E &&e) { std::forward(e).operator co_await(); } auto make_awaiter(E &&e) { return std::forward(e).operator co_await(); } // ---------------------------------------------------------------------------- // sender_awaiter template. // ---------------------------------------------------------------------------- template struct [[nodiscard]] sender_awaiter { private: struct receiver { void set_value_inline(T result) { p_->result_.emplace(std::move(result)); } void set_value_noinline(T result) { p_->result_.emplace(std::move(result)); p_->h_.resume(); } sender_awaiter *p_; }; public: sender_awaiter(S sender) : operation_{execution::connect(std::move(sender), receiver{this})} { } bool await_ready() { return false; } bool await_suspend(corons::coroutine_handle<> h) { h_ = h; return !execution::start_inline(operation_); } T await_resume() { return std::move(*result_); } execution::operation_t operation_; corons::coroutine_handle<> h_; frg::optional result_; }; // Specialization of sender_awaiter for void return types. template struct [[nodiscard]] sender_awaiter { private: struct receiver { void set_value_inline() { // Do nothing. } void set_value_noinline() { p_->h_.resume(); } sender_awaiter *p_; }; public: sender_awaiter(S sender) : operation_{execution::connect(std::move(sender), receiver{this})} { } bool await_ready() { return false; } bool await_suspend(corons::coroutine_handle<> h) { h_ = h; return !execution::start_inline(operation_); } void await_resume() { // Do nothing. } execution::operation_t operation_; corons::coroutine_handle<> h_; }; // ---------------------------------------------------------------------------- // any_receiver. // ---------------------------------------------------------------------------- // This form of any_receiver is a broken concept: because it directly forwards // the value of the set_value() function, it requires a virtual call even // if we add an inline return path. template struct any_receiver { template any_receiver(R receiver) { static_assert(std::is_trivially_copyable_v); static_assert(sizeof(R) <= sizeof(void *)); static_assert(alignof(R) <= alignof(void *)); new (stor_) R(receiver); set_value_fptr_ = [] (void *p, T value) { auto *rp = static_cast(p); execution::set_value_noinline(*rp, std::move(value)); }; } void set_value(T value) { set_value_fptr_(stor_, std::move(value)); } void set_value_noinline(T value) { set_value_fptr_(stor_, std::move(value)); } private: alignas(alignof(void *)) char stor_[sizeof(void *)]; void (*set_value_fptr_) (void *, T); }; template<> struct any_receiver { template any_receiver(R receiver) { static_assert(std::is_trivially_copyable_v); new (stor_) R(receiver); set_value_fptr_ = [] (void *p) { auto *rp = static_cast(p); execution::set_value_noinline(*rp); }; } void set_value() { set_value_fptr_(stor_); } void set_value_noinline() { set_value_fptr_(stor_); } private: alignas(alignof(void *)) char stor_[sizeof(void *)]; void (*set_value_fptr_) (void *); }; // ---------------------------------------------------------------------------- // Legacy utilities. // ---------------------------------------------------------------------------- template struct callback; template struct callback { private: using storage = frg::aligned_storage; template static R invoke(storage object, Args... args) { return (*reinterpret_cast(&object))(std::move(args)...); } public: callback() : _function(nullptr) { } template::value && std::is_trivially_destructible::value>> callback(F functor) : _function(&invoke) { new (&_object) F{std::move(functor)}; } explicit operator bool () { return static_cast(_function); } R operator() (Args... args) { return _function(_object, std::move(args)...); } private: R (*_function)(storage, Args...); frg::aligned_storage _object; }; // ---------------------------------------------------------------------------- // run_queue implementation. // ---------------------------------------------------------------------------- struct run_queue; run_queue *get_current_queue(); struct run_queue_item { friend struct run_queue; friend struct current_queue_token; friend struct run_queue_token; run_queue_item() = default; run_queue_item(const run_queue_item &) = delete; run_queue_item &operator= (const run_queue_item &) = delete; void arm(callback cb) { assert(!_cb && "run_queue_item is already armed"); assert(cb && "cannot arm run_queue_item with a null callback"); _cb = cb; } private: callback _cb; frg::default_list_hook _hook; }; struct run_queue_token { run_queue_token(run_queue *rq) : rq_{rq} { } void run_iteration(); bool is_drained(); private: run_queue *rq_; }; struct run_queue { friend struct current_queue_token; friend struct run_queue_token; run_queue_token run_token() { return {this}; } void post(run_queue_item *node); private: frg::intrusive_list< run_queue_item, frg::locate_member< run_queue_item, frg::default_list_hook, &run_queue_item::_hook > > _run_list; }; // ---------------------------------------------------------------------------- // Top-level execution functions. // ---------------------------------------------------------------------------- template void run_forever(IoService ios) { while(true) { ios.wait(); } } template std::enable_if_t, void> run(Sender s) { struct receiver { void set_value_inline() { } void set_value_noinline() { } }; auto operation = execution::connect(std::move(s), receiver{}); if(execution::start_inline(operation)) return; platform::panic("libasync: Operation hasn't completed and we don't know how to wait"); } template std::enable_if_t, typename Sender::value_type> run(Sender s) { struct state { frg::optional value; }; struct receiver { receiver(state *stp) : stp_{stp} { } void set_value_inline(typename Sender::value_type value) { stp_->value.emplace(std::move(value)); } void set_value_noinline(typename Sender::value_type value) { stp_->value.emplace(std::move(value)); } private: state *stp_; }; state st; auto operation = execution::connect(std::move(s), receiver{&st}); if (execution::start_inline(operation)) return std::move(*st.value); platform::panic("libasync: Operation hasn't completed and we don't know how to wait"); } template std::enable_if_t, void> run(Sender s, IoService ios) { struct state { bool done = false; }; struct receiver { receiver(state *stp) : stp_{stp} { } void set_value_inline() { stp_->done = true; } void set_value_noinline() { stp_->done = true; } private: state *stp_; }; state st; auto operation = execution::connect(std::move(s), receiver{&st}); if(execution::start_inline(operation)) return; while(!st.done) { ios.wait(); } } template std::enable_if_t, typename Sender::value_type> run(Sender s, IoService ios) { struct state { bool done = false; frg::optional value; }; struct receiver { receiver(state *stp) : stp_{stp} { } void set_value_inline(typename Sender::value_type value) { stp_->value.emplace(std::move(value)); stp_->done = true; } void set_value_noinline(typename Sender::value_type value) { stp_->value.emplace(std::move(value)); stp_->done = true; } private: state *stp_; }; state st; auto operation = execution::connect(std::move(s), receiver{&st}); if(execution::start_inline(operation)) return std::move(*st.value); while(!st.done) { ios.wait(); } return std::move(*st.value); } // ---------------------------------------------------------------------------- // Detached coroutines. // ---------------------------------------------------------------------------- struct detached { struct promise_type { detached get_return_object() { return {}; } corons::suspend_never initial_suspend() { return {}; } corons::suspend_never final_suspend() noexcept { return {}; } void return_void() { // Nothing to do here. } void unhandled_exception() { platform::panic("libasync: Unhandled exception in coroutine"); } }; }; namespace detach_details_ { template struct control_block; template void finalize(control_block *cb); template struct final_receiver { final_receiver(control_block *cb) : cb_{cb} { } void set_value_inline() { finalize(cb_); } void set_value_noinline() { finalize(cb_); } private: control_block *cb_; }; // Heap-allocate data structure that holds the operation. // We cannot directly put the operation onto the heap as it is non-movable. template struct control_block { friend void finalize(control_block *cb) { auto allocator = std::move(cb->allocator); auto continuation = std::move(cb->continuation); frg::destruct(allocator, cb); continuation(); } control_block(Allocator allocator, S sender, Cont continuation) : allocator{std::move(allocator)}, operation{execution::connect( std::move(sender), final_receiver{this})}, continuation{std::move(continuation)} { } Allocator allocator; execution::operation_t> operation; Cont continuation; }; } template void detach_with_allocator(Allocator allocator, S sender, Cont continuation) { auto p = frg::construct>(allocator, allocator, std::move(sender), std::move(continuation)); execution::start_inline(p->operation); } template void detach_with_allocator(Allocator allocator, S sender) { detach_with_allocator(std::move(allocator), std::move(sender), [] { }); } template void detach(S sender) { return detach_with_allocator(frg::stl_allocator{}, std::move(sender)); } template void detach(S sender, Cont continuation) { return detach_with_allocator(frg::stl_allocator{}, std::move(sender), std::move(continuation)); } namespace spawn_details_ { template struct control_block; template void finalize(control_block *cb); template struct final_receiver { final_receiver(control_block *cb) : cb_{cb} { } template void set_value_inline(Args &&... args) { cb_->dr.set_value_inline(std::forward(args)...); finalize(cb_); } template void set_value_noinline(Args &&... args) { cb_->dr.set_value_noinline(std::forward(args)...); finalize(cb_); } private: control_block *cb_; }; // Heap-allocate data structure that holds the operation. // We cannot directly put the operation onto the heap as it is non-movable. template struct control_block { friend void finalize(control_block *cb) { auto allocator = std::move(cb->allocator); frg::destruct(allocator, cb); } control_block(Allocator allocator, S sender, R dr) : allocator{std::move(allocator)}, operation{execution::connect( std::move(sender), final_receiver{this})}, dr{std::move(dr)} { } Allocator allocator; execution::operation_t> operation; R dr; // Downstream receiver. }; } template void spawn_with_allocator(Allocator allocator, S sender, R receiver) { auto p = frg::construct>(allocator, allocator, std::move(sender), std::move(receiver)); execution::start_inline(p->operation); } } // namespace async