io_uring: batch task work locking
Many task_work handlers either grab ->uring_lock, or may benefit from having it. Move locking logic out of individual handlers to a lazy approach controlled by tctx_task_work(), so we don't keep doing tons of mutex lock/unlock.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Link: https://lore.kernel.org/r/d6a34e147f2507a2f3e2fa1e38a9c541dcad3929.1629286357.git.asml.silence@gmail.com
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent 5636c00d3e
commit f237c30a56
1 changed file with 49 additions and 31 deletions
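The commit message describes the scheme in words; the sketch below shows the same "lazy lock" idea as a small, self-contained userspace program. It is only an illustration under simplified assumptions: struct ctx, tw_func_t, tw_lock(), flush_and_unlock() and the two handlers are invented stand-ins for io_ring_ctx, io_req_tw_func_t, io_tw_lock(), ctx_flush_and_put() and the real task_work handlers, and a pthread mutex stands in for ->uring_lock.

/* Userspace sketch of the lazy-lock batching pattern; not kernel code. */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct ctx {
        pthread_mutex_t lock;   /* plays the role of ctx->uring_lock */
        int compl_nr;           /* completions batched up while locked */
};

/* Handlers now receive "locked" and may ask for the lock lazily. */
typedef void (*tw_func_t)(struct ctx *ctx, bool *locked);

/* Mirrors io_tw_lock(): take the lock only on first demand. */
static void tw_lock(struct ctx *ctx, bool *locked)
{
        if (!*locked) {
                pthread_mutex_lock(&ctx->lock);
                *locked = true;
        }
}

/* A handler that needs the lock, e.g. to queue a batched completion. */
static void handler_needs_lock(struct ctx *ctx, bool *locked)
{
        tw_lock(ctx, locked);
        ctx->compl_nr++;
}

/* A handler that never touches locked state. */
static void handler_lockless(struct ctx *ctx, bool *locked)
{
        (void)ctx;
        (void)locked;
        printf("lock-free work\n");
}

/* Mirrors the tail of tctx_task_work()/ctx_flush_and_put():
 * flush and unlock once per batch, and only if some handler locked. */
static void flush_and_unlock(struct ctx *ctx, bool *locked)
{
        if (*locked) {
                if (ctx->compl_nr)
                        printf("flushing %d completions\n", ctx->compl_nr);
                ctx->compl_nr = 0;
                pthread_mutex_unlock(&ctx->lock);
                *locked = false;
        }
}

int main(void)
{
        struct ctx ctx = { .lock = PTHREAD_MUTEX_INITIALIZER, .compl_nr = 0 };
        tw_func_t batch[] = { handler_needs_lock, handler_lockless,
                              handler_needs_lock };
        bool locked = false;
        unsigned int i;

        /* One lock/unlock for the whole batch instead of one per handler. */
        for (i = 0; i < sizeof(batch) / sizeof(batch[0]); i++)
                batch[i](&ctx, &locked);
        flush_and_unlock(&ctx, &locked);
        return 0;
}

The payoff is visible in main(): handlers that need the lock take it at most once per batch through the helper, handlers that do not need it never touch it, and the driver pays for a single flush and unlock per batch rather than one per request. In the diff below, io_queue_async_work() additionally NULLs its locked argument because it must not run under the lock, and plain submission paths invoke it with NULL.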
@@ -775,7 +775,7 @@ struct async_poll {
         struct io_poll_iocb     *double_poll;
 };
 
-typedef void (*io_req_tw_func_t)(struct io_kiocb *req);
+typedef void (*io_req_tw_func_t)(struct io_kiocb *req, bool *locked);
 
 struct io_task_work {
         union {
@@ -1080,6 +1080,14 @@ struct sock *io_uring_get_socket(struct file *file)
 }
 EXPORT_SYMBOL(io_uring_get_socket);
 
+static inline void io_tw_lock(struct io_ring_ctx *ctx, bool *locked)
+{
+        if (!*locked) {
+                mutex_lock(&ctx->uring_lock);
+                *locked = true;
+        }
+}
+
 #define io_for_each_link(pos, head) \
         for (pos = (head); pos; pos = pos->link)
 
@@ -1193,16 +1201,19 @@ static void io_fallback_req_func(struct work_struct *work)
                                                 fallback_work.work);
         struct llist_node *node = llist_del_all(&ctx->fallback_llist);
         struct io_kiocb *req, *tmp;
+        bool locked = false;
 
         percpu_ref_get(&ctx->refs);
         llist_for_each_entry_safe(req, tmp, node, io_task_work.fallback_node)
-                req->io_task_work.func(req);
+                req->io_task_work.func(req, &locked);
 
-        mutex_lock(&ctx->uring_lock);
-        if (ctx->submit_state.compl_nr)
-                io_submit_flush_completions(ctx);
-        mutex_unlock(&ctx->uring_lock);
+        if (locked) {
+                if (ctx->submit_state.compl_nr)
+                        io_submit_flush_completions(ctx);
+                mutex_unlock(&ctx->uring_lock);
+        }
         percpu_ref_put(&ctx->refs);
 }
 
 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
@@ -1386,12 +1397,15 @@ static void io_prep_async_link(struct io_kiocb *req)
         }
 }
 
-static void io_queue_async_work(struct io_kiocb *req)
+static void io_queue_async_work(struct io_kiocb *req, bool *locked)
 {
         struct io_ring_ctx *ctx = req->ctx;
         struct io_kiocb *link = io_prep_linked_timeout(req);
         struct io_uring_task *tctx = req->task->io_uring;
 
+        /* must not take the lock, NULL it as a precaution */
+        locked = NULL;
+
         BUG_ON(!tctx);
         BUG_ON(!tctx->io_wq);
 
@@ -2013,21 +2027,22 @@ static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
         return __io_req_find_next(req);
 }
 
-static void ctx_flush_and_put(struct io_ring_ctx *ctx)
+static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked)
 {
         if (!ctx)
                 return;
-        if (ctx->submit_state.compl_nr) {
-                mutex_lock(&ctx->uring_lock);
-                io_submit_flush_completions(ctx);
+        if (*locked) {
+                if (ctx->submit_state.compl_nr)
+                        io_submit_flush_completions(ctx);
                 mutex_unlock(&ctx->uring_lock);
+                *locked = false;
         }
         percpu_ref_put(&ctx->refs);
 }
 
 static void tctx_task_work(struct callback_head *cb)
 {
+        bool locked = false;
         struct io_ring_ctx *ctx = NULL;
         struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
                                                   task_work);
@@ -2050,18 +2065,18 @@ static void tctx_task_work(struct callback_head *cb)
                                                             io_task_work.node);
 
                         if (req->ctx != ctx) {
-                                ctx_flush_and_put(ctx);
+                                ctx_flush_and_put(ctx, &locked);
                                 ctx = req->ctx;
                                 percpu_ref_get(&ctx->refs);
                         }
-                        req->io_task_work.func(req);
+                        req->io_task_work.func(req, &locked);
                         node = next;
                 } while (node);
 
                 cond_resched();
         }
 
-        ctx_flush_and_put(ctx);
+        ctx_flush_and_put(ctx, &locked);
 }
 
 static void io_req_task_work_add(struct io_kiocb *req)
@@ -2113,28 +2128,26 @@ static void io_req_task_work_add(struct io_kiocb *req)
         }
 }
 
-static void io_req_task_cancel(struct io_kiocb *req)
+static void io_req_task_cancel(struct io_kiocb *req, bool *locked)
 {
         struct io_ring_ctx *ctx = req->ctx;
 
         /* ctx is guaranteed to stay alive while we hold uring_lock */
-        mutex_lock(&ctx->uring_lock);
+        io_tw_lock(ctx, locked);
         io_req_complete_failed(req, req->result);
-        mutex_unlock(&ctx->uring_lock);
 }
 
-static void io_req_task_submit(struct io_kiocb *req)
+static void io_req_task_submit(struct io_kiocb *req, bool *locked)
 {
         struct io_ring_ctx *ctx = req->ctx;
 
         /* ctx stays valid until unlock, even if we drop all ours ctx->refs */
-        mutex_lock(&ctx->uring_lock);
+        io_tw_lock(ctx, locked);
         /* req->task == current here, checking PF_EXITING is safe */
         if (likely(!(req->task->flags & PF_EXITING)))
                 __io_queue_sqe(req);
         else
                 io_req_complete_failed(req, -EFAULT);
-        mutex_unlock(&ctx->uring_lock);
 }
 
 static void io_req_task_queue_fail(struct io_kiocb *req, int ret)
@@ -2170,6 +2183,11 @@ static void io_free_req(struct io_kiocb *req)
         __io_free_req(req);
 }
 
+static void io_free_req_work(struct io_kiocb *req, bool *locked)
+{
+        io_free_req(req);
+}
+
 struct req_batch {
         struct task_struct      *task;
         int                     task_refs;
@@ -2267,7 +2285,7 @@ static inline void io_put_req(struct io_kiocb *req)
 static inline void io_put_req_deferred(struct io_kiocb *req)
 {
         if (req_ref_put_and_test(req)) {
-                req->io_task_work.func = io_free_req;
+                req->io_task_work.func = io_free_req_work;
                 io_req_task_work_add(req);
         }
 }
@@ -2562,7 +2580,7 @@ static bool __io_complete_rw_common(struct io_kiocb *req, long res)
         return false;
 }
 
-static void io_req_task_complete(struct io_kiocb *req)
+static void io_req_task_complete(struct io_kiocb *req, bool *locked)
 {
         __io_req_complete(req, 0, req->result, io_put_rw_kbuf(req));
 }
@@ -2572,7 +2590,7 @@ static void __io_complete_rw(struct io_kiocb *req, long res, long res2,
 {
         if (__io_complete_rw_common(req, res))
                 return;
-        io_req_task_complete(req);
+        __io_req_complete(req, 0, req->result, io_put_rw_kbuf(req));
 }
 
 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
@@ -4994,7 +5012,7 @@ static bool io_poll_complete(struct io_kiocb *req, __poll_t mask)
         return !(flags & IORING_CQE_F_MORE);
 }
 
-static void io_poll_task_func(struct io_kiocb *req)
+static void io_poll_task_func(struct io_kiocb *req, bool *locked)
 {
         struct io_ring_ctx *ctx = req->ctx;
         struct io_kiocb *nxt;
@@ -5018,7 +5036,7 @@ static void io_poll_task_func(struct io_kiocb *req)
                 if (done) {
                         nxt = io_put_req_find_next(req);
                         if (nxt)
-                                io_req_task_submit(nxt);
+                                io_req_task_submit(nxt, locked);
                 }
         }
 }
@@ -5130,7 +5148,7 @@ static void io_async_queue_proc(struct file *file, struct wait_queue_head *head,
         __io_queue_proc(&apoll->poll, pt, head, &apoll->double_poll);
 }
 
-static void io_async_task_func(struct io_kiocb *req)
+static void io_async_task_func(struct io_kiocb *req, bool *locked)
 {
         struct async_poll *apoll = req->apoll;
         struct io_ring_ctx *ctx = req->ctx;
@@ -5147,7 +5165,7 @@ static void io_async_task_func(struct io_kiocb *req)
         spin_unlock(&ctx->completion_lock);
 
         if (!READ_ONCE(apoll->poll.canceled))
-                io_req_task_submit(req);
+                io_req_task_submit(req, locked);
         else
                 io_req_complete_failed(req, -ECANCELED);
 }
@@ -5546,7 +5564,7 @@ err:
         return 0;
 }
 
-static void io_req_task_timeout(struct io_kiocb *req)
+static void io_req_task_timeout(struct io_kiocb *req, bool *locked)
 {
         req_set_fail(req);
         io_req_complete_post(req, -ETIME, 0);
@@ -6103,7 +6121,7 @@ fail:
         if (!req_need_defer(req, seq) && list_empty(&ctx->defer_list)) {
                 spin_unlock(&ctx->completion_lock);
                 kfree(de);
-                io_queue_async_work(req);
+                io_queue_async_work(req, NULL);
                 return true;
         }
 
@@ -6425,7 +6443,7 @@ static inline struct file *io_file_get(struct io_ring_ctx *ctx,
         return io_file_get_normal(ctx, req, fd);
 }
 
-static void io_req_task_link_timeout(struct io_kiocb *req)
+static void io_req_task_link_timeout(struct io_kiocb *req, bool *locked)
 {
         struct io_kiocb *prev = req->timeout.prev;
         int ret;
@@ -6529,7 +6547,7 @@ issue_sqe:
                  * Queued up for async execution, worker will release
                  * submit reference when the iocb is actually submitted.
                  */
-                io_queue_async_work(req);
+                io_queue_async_work(req, NULL);
                 break;
         }
 
@@ -6554,7 +6572,7 @@ static inline void io_queue_sqe(struct io_kiocb *req)
                 if (unlikely(ret))
                         io_req_complete_failed(req, ret);
                 else
-                        io_queue_async_work(req);
+                        io_queue_async_work(req, NULL);
         }
 }
 