#include "AsyncManager.hpp"
#include "Runtime.hpp"

#include <utility>
#include <vector>
#ifndef PLATFORM_WINDOWS
#   include <sys/select.h>
#else
#   include <winsock2.h>
#endif


/// Removes every entry of `map` whose future handle is null.
///
/// run() resets a handle to null once it has completed the future, so this
/// drops the entries that have already been served.
///
/// @param map  Socket-to-future map to prune in place.
void AsyncManager::cleanFutureMap(SockFutureMap& map) {
    // erase() hands back the iterator following the removed element, which
    // lets us prune while walking the container in a single pass.
    for (auto it = map.begin(); it != map.end();) {
        if (!it->second) [[unlikely]] {
            it = map.erase(it);
        } else {
            ++it;
        }
    }
}

/// Performs one iteration of the socket event loop.
///
/// Waits (for at most 250 ms) in select() on every socket registered in
/// `sockReads` / `sockWrites`, then completes the future of each socket that
/// became ready (AsyncResult::Success) or that reported an exceptional
/// condition (AsyncResult::Error). If select() itself fails, every registered
/// future is completed with AsyncResult::Error. Completed futures are removed
/// from their map before returning.
///
/// When no sockets are registered at all, `stopping` is set and the function
/// returns immediately.
void AsyncManager::run() {
    // We should stop once there is nothing left to do
    if (sockReads.empty() && sockWrites.empty()) [[unlikely]] {
        stopping = true;
        return;
    }

    // We need to keep track of the highest fd for select()
    int maxFd = 0;

    // Create except FD set
    fd_set exceptFds;
    FD_ZERO(&exceptFds);

    // Create write FD set
    fd_set writeFds;
    FD_ZERO(&writeFds);
    for (const auto& [fd, cb] : sockWrites) {
        FD_SET(fd, &writeFds);
        FD_SET(fd, &exceptFds);
        if (fd > maxFd) {
            maxFd = fd;
        }
    }

    // Create read FD set
    fd_set readFds;
    FD_ZERO(&readFds);
    for (const auto& [fd, cb] : sockReads) {
        FD_SET(fd, &readFds);
        FD_SET(fd, &exceptFds);
        if (fd > maxFd) {
            maxFd = fd;
        }
    }

    // Specify timeout
    timeval tv{
        .tv_sec = 0,
        .tv_usec = 250000
    };

    // select() until there is data
    bool error = false;
    if (select(maxFd+1, &readFds, &writeFds, &exceptFds, &tv) < 0) {
        // The sets are unspecified after a failed select(); clear the ready
        // sets so that every future is reported as failed exactly once below.
        FD_ZERO(&readFds);
        FD_ZERO(&writeFds);
        error = true;
    }

    // Execution queue: pointer to the future handle stored inside the map,
    // paired with "complete as error?". Pointing at the map's own handle is
    // what allows the handle to be reset below so cleanFutureMap() can find it.
    // NOTE(review): relies on SockFutureMap keeping element addresses stable
    // across insertions (true for the std:: map types) - confirm.
    using FutureHandle = SockFutureMap::mapped_type;
    std::vector<std::pair<FutureHandle*, bool>> execQueue;
    execQueue.reserve(sockWrites.size() + sockReads.size());

    // Queues every future of `futures` whose socket is flagged in `readyFds`
    // (success) or in the exception set / global select() failure (error).
    const auto collect = [&](SockFutureMap& futures, fd_set& readyFds) {
        for (auto& [fd, future] : futures) {
            if (!future) {
                // Nothing to complete; cleanFutureMap() removes it below
                continue;
            }
            // A future must be queued at most once: completing resets the
            // handle, so a second queue entry would dereference null.
            // Readiness wins over an exceptional condition; a real socket
            // error then surfaces through the I/O call the waiter performs.
            if (FD_ISSET(fd, &readyFds)) {
                execQueue.emplace_back(&future, false);
            } else if (FD_ISSET(fd, &exceptFds) || error) [[unlikely]] {
                execQueue.emplace_back(&future, true);
            }
        }
    };

    // Collect all write futures, then all read futures
    collect(sockWrites, writeFds);
    collect(sockReads, readFds);

    // Set futures
    for (auto& [handle, failed] : execQueue) {
        (*handle)->set(failed ? AsyncResult::Error : AsyncResult::Success);
        // Mark the map entry as served
        *handle = nullptr;
    }

    // Clean future maps
    cleanFutureMap(sockWrites);
    cleanFutureMap(sockReads);
}