1
0
Fork 0
mirror of https://gitlab.com/niansa/cosched.git synced 2025-03-06 20:53:26 +01:00

Replaced libasync with basic-coro

We don't need full-blown libasync here...
This commit is contained in:
niansa/tuxifan 2023-05-05 10:24:05 +02:00
parent 860e097972
commit 3def9bb02c
12 changed files with 501 additions and 34 deletions

View file

@ -4,15 +4,18 @@ project(cosched LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_FLAGS "-fsanitize=undefined,address -fno-omit-frame-pointer")
set(CMAKE_EXE_LINKER_FLAGS "-fsanitize=undefined,address -fno-omit-frame-pointer")
add_subdirectory(libasync)
add_library(cosched STATIC
scheduler.cpp include/scheduler.hpp
scheduled_thread.cpp include/scheduled_thread.hpp
basic-coro/SingleEvent.cpp
)
target_include_directories(cosched PUBLIC include/ basic-coro/include/)
add_library(cosched STATIC scheduler.cpp include/scheduler.hpp scheduled_thread.cpp include/scheduled_thread.hpp)
target_link_libraries(cosched PUBLIC async)
target_include_directories(cosched PUBLIC include/)
#add_executable(test test.cpp)
#target_link_libraries(test PRIVATE cosched)
add_executable(test test.cpp)
target_link_libraries(test PRIVATE cosched)
install(TARGETS cosched
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR})

21
basic-coro/LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021 Maksymilian Kadukowski
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

68
basic-coro/README.md Normal file
View file

