1
0
Fork 0
mirror of https://gitlab.com/niansa/libcrosscoro.git synced 2025-03-06 20:53:32 +01:00

coro::mutex remove std::mutex requirement, only 1 atomic with CAS (#71)

This commit is contained in:
Josh Baldwin 2021-02-27 14:46:37 -07:00 committed by GitHub
parent fab634154f
commit 19d626c1fb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 86 additions and 57 deletions

View file

@ -119,9 +119,11 @@ latch task dependency tasks completed, resuming.
```
### mutex
The `coro::mutex` is a thread safe async tool to protect critical sections and only allow a single thread to execute the critical section at any given time. Mutexes that are uncontended are a simple CAS operation with a memory fence 'acquire' to behave similar to `std::mutex`. If the lock is contended then the thread will add itself to a FIFO queue of waiters and yield excution to allow another coroutine to process on that thread while it waits to acquire the lock.
The `coro::mutex` is a thread safe async tool to protect critical sections and only allow a single thread to execute the critical section at any given time. Mutexes that are uncontended are a simple CAS operation with a memory fence 'acquire' to behave similar to `std::mutex`. If the lock is contended then the thread will add itself to a LIFO queue of waiters and yield excution to allow another coroutine to process on that thread while it waits to acquire the lock.
Its important to note that upon releasing the mutex that thread will immediately start processing the next waiter in line for the `coro::mutex`, the mutex is only unlocked/released once all waiters have been processed. This guarantees fair execution in a FIFO manner, but it also means all coroutines that stack in the waiter queue will end up shifting to the single thread that is executing all waiting coroutines. It is possible to reschedule after the critical section onto a thread pool to re-distribute the work. Perhaps an auto-reschedule on a given thread pool is a good feature to implement in the future to prevent this behavior so the post critical section work in the coroutines is redistributed amongst all available thread pool threads.
Its important to note that upon releasing the mutex that thread unlocking the mutex will immediately start processing the next waiter in line for the `coro::mutex` (if there are any waiters), the mutex is only unlocked/released once all waiters have been processed. This guarantees fair execution in a reasonbly FIFO manner, but it also means all coroutines that stack in the waiter queue will end up shifting to the single thread that is executing all waiting coroutines. It is possible to manually reschedule after the critical section onto a thread pool to re-distribute the work if this is a concern in your use case.
The suspend waiter queue is LIFO, however the worker that current holds the mutex will periodically 'acquire' the current LIFO waiter list to process those waiters when its internal list becomes empty. This effectively resets the suspended waiter list to empty and the worker holding the mutex will work through the newly acquired LIFO queue of waiters. It would be possible to reverse this list to be as fair as possible, however not reversing the list should result is better throughput at possibly the cost of some latency for the first suspended waiters on the 'current' LIFO queue. Reversing the list, however, would introduce latency for all queue waiters since its done everytime the LIFO queue is swapped.
```C++
${EXAMPLE_CORO_MUTEX_CPP}
@ -131,9 +133,11 @@ Expected output, note that the output will vary from run to run based on how the
are scheduled and in what order they acquire the mutex lock:
```bash
$ ./examples/coro_mutex
1, 2, 3, 4, 5, 6, 7, 8, 10, 9, 12, 11, 13, 14, 15, 16, 17, 18, 19, 21, 22, 20, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 47, 48, 49, 46, 50, 51, 52, 53, 54, 55, 57, 58, 59, 56, 60, 62, 61, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100,
1, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 37, 36, 35, 40, 39, 38, 41, 42, 43, 44, 46, 47, 48, 45, 49, 50, 51, 52, 53, 54, 55, 57, 56, 59, 58, 61, 60, 62, 63, 65, 64, 67, 66, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 83, 82, 84, 85, 86, 87, 88, 89, 91, 90, 92, 93, 94, 95, 96, 97, 98, 99, 100,
```
Its very easy to see the LIFO 'atomic' queue in action in the beginning where 22->2 are immediately suspended waiting to acquire the mutex.
### shared_mutex
The `coro::shared_mutex` is a thread safe async tool to allow for multiple shared users at once but also exclusive access. The lock is acquired strictly in a FIFO manner in that if the lock is currenty held by shared users and an exclusive attempts to lock, the exclusive waiter will suspend until all the _current_ shared users finish using the lock. Any new users that attempt to lock the mutex in a shared state once there is an exclusive waiter will also wait behind the exclusive waiter. This prevents the exclusive waiter from being starved.

View file

@ -302,9 +302,11 @@ latch task dependency tasks completed, resuming.
```
### mutex
The `coro::mutex` is a thread safe async tool to protect critical sections and only allow a single thread to execute the critical section at any given time. Mutexes that are uncontended are a simple CAS operation with a memory fence 'acquire' to behave similar to `std::mutex`. If the lock is contended then the thread will add itself to a FIFO queue of waiters and yield excution to allow another coroutine to process on that thread while it waits to acquire the lock.
The `coro::mutex` is a thread safe async tool to protect critical sections and only allow a single thread to execute the critical section at any given time. Mutexes that are uncontended are a simple CAS operation with a memory fence 'acquire' to behave similar to `std::mutex`. If the lock is contended then the thread will add itself to a LIFO queue of waiters and yield excution to allow another coroutine to process on that thread while it waits to acquire the lock.
Its important to note that upon releasing the mutex that thread will immediately start processing the next waiter in line for the `coro::mutex`, the mutex is only unlocked/released once all waiters have been processed. This guarantees fair execution in a FIFO manner, but it also means all coroutines that stack in the waiter queue will end up shifting to the single thread that is executing all waiting coroutines. It is possible to reschedule after the critical section onto a thread pool to re-distribute the work. Perhaps an auto-reschedule on a given thread pool is a good feature to implement in the future to prevent this behavior so the post critical section work in the coroutines is redistributed amongst all available thread pool threads.
Its important to note that upon releasing the mutex that thread unlocking the mutex will immediately start processing the next waiter in line for the `coro::mutex` (if there are any waiters), the mutex is only unlocked/released once all waiters have been processed. This guarantees fair execution in a reasonbly FIFO manner, but it also means all coroutines that stack in the waiter queue will end up shifting to the single thread that is executing all waiting coroutines. It is possible to manually reschedule after the critical section onto a thread pool to re-distribute the work if this is a concern in your use case.
The suspend waiter queue is LIFO, however the worker that current holds the mutex will periodically 'acquire' the current LIFO waiter list to process those waiters when its internal list becomes empty. This effectively resets the suspended waiter list to empty and the worker holding the mutex will work through the newly acquired LIFO queue of waiters. It would be possible to reverse this list to be as fair as possible, however not reversing the list should result is better throughput at possibly the cost of some latency for the first suspended waiters on the 'current' LIFO queue. Reversing the list, however, would introduce latency for all queue waiters since its done everytime the LIFO queue is swapped.
```C++
#include <coro/coro.hpp>
@ -351,9 +353,11 @@ Expected output, note that the output will vary from run to run based on how the
are scheduled and in what order they acquire the mutex lock:
```bash
$ ./examples/coro_mutex
1, 2, 3, 4, 5, 6, 7, 8, 10, 9, 12, 11, 13, 14, 15, 16, 17, 18, 19, 21, 22, 20, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 47, 48, 49, 46, 50, 51, 52, 53, 54, 55, 57, 58, 59, 56, 60, 62, 61, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100,
1, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 37, 36, 35, 40, 39, 38, 41, 42, 43, 44, 46, 47, 48, 45, 49, 50, 51, 52, 53, 54, 55, 57, 56, 59, 58, 61, 60, 62, 63, 65, 64, 67, 66, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 83, 82, 84, 85, 86, 87, 88, 89, 91, 90, 92, 93, 94, 95, 96, 97, 98, 99, 100,
```
Its very easy to see the LIFO 'atomic' queue in action in the beginning where 22->2 are immediately suspended waiting to acquire the mutex.
### shared_mutex
The `coro::shared_mutex` is a thread safe async tool to allow for multiple shared users at once but also exclusive access. The lock is acquired strictly in a FIFO manner in that if the lock is currenty held by shared users and an exclusive attempts to lock, the exclusive waiter will suspend until all the _current_ shared users finish using the lock. Any new users that attempt to lock the mutex in a shared state once there is an exclusive waiter will also wait behind the exclusive waiter. This prevents the exclusive waiter from being starved.

View file

@ -61,8 +61,12 @@ private:
class mutex
{
public:
explicit mutex() noexcept = default;
~mutex() = default;
explicit mutex() noexcept
: m_unlocked_value(&m_state), // This address is guaranteed to be unique and un-used elsewhere.
m_state(const_cast<void*>(m_unlocked_value))
{
}
~mutex() = default;
mutex(const mutex&) = delete;
mutex(mutex&&) = delete;
@ -106,10 +110,18 @@ public:
private:
friend class lock_operation;
std::atomic<bool> m_state{false};
std::mutex m_waiter_mutex{};
lock_operation* m_head_waiter{nullptr};
lock_operation* m_tail_waiter{nullptr};
/// Inactive value, this cannot be nullptr since we want nullptr to signify that the mutex
/// is locked but there are zero waiters, this makes it easy to CAS new waiters into the
/// m_state linked list.
const void* m_unlocked_value;
/// unlocked -> state == m_unlocked_value
/// locked but empty waiter list == nullptr
/// locked with waiters == lock_operation*
std::atomic<void*> m_state;
/// A list of grabbed internal waiters that are only accessed by the unlock()'er.
lock_operation* m_internal_waiters{nullptr};
};
} // namespace coro

