1
0
Fork 0
mirror of https://gitlab.com/niansa/libcrosscoro.git synced 2025-03-06 20:53:32 +01:00

coro::when_all() always takes ownership (#63)

This commit is contained in:
Josh Baldwin 2021-02-17 10:50:51 -07:00 committed by GitHub
parent 4aee0dc6f8
commit 6a2f398f9a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 76 additions and 47 deletions

View file

@ -277,7 +277,7 @@ int main()
}
// Wait for all tasks to complete.
coro::sync_wait(coro::when_all(tasks));
coro::sync_wait(coro::when_all(std::move(tasks)));
}
```
@ -333,7 +333,7 @@ int main()
tasks.emplace_back(make_critical_section_task(i));
}
coro::sync_wait(coro::when_all(tasks));
coro::sync_wait(coro::when_all(std::move(tasks)));
// The output will be variable per run depending on how the tasks are picked up on the
// thread pool workers.
@ -419,7 +419,7 @@ int main()
}
// Wait for the thread pool workers to process all child tasks.
auto results = co_await coro::when_all(child_tasks);
auto results = co_await coro::when_all(std::move(child_tasks));
// Sum up the results of the completed child tasks.
size_t calculation{0};

View file

@ -50,5 +50,5 @@ int main()
}
// Wait for all tasks to complete.
coro::sync_wait(coro::when_all(tasks));
coro::sync_wait(coro::when_all(std::move(tasks)));
}

View file

@ -27,7 +27,7 @@ int main()
tasks.emplace_back(make_critical_section_task(i));
}
coro::sync_wait(coro::when_all(tasks));
coro::sync_wait(coro::when_all(std::move(tasks)));
// The output will be variable per run depending on how the tasks are picked up on the
// thread pool workers.

View file

@ -62,7 +62,7 @@ int main()
}
// Wait for the thread pool workers to process all child tasks.
auto results = co_await coro::when_all(child_tasks);
auto results = co_await coro::when_all(std::move(child_tasks));
// Sum up the results of the completed child tasks.
size_t calculation{0};

View file

@ -5,6 +5,7 @@
#include <atomic>
#include <coroutine>
#include <ranges>
#include <tuple>
#include <vector>
@ -438,26 +439,8 @@ private:
template<
concepts::awaitable awaitable,
typename return_type = concepts::awaitable_traits<awaitable&&>::awaiter_return_type>
static auto make_when_all_task(awaitable& a) -> when_all_task<return_type>
static auto make_when_all_task(awaitable a) -> when_all_task<return_type>
{
// Use this version if the awaitable can be taken as a reference (non-owning).
if constexpr (std::is_void_v<return_type>)
{
co_await static_cast<awaitable&&>(a);
co_return;
}
else
{
co_yield co_await static_cast<awaitable&&>(a);
}
}
template<
concepts::awaitable awaitable,
typename return_type = concepts::awaitable_traits<awaitable&&>::awaiter_return_type>
static auto make_when_all_task_owned(awaitable a) -> when_all_task<return_type>
{
// Use this version if the awaitable needs to be owned by this task.
if constexpr (std::is_void_v<return_type>)
{
co_await static_cast<awaitable&&>(a);
@ -476,25 +459,32 @@ template<concepts::awaitable... awaitables_type>
{
return detail::when_all_ready_awaitable<std::tuple<
detail::when_all_task<typename concepts::awaitable_traits<awaitables_type>::awaiter_return_type>...>>(
std::make_tuple(detail::make_when_all_task_owned(std::move(awaitables))...));
std::make_tuple(detail::make_when_all_task(std::move(awaitables))...));
}
template<
concepts::awaitable awaitable_type,
typename return_type = concepts::awaitable_traits<awaitable_type>::awaiter_return_type,
typename allocator_type = std::allocator<awaitable_type>>
[[nodiscard]] auto when_all(std::vector<awaitable_type, allocator_type>& awaitables)
std::ranges::range range_type,
concepts::awaitable awaitable_type = std::ranges::range_value_t<range_type>,
typename return_type = concepts::awaitable_traits<awaitable_type>::awaiter_return_type>
[[nodiscard]] auto when_all(range_type awaitables)
-> detail::when_all_ready_awaitable<std::vector<detail::when_all_task<return_type>>>
{
std::vector<detail::when_all_task<return_type>> tasks;
tasks.reserve(std::size(awaitables));
std::vector<detail::when_all_task<return_type>> output_tasks;
for (auto& a : awaitables)
// If the size is known in constant time reserve the output tasks size.
if constexpr (std::ranges::sized_range<range_type>)
{
tasks.emplace_back(detail::make_when_all_task(a));
output_tasks.reserve(std::size(awaitables));
}
return detail::when_all_ready_awaitable(std::move(tasks));
// Wrap each task into a when_all_task.
for (auto& a : awaitables)
{
output_tasks.emplace_back(detail::make_when_all_task(std::move(a)));
}
// Return the single awaitable that drives all the user's tasks.
return detail::when_all_ready_awaitable(std::move(output_tasks));
}
} // namespace coro

View file

@ -100,7 +100,7 @@ TEST_CASE("benchmark counter func coro::sync_wait(coro::when_all(vector<awaitabl
tasks.emplace_back(f());
}
auto results = coro::sync_wait(coro::when_all(tasks));
auto results = coro::sync_wait(coro::when_all(std::move(tasks)));
for (const auto& r : results)
{
@ -207,7 +207,7 @@ TEST_CASE("benchmark counter task scheduler{1} yield", "[benchmark]")
tasks.emplace_back(make_task());
}
coro::sync_wait(coro::when_all(tasks));
coro::sync_wait(coro::when_all(std::move(tasks)));
auto stop = sc::now();
print_stats("benchmark counter task scheduler{1} yield", ops, start, stop);
@ -240,7 +240,7 @@ TEST_CASE("benchmark counter task scheduler{1} yield_for", "[benchmark]")
tasks.emplace_back(make_task());
}
coro::sync_wait(coro::when_all(tasks));
coro::sync_wait(coro::when_all(std::move(tasks)));
auto stop = sc::now();
print_stats("benchmark counter task scheduler{1} yield", ops, start, stop);
@ -288,7 +288,7 @@ TEST_CASE("benchmark counter task scheduler await event from another coroutine",
tasks.emplace_back(resume_func(i));
}
coro::sync_wait(coro::when_all(tasks));
coro::sync_wait(coro::when_all(std::move(tasks)));
auto stop = sc::now();
print_stats("benchmark counter task scheduler await event from another coroutine", ops, start, stop);
@ -475,7 +475,7 @@ TEST_CASE("benchmark tcp_server echo server", "[benchmark]")
{
c.tasks.emplace_back(make_client_task(c));
}
coro::sync_wait(coro::when_all(c.tasks));
coro::sync_wait(coro::when_all(std::move(c.tasks)));
c.scheduler.shutdown();
}});
}

View file

@ -49,7 +49,7 @@ TEST_CASE("io_scheduler submit mutiple tasks", "[io_scheduler]")
tasks.emplace_back(make_task());
}
coro::sync_wait(coro::when_all(tasks));
coro::sync_wait(coro::when_all(std::move(tasks)));
REQUIRE(counter == n);
}
@ -489,7 +489,7 @@ TEST_CASE("io_scheduler multipler event waiters", "[io_scheduler]")
tasks.emplace_back(func());
}
auto results = co_await coro::when_all(tasks);
auto results = co_await coro::when_all(std::move(tasks));
uint64_t counter{0};
for (const auto& task : results)

View file

@ -89,7 +89,7 @@ TEST_CASE("mutex many waiters until event", "[mutex]")
tasks.emplace_back(make_set_task());
coro::sync_wait(coro::when_all(tasks));
coro::sync_wait(coro::when_all(std::move(tasks)));
REQUIRE(value == 4);
}

View file

@ -49,7 +49,7 @@ TEST_CASE("thread_pool one worker many tasks vector", "[thread_pool]")
input_tasks.emplace_back(f());
input_tasks.emplace_back(f());
auto output_tasks = coro::sync_wait(coro::when_all(input_tasks));
auto output_tasks = coro::sync_wait(coro::when_all(std::move(input_tasks)));
REQUIRE(output_tasks.size() == 3);
@ -79,7 +79,7 @@ TEST_CASE("thread_pool N workers 100k tasks", "[thread_pool]")
input_tasks.emplace_back(make_task(tp));
}
auto output_tasks = coro::sync_wait(coro::when_all(input_tasks));
auto output_tasks = coro::sync_wait(coro::when_all(std::move(input_tasks)));
REQUIRE(output_tasks.size() == iterations);
uint64_t counter{0};

View file

@ -2,6 +2,9 @@
#include <coro/coro.hpp>
#include <list>
#include <vector>
TEST_CASE("when_all single task with tuple container", "[when_all]")
{
auto make_task = [](uint64_t amount) -> coro::task<uint64_t> { co_return amount; };
@ -15,6 +18,20 @@ TEST_CASE("when_all single task with tuple container", "[when_all]")
REQUIRE(counter == 100);
}
TEST_CASE("when_all single task with tuple container by move", "[when_all]")
{
auto make_task = [](uint64_t amount) -> coro::task<uint64_t> { co_return amount; };
auto t = make_task(100);
auto output_tasks = coro::sync_wait(coro::when_all(std::move(t)));
REQUIRE(std::tuple_size<decltype(output_tasks)>() == 1);
uint64_t counter{0};
std::apply([&counter](auto&&... tasks) -> void { ((counter += tasks.return_value()), ...); }, output_tasks);
REQUIRE(counter == 100);
}
TEST_CASE("when_all multiple tasks with tuple container", "[when_all]")
{
auto make_task = [](uint64_t amount) -> coro::task<uint64_t> { co_return amount; };
@ -35,7 +52,7 @@ TEST_CASE("when_all single task with vector container", "[when_all]")
std::vector<coro::task<uint64_t>> input_tasks;
input_tasks.emplace_back(make_task(100));
auto output_tasks = coro::sync_wait(coro::when_all(input_tasks));
auto output_tasks = coro::sync_wait(coro::when_all(std::move(input_tasks)));
REQUIRE(output_tasks.size() == 1);
uint64_t counter{0};
@ -57,7 +74,29 @@ TEST_CASE("when_all multple task withs vector container", "[when_all]")
input_tasks.emplace_back(make_task(550));
input_tasks.emplace_back(make_task(1000));
auto output_tasks = coro::sync_wait(coro::when_all(input_tasks));
auto output_tasks = coro::sync_wait(coro::when_all(std::move(input_tasks)));
REQUIRE(output_tasks.size() == 4);
uint64_t counter{0};
for (const auto& task : output_tasks)
{
counter += task.return_value();
}
REQUIRE(counter == 1850);
}
TEST_CASE("when_all multple task withs list container", "[when_all]")
{
auto make_task = [](uint64_t amount) -> coro::task<uint64_t> { co_return amount; };
std::list<coro::task<uint64_t>> input_tasks;
input_tasks.emplace_back(make_task(100));
input_tasks.emplace_back(make_task(200));
input_tasks.emplace_back(make_task(550));
input_tasks.emplace_back(make_task(1000));
auto output_tasks = coro::sync_wait(coro::when_all(std::move(input_tasks)));
REQUIRE(output_tasks.size() == 4);
uint64_t counter{0};