1
0
Fork 0
mirror of https://gitlab.com/niansa/llama_any.git synced 2025-03-06 20:48:27 +01:00
llama_any/AsyncManager.cpp
2023-04-06 10:00:27 +02:00

114 lines
3.1 KiB
C++

#include "AsyncManager.hpp"
#include "Runtime.hpp"
#include <vector>
#include <utility>
#ifndef PLATFORM_WINDOWS
# include <sys/select.h>
#else
# include <ws2tcpip.h>
#endif
void AsyncManager::cleanFutureMap(SockFutureMap& map) {
std::vector<SockFutureMap::iterator> erasureQueue;
for (auto it = map.begin(); it != map.end(); it++) {
if (!it->second) [[unlikely]] {
erasureQueue.push_back(it);
}
}
for (auto& it : erasureQueue) {
map.erase(it);
}
}
void AsyncManager::run() {
while (!stopping && runtime.cooperate()) {
// We should stop once there is nothing left to do
if (sockReads.empty() && sockWrites.empty()) [[unlikely]] {
break;
}
// We need to keep track of the highest fd for socket()
int maxFd = 0;
// Create except FD set
fd_set exceptFds;
FD_ZERO(&exceptFds);
// Create write FD set
fd_set writeFds;
FD_ZERO(&writeFds);
for (const auto& [fd, cb] : sockWrites) {
FD_SET(fd, &writeFds);
FD_SET(fd, &exceptFds);
if (fd > maxFd) {
maxFd = fd;
}
}
// Create read FD set
fd_set readFds;
FD_ZERO(&readFds);
for (const auto& [fd, cb] : sockReads) {
FD_SET(fd, &readFds);
FD_SET(fd, &exceptFds);
if (fd > maxFd) {
maxFd = fd;
}
}
// Specify timeout
timeval tv{
.tv_sec = 0,
.tv_usec = 250000
};
// select() until there is data
bool error = false;
if (select(maxFd+1, &readFds, &writeFds, &exceptFds, &tv) < 0) {
FD_ZERO(&readFds);
FD_ZERO(&writeFds);
error = true;
}
// Execution queue
std::vector<std::pair<SockFutureUnique&, bool>> execQueue;
// Collect all write futures
for (auto& [fd, future] : sockWrites) {
if (FD_ISSET(fd, &writeFds)) {
// Socket is ready for writing
execQueue.push_back({future, false});
}
if (FD_ISSET(fd, &exceptFds) || error) [[unlikely]] {
// An exception happened in the socket
execQueue.push_back({future, true});
}
}
// Collect all read futures
for (auto& [fd, future] : sockReads) {
if (FD_ISSET(fd, &readFds)) {
// Socket is ready for reading
execQueue.push_back({future, false});
}
if (FD_ISSET(fd, &exceptFds) || error) [[unlikely]] {
// An exception happened in the socket
execQueue.push_back({future, true});
}
}
// Set futures
for (auto& [future, value] : execQueue) {
future->set(value?Result::Error:Result::Success);
future = nullptr;
}
// Clean future maps
cleanFutureMap(sockWrites);
cleanFutureMap(sockReads);
}
stopping = false;
}