1
0
Fork 0
mirror of https://gitlab.com/niansa/libcrosscoro.git synced 2025-03-06 20:53:32 +01:00

tcp_client (#22)

* tcp_client

fixes #21

* remove double ci build
This commit is contained in:
Josh Baldwin 2020-12-27 14:32:03 -07:00 committed by GitHub
parent b15c7c1d16
commit e11058ef22
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 263 additions and 65 deletions

View file

@ -1,6 +1,6 @@
name: build
on: [pull_request, push]
on: [pull_request]
jobs:
build-ubuntu-20-04:

View file

@ -11,6 +11,7 @@ set(LIBCORO_SOURCE_FILES
inc/coro/detail/void_value.hpp
inc/coro/awaitable.hpp
inc/coro/connect.hpp src/connect.cpp
inc/coro/coro.hpp
inc/coro/event.hpp src/event.cpp
inc/coro/generator.hpp
@ -22,6 +23,7 @@ set(LIBCORO_SOURCE_FILES
inc/coro/socket.hpp
inc/coro/sync_wait.hpp src/sync_wait.cpp
inc/coro/task.hpp
inc/coro/tcp_client.hpp src/tcp_client.cpp
inc/coro/tcp_scheduler.hpp
inc/coro/thread_pool.hpp src/thread_pool.cpp
inc/coro/when_all.hpp

21
inc/coro/connect.hpp Normal file
View file

@ -0,0 +1,21 @@
#pragma once
#include <string>
namespace coro
{
enum class connect_status
{
/// The connection has been established.
connected,
/// The given ip address could not be parsed or is invalid.
invalid_ip_address,
/// The connection operation timed out.
timeout,
/// There was an error, use errno to get more information on the specific error.
error
};
auto to_string(const connect_status& status) -> const std::string&;
} // namespace coro

View file

@ -8,6 +8,7 @@
#include "coro/promise.hpp"
#include "coro/sync_wait.hpp"
#include "coro/task.hpp"
#include "coro/tcp_client.hpp"
#include "coro/tcp_scheduler.hpp"
#include "coro/thread_pool.hpp"
#include "coro/when_all.hpp"

View file

@ -36,7 +36,7 @@ public:
event(const event&) = delete;
event(event&&) = delete;
auto operator=(const event&) -> event& = delete;
auto operator=(event &&) -> event& = delete;
auto operator=(event&&) -> event& = delete;
/**
* @return True if this event is currently in the set state.

View file

@ -1,7 +1,9 @@
#pragma once
#include "coro/awaitable.hpp"
#include "coro/poll.hpp"
#include "coro/shutdown.hpp"
#include "coro/socket.hpp"
#include "coro/task.hpp"
#include <atomic>
@ -133,7 +135,7 @@ public:
resume_token(const resume_token&) = delete;
resume_token(resume_token&&) = default;
auto operator=(const resume_token&) -> resume_token& = delete;
auto operator=(resume_token &&) -> resume_token& = default;
auto operator=(resume_token&&) -> resume_token& = default;
auto resume(return_type value) noexcept -> void;
@ -158,7 +160,7 @@ public:
resume_token(const resume_token&) = delete;
resume_token(resume_token&&) = default;
auto operator=(const resume_token&) -> resume_token& = delete;
auto operator=(resume_token &&) -> resume_token& = default;
auto operator=(resume_token&&) -> resume_token& = default;
auto resume() noexcept -> void;
};
@ -429,7 +431,7 @@ public:
io_scheduler(const io_scheduler&) = delete;
io_scheduler(io_scheduler&&) = delete;
auto operator=(const io_scheduler&) -> io_scheduler& = delete;
auto operator=(io_scheduler &&) -> io_scheduler& = delete;
auto operator=(io_scheduler&&) -> io_scheduler& = delete;
virtual ~io_scheduler()
{
@ -650,16 +652,28 @@ public:
}
}
auto read(
const coro::socket& sock,
std::span<char> buffer,
std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<std::pair<poll_status, ssize_t>>
{
return read(sock.native_handle(), buffer, timeout);
}
/**
* This function will first poll the given `fd` to make sure it can be written to. Once notified
* that the `fd` can be written to the given buffer's contents are written to the `fd`.
* @param fd The file descriptor to write the contents of `buffer` to.
* @param buffer The data to write to `fd`.
* @param timeout The timeout for the write operation, if timeout <= 0 then write will block
* indefinitely until the event is triggered.
* @return The number of bytes written or an error code if negative.
*/
auto write(fd_t fd, const std::span<const char> buffer) -> coro::task<std::pair<poll_status, ssize_t>>
auto write(
fd_t fd, const std::span<const char> buffer, std::chrono::milliseconds timeout = std::chrono::milliseconds{0})
-> coro::task<std::pair<poll_status, ssize_t>>
{
auto status = co_await poll(fd, poll_op::write);
auto status = co_await poll(fd, poll_op::write, timeout);
switch (status)
{
case poll_status::event:
@ -669,6 +683,14 @@ public:
}
}
auto write(
const coro::socket& sock,
const std::span<const char> buffer,
std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<std::pair<poll_status, ssize_t>>
{
return write(sock.native_handle(), buffer, timeout);
}
/**
* Immediately yields the current task and places it at the end of the queue of tasks waiting
* to be processed. This will immediately be picked up again once it naturally goes through the
@ -1027,7 +1049,8 @@ private:
bool expected{true};
while (!m_event_set.compare_exchange_weak(
expected, false, std::memory_order::release, std::memory_order::relaxed))
{}
{
}
tasks_ready = true;
}

View file

@ -14,7 +14,7 @@ public:
latch(const latch&) = delete;
latch(latch&&) = delete;
auto operator=(const latch&) -> latch& = delete;
auto operator=(latch &&) -> latch& = delete;
auto operator=(latch&&) -> latch& = delete;
auto is_ready() const noexcept -> bool { return m_event.is_set(); }

View file

@ -4,6 +4,8 @@
#include <arpa/inet.h>
#include <fcntl.h>
#include <span>
#include <unistd.h>
#include <utility>
#include <iostream>
@ -74,7 +76,7 @@ public:
if (opts.blocking == blocking_t::no)
{
if (!s.blocking(blocking_t::no))
if (s.blocking(blocking_t::no) == false)
{
throw std::runtime_error{"Failed to set socket to non-blocking mode."};
}
@ -98,9 +100,13 @@ public:
}
sockaddr_in server{};
server.sin_family = domain_to_os(opts.domain);
server.sin_addr.s_addr = htonl(inet_addr(address.data()));
server.sin_port = htons(port);
server.sin_family = domain_to_os(opts.domain);
server.sin_port = htons(port);
if (inet_pton(server.sin_family, address.data(), &server.sin_addr) <= 0)
{
throw std::runtime_error{"Failed to translate IP Address."};
}
if (bind(s.native_handle(), (struct sockaddr*)&server, sizeof(server)) < 0)
{

View file

@ -16,8 +16,8 @@ public:
sync_wait_event(const sync_wait_event&) = delete;
sync_wait_event(sync_wait_event&&) = delete;
auto operator=(const sync_wait_event&) -> sync_wait_event& = delete;
auto operator=(sync_wait_event &&) -> sync_wait_event& = delete;
~sync_wait_event() = default;
auto operator=(sync_wait_event&&) -> sync_wait_event& = delete;
~sync_wait_event() = default;
auto set() noexcept -> void;
auto reset() noexcept -> void;

42
inc/coro/tcp_client.hpp Normal file
View file

@ -0,0 +1,42 @@
#pragma once
#include "coro/connect.hpp"
#include "coro/poll.hpp"
#include "coro/socket.hpp"
#include "coro/task.hpp"
#include <chrono>
namespace coro
{
class io_scheduler;
class tcp_client
{
public:
struct options
{
std::string address{"127.0.0.1"};
int16_t port{8080};
socket::domain_t domain{socket::domain_t::ipv4};
};
tcp_client(io_scheduler& scheduler, options opts = options{"127.0.0.1", 8080, socket::domain_t::ipv4});
tcp_client(const tcp_client&) = delete;
tcp_client(tcp_client&&) = default;
auto operator=(const tcp_client&) noexcept -> tcp_client& = delete;
auto operator=(tcp_client&&) noexcept -> tcp_client& = default;
~tcp_client() = default;
auto connect(std::chrono::milliseconds timeout = std::chrono::milliseconds{0}) -> coro::task<connect_status>;
auto socket() const -> const coro::socket& { return m_socket; }
auto socket() -> coro::socket& { return m_socket; }
private:
io_scheduler& m_io_scheduler;
options m_options;
coro::socket m_socket;
};
} // namespace coro

View file

@ -53,9 +53,18 @@ public:
tcp_scheduler(const tcp_scheduler&) = delete;
tcp_scheduler(tcp_scheduler&&) = delete;
auto operator=(const tcp_scheduler&) -> tcp_scheduler& = delete;
auto operator=(tcp_scheduler &&) -> tcp_scheduler& = delete;
auto operator=(tcp_scheduler&&) -> tcp_scheduler& = delete;
~tcp_scheduler() override = default;
~tcp_scheduler() override { shutdown(); }
auto empty() const -> bool { return size() == 0; }
auto size() const -> size_t
{
// Take one off for the accept task so the user doesn't have to account for the hidden task.
auto size = io_scheduler::size();
return (size > 0) ? size - 1 : 0;
}
auto shutdown(shutdown_t wait_for_tasks = shutdown_t::sync) -> void override
{
@ -63,9 +72,9 @@ public:
{
m_accept_socket.shutdown(); // wake it up by shutting down read/write operations.
while (m_accept_task_exited.load(std::memory_order_acquire) == false)
while (m_accept_task_exited.load(std::memory_order::acquire) == false)
{
std::this_thread::sleep_for(std::chrono::milliseconds{10});
std::this_thread::sleep_for(std::chrono::milliseconds{1});
}
io_scheduler::shutdown(wait_for_tasks);
@ -75,6 +84,7 @@ public:
private:
options m_opts;
/// Should the accept task continue accepting new connections?
std::atomic<bool> m_accept_new_connections{true};
std::atomic<bool> m_accept_task_exited{false};
socket m_accept_socket{-1};

View file

@ -82,7 +82,7 @@ public:
thread_pool(const thread_pool&) = delete;
thread_pool(thread_pool&&) = delete;
auto operator=(const thread_pool&) -> thread_pool& = delete;
auto operator=(thread_pool &&) -> thread_pool& = delete;
auto operator=(thread_pool&&) -> thread_pool& = delete;
~thread_pool();

View file

@ -103,7 +103,7 @@ public:
}
auto operator=(const when_all_ready_awaitable&) -> when_all_ready_awaitable& = delete;
auto operator=(when_all_ready_awaitable &&) -> when_all_ready_awaitable& = delete;
auto operator=(when_all_ready_awaitable&&) -> when_all_ready_awaitable& = delete;
auto operator co_await() & noexcept
{
@ -379,7 +379,7 @@ public:
}
auto operator=(const when_all_task&) -> when_all_task& = delete;
auto operator=(when_all_task &&) -> when_all_task& = delete;
auto operator=(when_all_task&&) -> when_all_task& = delete;
~when_all_task()
{
@ -435,7 +435,7 @@ private:
};
template<awaitable awaitable, typename return_type = awaitable_traits<awaitable&&>::awaiter_return_type>
static auto make_when_all_task(awaitable a) -> when_all_task<return_type>
static auto make_when_all_task(awaitable&& a) -> when_all_task<return_type>
{
if constexpr (std::is_void_v<return_type>)
{

25
src/connect.cpp Normal file
View file

@ -0,0 +1,25 @@
#include "coro/connect.hpp"
namespace coro
{
static std::string connect_status_connected{"connected"};
static std::string connect_status_invalid_ip_address{"invalid_ip_address"};
static std::string connect_status_timeout{"timeout"};
static std::string connect_status_error{"error"};
auto to_string(const connect_status& status) -> const std::string&
{
switch (status)
{
case connect_status::connected:
return connect_status_connected;
case connect_status::invalid_ip_address:
return connect_status_invalid_ip_address;
case connect_status::timeout:
return connect_status_timeout;
case connect_status::error:
return connect_status_error;
}
}
} // namespace coro

62
src/tcp_client.cpp Normal file
View file

@ -0,0 +1,62 @@
#include "coro/tcp_client.hpp"
#include "coro/io_scheduler.hpp"
namespace coro
{
tcp_client::tcp_client(io_scheduler& scheduler, options opts)
: m_io_scheduler(scheduler),
m_options(std::move(opts)),
m_socket(socket::make_socket(socket::options{m_options.domain, socket::type_t::tcp, socket::blocking_t::yes}))
{
}
auto tcp_client::connect(std::chrono::milliseconds timeout) -> coro::task<connect_status>
{
sockaddr_in server{};
server.sin_family = socket::domain_to_os(m_options.domain);
server.sin_port = htons(m_options.port);
if (inet_pton(server.sin_family, m_options.address.data(), &server.sin_addr) <= 0)
{
co_return connect_status::invalid_ip_address;
}
auto cret = ::connect(m_socket.native_handle(), (struct sockaddr*)&server, sizeof(server));
if (cret == 0)
{
// Immediate connect.
co_return connect_status::connected;
}
else if (cret == -1)
{
// If the connect is happening in the background poll for write on the socket to trigger
// when the connection is established.
if (errno == EAGAIN || errno == EINPROGRESS)
{
auto pstatus = co_await m_io_scheduler.poll(m_socket.native_handle(), poll_op::write, timeout);
if (pstatus == poll_status::event)
{
int result{0};
socklen_t result_length{sizeof(result)};
if (getsockopt(m_socket.native_handle(), SOL_SOCKET, SO_ERROR, &result, &result_length) < 0)
{
std::cerr << "connect failed to getsockopt after write poll event\n";
}
if (result == 0)
{
// success, connected
co_return connect_status::connected;
}
}
else if (pstatus == poll_status::timeout)
{
co_return connect_status::timeout;
}
}
}
co_return connect_status::error;
}
} // namespace coro

View file

@ -7,64 +7,70 @@ TEST_CASE("tcp_scheduler no on connection throws")
REQUIRE_THROWS(coro::tcp_scheduler{coro::tcp_scheduler::options{.on_connection = nullptr}});
}
TEST_CASE("tcp_scheduler ping")
TEST_CASE("tcp_scheduler echo server")
{
std::string msg{"Hello from client"};
const std::string msg{"Hello from client"};
auto on_connection = [&](coro::tcp_scheduler& tcp, coro::socket sock) -> coro::task<void> {
/*auto status =*/co_await tcp.poll(sock.native_handle(), coro::poll_op::read);
/*REQUIRE(status == coro::poll_status::success);*/
auto on_connection = [&msg](coro::tcp_scheduler& scheduler, coro::socket sock) -> coro::task<void> {
std::string in(64, '\0');
std::string in{};
in.resize(2048, '\0');
auto read_bytes = sock.recv(std::span<char>{in.data(), in.size()});
REQUIRE(read_bytes == msg.length());
in.resize(read_bytes);
auto [rstatus, rbytes] = co_await scheduler.read(sock, std::span<char>{in.data(), in.size()});
REQUIRE(rstatus == coro::poll_status::event);
in.resize(rbytes);
REQUIRE(in == msg);
/*status =*/co_await tcp.poll(sock.native_handle(), coro::poll_op::write);
/*REQUIRE(status == coro::poll_status::success);*/
auto written_bytes = sock.send(std::span<const char>(in.data(), in.length()));
REQUIRE(written_bytes == in.length());
auto [wstatus, wbytes] = co_await scheduler.write(sock, std::span<const char>(in.data(), in.length()));
REQUIRE(wstatus == coro::poll_status::event);
REQUIRE(wbytes == in.length());
co_return;
};
coro::tcp_scheduler tcp{coro::tcp_scheduler::options{
coro::tcp_scheduler scheduler{coro::tcp_scheduler::options{
.address = "0.0.0.0",
.port = 8080,
.backlog = 128,
.on_connection = on_connection,
.io_options = coro::io_scheduler::options{8, 2, coro::io_scheduler::thread_strategy_t::spawn}}};
.io_options = coro::io_scheduler::options{.thread_strategy = coro::io_scheduler::thread_strategy_t::spawn}}};
int client_socket = ::socket(AF_INET, SOCK_STREAM, 0);
sockaddr_in server{};
server.sin_family = AF_INET;
server.sin_port = htons(8080);
auto make_client_task = [&scheduler, &msg]() -> coro::task<void> {
coro::tcp_client client{
scheduler,
coro::tcp_client::options{.address = "127.0.0.1", .port = 8080, .domain = coro::socket::domain_t::ipv4}};
if (inet_pton(AF_INET, "127.0.0.1", &server.sin_addr) <= 0)
auto cstatus = co_await client.connect();
REQUIRE(cstatus == coro::connect_status::connected);
auto [wstatus, wbytes] =
co_await scheduler.write(client.socket(), std::span<const char>{msg.data(), msg.length()});
REQUIRE(wstatus == coro::poll_status::event);
REQUIRE(wbytes == msg.length());
std::string response(64, '\0');
auto [rstatus, rbytes] =
co_await scheduler.read(client.socket(), std::span<char>{response.data(), response.length()});
REQUIRE(rstatus == coro::poll_status::event);
REQUIRE(rbytes == msg.length());
response.resize(rbytes);
REQUIRE(response == msg);
co_return;
};
scheduler.schedule(make_client_task());
// Shutting down the scheduler will cause it to stop accepting new connections, to avoid requiring
// another scheduler for this test the main thread can spin sleep until the tcp scheduler reports
// that it is empty. tcp schedulers do not report their accept task as a task in its size/empty count.
while (!scheduler.empty())
{
perror("failed to set sin_addr=127.0.0.1");
REQUIRE(false);
std::this_thread::sleep_for(std::chrono::milliseconds{1});
}
if (connect(client_socket, (struct sockaddr*)&server, sizeof(server)) < 0)
{
perror("Failed to connect to tcp scheduler server");
REQUIRE(false);
}
::send(client_socket, msg.data(), msg.length(), 0);
std::string response{};
response.resize(256, '\0');
auto bytes_recv = ::recv(client_socket, response.data(), response.length(), 0);
REQUIRE(bytes_recv == msg.length());
response.resize(bytes_recv);
REQUIRE(response == msg);
tcp.shutdown();
REQUIRE(tcp.empty());
close(client_socket);
scheduler.shutdown();
REQUIRE(scheduler.empty());
}