View file

@ -1,5 +1,7 @@
#include "coro/mutex.hpp"
#include <iostream>
namespace coro
{
scoped_lock::~scoped_lock()
@ -12,6 +14,7 @@ auto scoped_lock::unlock() -> void
if (m_mutex != nullptr)
{
m_mutex->unlock();
// Only allow a scoped lock to unlock the mutex a single time.
m_mutex = nullptr;
}
}
@ -29,28 +32,29 @@ auto mutex::lock_operation::await_ready() const noexcept -> bool
auto mutex::lock_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
{
std::scoped_lock lk{m_mutex.m_waiter_mutex};
if (m_mutex.try_lock())
{
// If we just straight up acquire the lock, don't suspend. This is necessary because its
// possible the lock is released between await_ready() and await_suspend() and suspending
// when the lock isn't held would be bad.
return false;
}
void* current = m_mutex.m_state.load(std::memory_order::acquire);
void* new_value;
// The lock is currently held, so append ourself to the waiter list.
if (m_mutex.m_tail_waiter == nullptr)
do
{
// If there are no current waiters this lock operation is the head and tail.
m_mutex.m_head_waiter = this;
m_mutex.m_tail_waiter = this;
}
else
if (current == m_mutex.m_unlocked_value)
{
// If the current value is 'unlocked' then attempt to lock it.
new_value = nullptr;
}
else
{
// If the current value is a waiting lock operation, or nullptr, set our next to that
// lock op and attempt to set ourself as the head of the waiter list.
m_next = static_cast<lock_operation*>(current);
new_value = static_cast<void*>(this);
}
} while (!m_mutex.m_state.compare_exchange_weak(current, new_value, std::memory_order::acq_rel));
// Don't suspend if the state went from unlocked -> locked with zero waiters.
if (current == m_mutex.m_unlocked_value)
{
// Update the current tail pointer to ourself.
m_mutex.m_tail_waiter->m_next = this;
// Update the tail pointer on the mutex to ourself.
m_mutex.m_tail_waiter = this;
return false;
}
m_awaiting_coroutine = awaiting_coroutine;
@ -59,43 +63,47 @@ auto mutex::lock_operation::await_suspend(std::coroutine_handle<> awaiting_corou
auto mutex::try_lock() -> bool
{
bool expected = false;
return m_state.compare_exchange_strong(expected, true, std::memory_order::release, std::memory_order::relaxed);
void* expected = const_cast<void*>(m_unlocked_value);
return m_state.compare_exchange_strong(expected, nullptr, std::memory_order::acq_rel, std::memory_order::relaxed);
}
auto mutex::unlock() -> void
{
// Acquire the next waiter before releasing _or_ moving ownship of the lock.
lock_operation* next{nullptr};
if (m_internal_waiters == nullptr)
{
std::scoped_lock lk{m_waiter_mutex};
if (m_head_waiter != nullptr)
void* current = m_state.load(std::memory_order::relaxed);
if (current == nullptr)
{
next = m_head_waiter;
m_head_waiter = m_head_waiter->m_next;
// Null out the tail waiter if this was the last waiter.
if (m_head_waiter == nullptr)
// If there are no internal waiters and there are no atomic waiters, attempt to set the
// mutex as unlocked.
if (m_state.compare_exchange_strong(
current,
const_cast<void*>(m_unlocked_value),
std::memory_order::release,
std::memory_order::relaxed))
{
m_tail_waiter = nullptr;
return; // The mutex is now unlocked with zero waiters.
}
// else we failed to unlock, someone added themself as a waiter.
}
else
{
// If there were no waiters, release the lock. This is done under the waiter list being
// locked so another thread doesn't add themselves to the waiter list before the lock
// is actually released. We can safely used relaxed here since m_waiter_mutex will
// perform the release memory order upon unlocking.
m_state.exchange(false, std::memory_order::relaxed);
}
// There are waiters on the atomic list, acquire them and update the state for all others.
m_internal_waiters = static_cast<lock_operation*>(m_state.exchange(nullptr, std::memory_order::acq_rel));
// Should internal waiters be reversed to allow for true FIFO, or should they be resumed
// in this reverse order to maximum throuhgput? If this list ever gets 'long' the reversal
// will take some time, but it might guarantee better latency across waiters. This LIFO
// middle ground on the atomic waiters means the best throughput at the cost of the first
// waiter possibly having added latency based on the queue length of waiters. Either way
// incurs a cost but this way for short lists will most likely be faster even though it
// isn't completely fair.
}
// If there were any waiters resume the next in line, this will pass ownership of the mutex to
// that waiter, only the final waiter in the list actually unlocks the mutex.
if (next != nullptr)
{
next->m_awaiting_coroutine.resume();
}
// assert m_internal_waiters != nullptr
lock_operation* to_resume = m_internal_waiters;
m_internal_waiters = m_internal_waiters->m_next;
to_resume->m_awaiting_coroutine.resume();
}
} // namespace coro

View file

@ -23,6 +23,7 @@ TEST_CASE("mutex single waiter not locked", "[mutex]")
// The scoped lock should release the lock upon destructing.
REQUIRE(m.try_lock());
REQUIRE_FALSE(m.try_lock());
m.unlock();
co_return;