@ -0,0 +1,68 @@
# basic-coro - c++ coroutine library
Library that implements helper types for using c++ coroutines. Please be aware that this is a training project for me - I wanted to learn more about CMake, gtest and git submodules.
## Usage
### Prerequisites
* g++-10
### Installing
```
mkdir build && cd build
cmake -D CMAKE_CXX_COMPILER=g++-10 ..
make install
```
This will install appropriate headers into `./include/` and static linked library into `./lib/`.
### Classes
Library includes following classes:
* `SingleEvent<T>` which models `co_await` enabled event that can be set,
* `AwaitableTask<T>` which models `co_await` enabled task.
Please note that these classes are not multithreading enabled. There is no synchronization or any kind of protection form race conditions. If you need to use coroutines with multithreading, just use [CppCoro](https://github.com/lewissbaker/cppcoro). This library is mostly thought for use with simple GUI programming.
### Example
```c++
#include <iostream>
#include <basiccoro/AwaitableTask.hpp>
#include <basiccoro/SingleEvent.hpp>
basiccoro::AwaitableTask<void> consumer(basiccoro::SingleEvent<int>& event)
{
std::cout << "consumer: start waiting" << std::endl;
while (true)
{
const auto i = co_await event;
std::cout << "consumer: received: " << i << std::endl;
}
}
int main()
{
basiccoro::SingleEvent<int> event;
consumer(event);
while (true)
{
int i = 0;
std::cout << "Enter no.(1-9): ";
std::cin >> i;
if (i == 0)
{
break;
}
else if (1 <= i && i <= 9)
{
event.set(i);
}
}
}
```
Simple example highlighting use of coroutines in producer-consumer problem.
## Acknowledgments
* [CMake C++ Project Template](https://github.com/kigster/cmake-project-template) as this project is based on this template
* Lewis Baker has excellent [articles](https://lewissbaker.github.io/) on topic of coroutines and assymetric transfer. This project is mostly based on information (and code snippets) contained in those articles.

View file

@ -0,0 +1,60 @@
#include "SingleEvent.hpp"
#include <utility>
namespace basiccoro
{
detail::SingleEventBase::SingleEventBase(detail::SingleEventBase&& other)
: waiting_(std::move(other.waiting_))
, isSet_(std::exchange(other.isSet_, false))
{
}
detail::SingleEventBase& detail::SingleEventBase::operator=(detail::SingleEventBase&& other)
{
waiting_ = std::move(other.waiting_);
isSet_ = std::exchange(other.isSet_, false);
return *this;
}
detail::SingleEventBase::~SingleEventBase()
{
for (auto handle : waiting_)
{
handle.destroy();
}
}
void detail::SingleEventBase::set_common()
{
if (!isSet_)
{
if (waiting_.empty())
{
isSet_ = true;
}
else
{
// resuming coroutines can result in
// consecutive co_awaits on this object
auto temp = std::move(waiting_);
for (auto handle : temp)
{
handle.resume();
if (handle.done())
{
handle.destroy();
}
}
}
}
}
SingleEvent<void>::awaiter SingleEvent<void>::operator co_await()
{
return awaiter{*this};
}
} // namespace basiccoro

View file

@ -0,0 +1,195 @@
#pragma once
#include <concepts>
#include <coroutine>
#include <exception>
#include <stdexcept>
#include <utility>
namespace basiccoro
{
namespace detail
{
template<class Derived>
struct PromiseBase
{
auto get_return_object() { return std::coroutine_handle<Derived>::from_promise(static_cast<Derived&>(*this)); }
void unhandled_exception() { std::terminate(); }
};
template<class Derived, class T> requires std::movable<T> || std::same_as<T, void>
struct ValuePromise : public PromiseBase<Derived>
{
using value_type = T;
T val;
void return_value(T t) { val = std::move(t); }
};
template<class Derived>
struct ValuePromise<Derived, void> : public PromiseBase<Derived>
{
using value_type = void;
void return_void() {}
};
template<class T>
class AwaitablePromise : public ValuePromise<AwaitablePromise<T>, T>
{
public:
auto initial_suspend() { return std::suspend_never(); }
auto final_suspend() noexcept
{
if (waiting_)
{
waiting_.resume();
if (waiting_.done())
{
waiting_.destroy();
}
waiting_ = nullptr;
}
return std::suspend_always();
}
void storeWaiting(std::coroutine_handle<> handle)
{
if (waiting_)
{
throw std::runtime_error("AwaitablePromise::storeWaiting(): already waiting");
}
waiting_ = handle;
}
~AwaitablePromise()
{
if (waiting_)
{
waiting_.destroy();
}
}
private:
std::coroutine_handle<> waiting_ = nullptr;
};
template<class Promise>
class TaskBase
{
public:
using promise_type = Promise;
TaskBase();
TaskBase(std::coroutine_handle<promise_type> handle);
TaskBase(const TaskBase&) = delete;
TaskBase(TaskBase&&);
TaskBase& operator=(const TaskBase&) = delete;
TaskBase& operator=(TaskBase&&);
~TaskBase();
bool done() const { return handle_.done(); }
protected:
std::coroutine_handle<promise_type> handle_;
bool handleShouldBeDestroyed_;
};
template<class Promise>
TaskBase<Promise>::TaskBase()
: handle_(nullptr), handleShouldBeDestroyed_(false)
{}
template<class Promise>
TaskBase<Promise>::TaskBase(std::coroutine_handle<promise_type> handle)
: handle_(handle)
{
// TODO: this whole system needs revamping with something like UniqueCoroutineHandle
// and custom static interface to awaiter types - so await_suspend method would take in UniqueCoroutineHandle
if (handle.done())
{
// it is resonable to expect that if the coroutine is done before
// the task creation, then the original stack is continued without suspending,
// and coroutine needs to be destroyed with TaskBase object
handleShouldBeDestroyed_ = true;
}
else
{
// otherwise the coroutine should be managed by object that it is awaiting
handleShouldBeDestroyed_ = false;
}
}
template<class Promise>
TaskBase<Promise>::TaskBase(TaskBase&& other)
: handle_(other.handle_), handleShouldBeDestroyed_(std::exchange(other.handleShouldBeDestroyed_, false))
{
}
template<class Promise>
TaskBase<Promise>& TaskBase<Promise>::operator=(TaskBase&& other)
{
handle_ = other.handle_;
handleShouldBeDestroyed_ = std::exchange(other.handleShouldBeDestroyed_, false);
return *this;
}
template<class Promise>
TaskBase<Promise>::~TaskBase()
{
if (handleShouldBeDestroyed_)
{
handle_.destroy();
}
}
} // namespace detail
template<class T>
class AwaitableTask : public detail::TaskBase<detail::AwaitablePromise<T>>
{
using Base = detail::TaskBase<detail::AwaitablePromise<T>>;
public:
using Base::Base;
class awaiter;
friend class awaiter;
awaiter operator co_await() const;
};
template<class T>
struct AwaitableTask<T>::awaiter
{
bool await_ready()
{
return task_.done();
}
template<class Promise>
void await_suspend(std::coroutine_handle<Promise> handle)
{
task_.handle_.promise().storeWaiting(handle);
}
T await_resume()
{
if constexpr (!std::is_same_v<void, T>)
{
return std::move(task_.handle_.promise().val);
}
}
const AwaitableTask& task_;
};
template<class T>
typename AwaitableTask<T>::awaiter AwaitableTask<T>::operator co_await() const
{
return awaiter{*this};
}
} // namespace basiccoro

View file

@ -0,0 +1,109 @@
#pragma once
#include <coroutine>
#include <optional>
#include <stdexcept>
#include <type_traits>
#include <vector>
namespace basiccoro
{
namespace detail
{
template<class Event>
class AwaiterBase
{
public:
AwaiterBase(Event& event)
: event_(event)
{}
bool await_ready()
{
if (event_.isSet())
{
// unset already set event, then continue coroutine
event_.isSet_ = false;
return true;
}
return false;
}
void await_suspend(std::coroutine_handle<> handle)
{
event_.waiting_.push_back(handle);
}
typename Event::value_type await_resume()
{
if constexpr (!std::is_same_v<typename Event::value_type, void>)
{
if (!event_.result)
{
throw std::runtime_error("AwaiterBase: no value in event_.result");
}
return *event_.result;
}
}
private:
Event& event_;
};
class SingleEventBase
{
public:
SingleEventBase() = default;
SingleEventBase(const SingleEventBase&) = delete;
SingleEventBase(SingleEventBase&&);
SingleEventBase& operator=(const SingleEventBase&) = delete;
SingleEventBase& operator=(SingleEventBase&&);
~SingleEventBase();
bool isSet() const {
return isSet_;
}
protected:
void set_common();
private:
template<class T>
friend class AwaiterBase;
std::vector<std::coroutine_handle<>> waiting_;
bool isSet_ = false;
};
} // namespace detail
template<class T>
class SingleEvent : public detail::SingleEventBase
{
public:
using value_type = T;
using awaiter = detail::AwaiterBase<SingleEvent<T>>;
void set(T t) { result = std::move(t); set_common(); }
awaiter operator co_await() { return awaiter{*this}; }
private:
friend awaiter;
std::optional<T> result;
};
template<>
class SingleEvent<void> : public detail::SingleEventBase
{
public:
using value_type = void;
using awaiter = detail::AwaiterBase<SingleEvent<void>>;
void set() {
set_common();
}
awaiter operator co_await();
};
} // namespace basiccoro

View file

@ -14,7 +14,7 @@ namespace CoSched {
class ScheduledThread {
struct QueueEntry {
std::string task_name;
std::function<async::result<void> ()> task_fcn;
std::function<AwaitableTask<void> ()> task_fcn;
};
std::thread thread;
@ -38,7 +38,7 @@ public:
}
// DO NOT call from within a task
void create_task(const std::string& task_name, const std::function<async::result<void> ()>& task_fcn) {
void create_task(const std::string& task_name, const std::function<AwaitableTask<void> ()>& task_fcn) {
// Enqueue function
{
std::scoped_lock L(queue_mutex);
@ -57,7 +57,7 @@ public:
// MUST already be running
void shutdown() {
create_task("Shutdown Initiator", [this] () -> async::result<void> {
create_task("Shutdown Initiator", [this] () -> AwaitableTask<void> {
shutdown_requested = true;
co_return;
});

View file

@ -5,11 +5,14 @@
#include <mutex>
#include <memory>
#include <chrono>
#include <async/result.hpp>
#include <async/recurring-event.hpp>
#include <AwaitableTask.hpp>
#include <SingleEvent.hpp>
namespace CoSched {
using namespace basiccoro;
using Priority = uint8_t;
enum {
PRIO_LOWEST = -99,
@ -34,7 +37,7 @@ class Task {
static thread_local class Task *current;
class Scheduler *scheduler;
async::recurring_event resume;
std::unique_ptr<SingleEvent<void>> resume_event;
std::chrono::system_clock::time_point stopped_at;
@ -93,7 +96,7 @@ public:
return suspended;
}
async::result<bool> yield();
AwaitableTask<bool> yield();
};

@ -1 +0,0 @@
Subproject commit 721195383b9005006826856fe84c5eabdf350882

View file

@ -14,7 +14,7 @@ void CoSched::ScheduledThread::main_loop() {
auto e = std::move(queue.front());
queue.pop();
sched.create_task(e.task_name);
async::detach(e.task_fcn());
e.task_fcn();
}
}
// Run once

View file

@ -5,11 +5,12 @@
namespace CoSched {
void CoSched::Task::kill() {
get_scheduler().delete_task(this);
}
async::result<bool> CoSched::Task::yield() {
AwaitableTask<bool> Task::yield() {
if (state == TaskState::terminating) {
// If it was terminating, it can finally be declared dead now
state = TaskState::dead;
@ -17,28 +18,30 @@ async::result<bool> CoSched::Task::yield() {
}
// It's just sleeping
state = TaskState::sleeping;
// Create event for resume
resume_event = std::make_unique<SingleEvent<void>>();
// Let's wait until we're back up!
stopped_at = std::chrono::system_clock::now();
co_await resume.async_wait();
co_await *resume_event;
// Here we go, let's keep going...
state = TaskState::running;
co_return true;
}
void CoSched::Scheduler::clean_task(Task *task) {
void Scheduler::clean_task(Task *task) {
// If current task isn't sleeping, it is considered a zombie so removed from list
if (task && task->state != TaskState::sleeping) {
delete_task(std::exchange(task, nullptr));
}
}
void CoSched::Scheduler::delete_task(Task *task) {
void Scheduler::delete_task(Task *task) {
std::scoped_lock L(tasks_mutex);
tasks.erase(std::find_if(tasks.begin(), tasks.end(), [task] (const auto& o) {return o.get() == task;}));
}
CoSched::Task *CoSched::Scheduler::get_next_task() {
Task *Scheduler::get_next_task() {
std::scoped_lock L(tasks_mutex);
// Get tasks with highest priority
@ -72,7 +75,7 @@ CoSched::Task *CoSched::Scheduler::get_next_task() {
return next_task;
}
void CoSched::Scheduler::run_once() {
void Scheduler::run_once() {
// Clean up old task
clean_task(Task::current);
@ -80,8 +83,9 @@ void CoSched::Scheduler::run_once() {
Task::current = get_next_task();
// Resume task if any
if (Task::current) Task::current->resume.raise();
if (Task::current) Task::current->resume_event->set();
}
thread_local CoSched::Task *CoSched::Task::current;
thread_local Task *Task::current;
}

View file

@ -4,11 +4,23 @@
#include <string>
CoSched::AwaitableTask<std::string> get_value() {
std::string fres = CoSched::Task::get_current().get_name();
for (unsigned it = 0; it != 100; it++) {
fres += "Hello";
co_await CoSched::Task::get_current().yield();
}
fres.resize(1);
co_return fres;
}
async::result<void> test_task() {
CoSched::AwaitableTask<void> test_task() {
auto& task = CoSched::Task::get_current();
if (task.get_name() == "B" || task.get_name() == "D") {
task.set_priority(CoSched::PRIO_HIGH);
}
for (unsigned x = 100; co_await task.yield(); x--) {
std::cout << task.get_name() << ": " << x << '\n';
std::cout << co_await get_value() << ": " << x << '\n';
if (x == 10) task.terminate();
}
}
@ -16,14 +28,7 @@ async::result<void> test_task() {
int main () {
CoSched::ScheduledThread scheduler;
for (const auto& name : {"A", "B", "C", "D", "E", "F"}) {
scheduler.enqueue([name] (CoSched::Scheduler& scheduler) {
scheduler.create_task(name);
async::detach(test_task());
auto& task = CoSched::Task::get_current();
if (task.get_name() == "B" || task.get_name() == "D") {
task.set_priority(CoSched::PRIO_HIGH);
}
});
scheduler.create_task(name, test_task);
}
scheduler.start();
scheduler.wait();