1
0
Fork 0
mirror of https://gitlab.com/niansa/cosched.git synced 2025-03-06 20:53:26 +01:00

Implemented ScheduledThread

This commit is contained in:
niansa/tuxifan 2023-05-04 15:06:26 +02:00
parent c84bb0d571
commit 637889421a
5 changed files with 120 additions and 18 deletions

View file

@ -7,12 +7,12 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON)
add_subdirectory(libasync)
add_library(cosched STATIC scheduler.cpp include/scheduler.hpp)
add_library(cosched STATIC scheduler.cpp include/scheduler.hpp include/scheduled_thread.hpp)
target_link_libraries(cosched PUBLIC async)
target_include_directories(cosched PUBLIC include/)
#add_executable(test test.cpp)
#target_link_libraries(test PRIVATE cosched)
add_executable(test test.cpp)
target_link_libraries(test PRIVATE cosched)
install(TARGETS cosched
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR})

View file

@ -0,0 +1,79 @@
#ifndef SCHEDULED_THREAD_HPP
#define SCHEDULED_THREAD_HPP
#include "scheduler.hpp"
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
namespace CoSched {
class ScheduledThread {
std::thread thread;
std::mutex queue_mutex;
std::queue<std::function<void (Scheduler&)>> queue;
std::mutex conditional_mutex;
std::condition_variable conditional_lock;
bool shutdown_requested = false;
bool joined = false;
void main_loop() {
// Create scheduler
Scheduler sched;
// Loop until shutdown is requested
while (!shutdown_requested) {
// Start all new tasks enqueued
std::scoped_lock L(queue_mutex);
while (!queue.empty()) {
auto f = std::move(queue.front());
queue.pop();
f(sched);
}
// Run once
sched.run_once();
// Wait for work if there is none
if (!sched.has_work()) {
if (joined) break;
std::unique_lock<std::mutex> lock(conditional_mutex);
conditional_lock.wait(lock);
}
}
}
public:
ScheduledThread() {}
void start() {
thread = std::thread([this] () {
main_loop();
});
}
// DO NOT call from within a task
void enqueue(const std::function<void (Scheduler&)>& f) {
// Enqueue function
{
std::scoped_lock L(queue_mutex);
queue.emplace(f);
}
// Notify thread
conditional_lock.notify_one();
}
void wait() {
joined = true;
thread.join();
}
void shutdown() {
enqueue([this] (Scheduler&) {
shutdown_requested = true;
});
}
};
}
#endif // SCHEDULED_THREAD_HPP

View file

@ -104,8 +104,8 @@ class Scheduler {
std::mutex tasks_mutex;
std::vector<std::unique_ptr<Task>> tasks;
void clean_task(Task *task);
void delete_task(Task *task);
Task *get_next_task();
public:
@ -113,24 +113,38 @@ public:
Scheduler(const Scheduler&) = delete;
Scheduler(Scheduler&&) = delete;
// Returns all tasks
const auto& get_tasks() const {
return tasks;
}
// Checks if there is nothing left to do
bool has_work() const {
return !tasks.empty();
}
// Creates new task, returns it and switches to it
// DO NOT call from within a task
void create_task(const std::string& name) {
// Clean up old task
clean_task(Task::current);
// Create and switch to new task
std::scoped_lock L(tasks_mutex);
Task::current = tasks.emplace_back(std::make_unique<Task>(this, name)).get();
}
// Run until there are no more tasks left to process
// DO NOT call from within a task
void run() {
while (!tasks.empty()) {
// Repeat while we have work to do
while (has_work()) {
run_once();
}
}
// Run once
// DO NOT call from within a task
void run_once();
};
}

View file

@ -26,6 +26,13 @@ async::result<bool> CoSched::Task::yield() {
}
void CoSched::Scheduler::clean_task(Task *task) {
// If current task isn't sleeping, it is considered a zombie so removed from list
if (task && task->state != TaskState::sleeping) {
delete_task(std::exchange(task, nullptr));
}
}
void CoSched::Scheduler::delete_task(Task *task) {
std::scoped_lock L(tasks_mutex);
tasks.erase(std::find_if(tasks.begin(), tasks.end(), [task] (const auto& o) {return o.get() == task;}));
@ -66,10 +73,8 @@ CoSched::Task *CoSched::Scheduler::get_next_task() {
}
void CoSched::Scheduler::run_once() {
// If current task isn't sleeping, it is considered a zombie so removed from list
if (Task::current && Task::current->state != TaskState::sleeping) {
delete_task(std::exchange(Task::current, nullptr));
}
// Clean up old task
clean_task(Task::current);
// Get new task
Task::current = get_next_task();
@ -78,4 +83,5 @@ void CoSched::Scheduler::run_once() {
if (Task::current) Task::current->resume.raise();
}
thread_local CoSched::Task *CoSched::Task::current;

View file

@ -1,4 +1,4 @@
#include "scheduler.hpp"
#include "scheduled_thread.hpp"
#include <iostream>
#include <string>
@ -14,14 +14,17 @@ async::result<void> test_task() {
}
int main () {
CoSched::Scheduler scheduler;
CoSched::ScheduledThread scheduler;
for (const auto& name : {"A", "B", "C", "D", "E", "F"}) {
scheduler.create_task(name);
async::detach(test_task());
auto& task = CoSched::Task::get_current();
if (task.get_name() == "B" || task.get_name() == "D") {
task.set_priority(CoSched::PRIO_HIGH);
}
scheduler.enqueue([name] (CoSched::Scheduler& scheduler) {
scheduler.create_task(name);
async::detach(test_task());
auto& task = CoSched::Task::get_current();
if (task.get_name() == "B" || task.get_name() == "D") {
task.set_priority(CoSched::PRIO_HIGH);
}
});
}
scheduler.run();
scheduler.start();
scheduler.wait();
}