fuse_uring.c
/*
 * FUSE: Filesystem in Userspace
 * Copyright (C) 2025 Bernd Schubert <bschubert@ddn.com>
 *
 * Implementation of (most of) FUSE-over-io-uring.
 *
 * This program can be distributed under the terms of the GNU LGPLv2.
 * See the file LGPL2.txt
 */

#define _GNU_SOURCE

#include "fuse_i.h"
#include "fuse_kernel.h"
#include "fuse_uring_i.h"

#include <stdlib.h>
#include <liburing.h>
#include <sys/sysinfo.h>
#include <stdint.h>
#include <inttypes.h>
#include <stdbool.h>
#include <string.h>
#include <unistd.h>
#include <numa.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <linux/sched.h>
#include <poll.h>
#include <sys/eventfd.h>

/* Size of command data area in SQE when IORING_SETUP_SQE128 is used */
#define FUSE_URING_MAX_SQE128_CMD_DATA 80

struct fuse_ring_ent {
	struct fuse_ring_queue *ring_queue; /* back pointer */
	struct fuse_req req;

	struct fuse_uring_req_header *req_header;
	void *op_payload;
	size_t req_payload_sz;

	/* commit id of a fuse request */
	uint64_t req_commit_id;

	enum fuse_uring_cmd last_cmd;

	/* header and payload */
	struct iovec iov[2];
};

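/*
 * Per-CPU queue: owns one io_uring instance, the eventfd used to wake its
 * thread on teardown, and the array of ring entries (flexible array member,
 * sized by the configured queue depth).
 */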
struct fuse_ring_queue {
	/* back pointer */
	struct fuse_ring_pool *ring_pool;
	int qid;
	int numa_node;
	pthread_t tid;
	int eventfd;
	size_t req_header_sz;
	struct io_uring ring;

	pthread_mutex_t ring_lock;
	bool cqe_processing;

	/* size depends on queue depth */
	struct fuse_ring_ent ent[];
};

struct fuse_ring_pool {
	struct fuse_session *se;

	/* number of queues */
	size_t nr_queues;

	/* number of per queue entries */
	size_t queue_depth;

	/* max payload size for fuse requests */
	size_t max_req_payload_sz;

	/* size of a single queue */
	size_t queue_mem_size;

	unsigned int started_threads;
	unsigned int failed_threads;

	/* Avoid sending queue entries before FUSE_INIT reply */
	sem_t init_sem;

	pthread_cond_t thread_start_cond;
	pthread_mutex_t thread_start_mutex;

	/* pointer to the first queue */
	struct fuse_ring_queue *queues;
};

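/*
 * Allocation size of a single queue, including the flexible array of ring
 * entries for the given queue depth.
 */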
static size_t
fuse_ring_queue_size(const size_t q_depth)
{
	const size_t req_size = sizeof(struct fuse_ring_ent) * q_depth;

	return sizeof(struct fuse_ring_queue) + req_size;
}

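/*
 * Queues are allocated as one contiguous block; return the address of the
 * queue with the given qid.
 */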
static struct fuse_ring_queue *
fuse_uring_get_queue(struct fuse_ring_pool *fuse_ring, int qid)
{
	void *ptr =
		((char *)fuse_ring->queues) + (qid * fuse_ring->queue_mem_size);

	return ptr;
}

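/*
 * Command data area of an SQE; with IORING_SETUP_SQE128 this provides
 * FUSE_URING_MAX_SQE128_CMD_DATA bytes for struct fuse_uring_cmd_req.
 */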
static void *fuse_uring_get_sqe_cmd(struct io_uring_sqe *sqe)
{
	return (void *)&sqe->cmd[0];
}

static void fuse_uring_sqe_set_req_data(struct fuse_uring_cmd_req *req,
					const unsigned int qid,
					const uint64_t commit_id)
{
	req->qid = qid;
	req->commit_id = commit_id;
	req->flags = 0;
}

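/*
 * Common SQE preparation for all FUSE_IO_URING_CMD_* commands of a queue.
 */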
static void
fuse_uring_sqe_prepare(struct io_uring_sqe *sqe, struct fuse_ring_ent *req,
		       __u32 cmd_op)
{
	/* These fields should be written once, never change */
	sqe->opcode = IORING_OP_URING_CMD;

	/*
	 * IOSQE_FIXED_FILE: fd is the index to the fd *array*
	 * given to io_uring_register_files()
	 */
	sqe->flags = IOSQE_FIXED_FILE;
	sqe->fd = 0;

	sqe->rw_flags = 0;
	sqe->ioprio = 0;
	sqe->off = 0;

	io_uring_sqe_set_data(sqe, req);

	sqe->cmd_op = cmd_op;
	sqe->__pad1 = 0;
}

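/*
 * Queue a FUSE_IO_URING_CMD_COMMIT_AND_FETCH SQE: commit the reply for this
 * entry and fetch the next request. Takes the ring lock when called from a
 * thread other than the queue thread; submission is deferred while the queue
 * thread is processing CQEs.
 */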
static int fuse_uring_commit_sqe(struct fuse_ring_pool *ring_pool,
				 struct fuse_ring_queue *queue,
				 struct fuse_ring_ent *ring_ent)
{
	bool locked = false;
	struct fuse_session *se = ring_pool->se;
	struct fuse_uring_req_header *rrh = ring_ent->req_header;
	struct fuse_out_header *out = (struct fuse_out_header *)&rrh->in_out;
	struct fuse_uring_ent_in_out *ent_in_out =
		(struct fuse_uring_ent_in_out *)&rrh->ring_ent_in_out;
	struct io_uring_sqe *sqe;

	if (pthread_self() != queue->tid) {
		pthread_mutex_lock(&queue->ring_lock);
		locked = true;
	}

	sqe = io_uring_get_sqe(&queue->ring);

	if (sqe == NULL) {
		/* This is an impossible condition, unless there is a bug.
		 * The kernel sent back a CQE for an SQE that is assigned to
		 * a request. There is no way to run out of SQEs, as their
		 * number matches the number of requests.
		 */

		se->error = -EIO;
		fuse_log(FUSE_LOG_ERR, "Failed to get a ring SQE\n");

		return -EIO;
	}

	ring_ent->last_cmd = FUSE_IO_URING_CMD_COMMIT_AND_FETCH;
	fuse_uring_sqe_prepare(sqe, ring_ent, ring_ent->last_cmd);
	fuse_uring_sqe_set_req_data(fuse_uring_get_sqe_cmd(sqe), queue->qid,
				    ring_ent->req_commit_id);

	if (se->debug) {
		fuse_log(FUSE_LOG_DEBUG, " unique: %" PRIu64 ", result=%d\n",
			 out->unique, ent_in_out->payload_sz);
	}

	if (!queue->cqe_processing)
		io_uring_submit(&queue->ring);

	if (locked)
		pthread_mutex_unlock(&queue->ring_lock);

	return 0;
}

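/*
 * Public API: expose the ring entry's pre-allocated payload buffer so that
 * request handlers can fill it in place; send_reply_uring() skips its memcpy
 * when the reply buffer is this payload buffer. "mr" is reserved for future
 * RDMA memory registration and is currently always set to NULL.
 *
 * Illustrative sketch only (a hypothetical read handler, not part of this
 * file), assuming the open handler stored a file descriptor in fi->fh:
 *
 *	static void xmp_read(fuse_req_t req, fuse_ino_t ino, size_t size,
 *			     off_t off, struct fuse_file_info *fi)
 *	{
 *		char *payload;
 *		size_t payload_sz;
 *		ssize_t res;
 *
 *		if (fuse_req_get_payload(req, &payload, &payload_sz, NULL) != 0) {
 *			fuse_reply_err(req, ENOTSUP);
 *			return;
 *		}
 *
 *		if (size > payload_sz)
 *			size = payload_sz;
 *
 *		res = pread(fi->fh, payload, size, off);
 *		if (res >= 0)
 *			fuse_reply_buf(req, payload, res);
 *		else
 *			fuse_reply_err(req, errno);
 *	}
 */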
int fuse_req_get_payload(fuse_req_t req, char **payload, size_t *payload_sz,
			 void **mr)
{
	struct fuse_ring_ent *ring_ent;

	/* Not possible without io-uring interface */
	if (!req->flags.is_uring)
		return -EINVAL;

	ring_ent = container_of(req, struct fuse_ring_ent, req);

	*payload = ring_ent->op_payload;
	*payload_sz = ring_ent->req_payload_sz;

	/*
	 * For now unused, but will be used later when the application can
	 * allocate the buffers itself and register them for rdma.
	 */
	if (mr)
		*mr = NULL;

	return 0;
}

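/*
 * Complete a request with an optional argument: copy the reply data into the
 * ring entry's payload buffer (unless it is already there), fill the out
 * header and commit via COMMIT_AND_FETCH.
 */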
int send_reply_uring(fuse_req_t req, int error, const void *arg, size_t argsize)
{
	int res;
	struct fuse_ring_ent *ring_ent =
		container_of(req, struct fuse_ring_ent, req);
	struct fuse_uring_req_header *rrh = ring_ent->req_header;
	struct fuse_out_header *out = (struct fuse_out_header *)&rrh->in_out;
	struct fuse_uring_ent_in_out *ent_in_out =
		(struct fuse_uring_ent_in_out *)&rrh->ring_ent_in_out;

	struct fuse_ring_queue *queue = ring_ent->ring_queue;
	struct fuse_ring_pool *ring_pool = queue->ring_pool;
	size_t max_payload_sz = ring_pool->max_req_payload_sz;

	if (argsize > max_payload_sz) {
		fuse_log(FUSE_LOG_ERR, "argsize %zu exceeds buffer size %zu\n",
			 argsize, max_payload_sz);
		error = -EINVAL;
	} else if (argsize) {
		if (arg != ring_ent->op_payload)
			memcpy(ring_ent->op_payload, arg, argsize);
	}
	ent_in_out->payload_sz = argsize;

	out->error = error;
	out->unique = req->unique;

	res = fuse_uring_commit_sqe(ring_pool, queue, ring_ent);

	fuse_free_req(req);

	return res;
}

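/*
 * Reply with data described by a fuse_bufvec: copy it into the ring entry's
 * payload buffer and commit the result.
 */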
int fuse_reply_data_uring(fuse_req_t req, struct fuse_bufvec *bufv,
			  enum fuse_buf_copy_flags flags)
{
	struct fuse_ring_ent *ring_ent =
		container_of(req, struct fuse_ring_ent, req);

	struct fuse_ring_queue *queue = ring_ent->ring_queue;
	struct fuse_ring_pool *ring_pool = queue->ring_pool;
	struct fuse_uring_req_header *rrh = ring_ent->req_header;
	struct fuse_out_header *out = (struct fuse_out_header *)&rrh->in_out;
	struct fuse_uring_ent_in_out *ent_in_out =
		(struct fuse_uring_ent_in_out *)&rrh->ring_ent_in_out;
	size_t max_payload_sz = ring_ent->req_payload_sz;
	struct fuse_bufvec dest_vec = FUSE_BUFVEC_INIT(max_payload_sz);
	int res;

	dest_vec.buf[0].mem = ring_ent->op_payload;
	dest_vec.buf[0].size = max_payload_sz;

	res = fuse_buf_copy(&dest_vec, bufv, flags);

	out->error = res < 0 ? res : 0;
	out->unique = req->unique;

	ent_in_out->payload_sz = res > 0 ? res : 0;

	res = fuse_uring_commit_sqe(ring_pool, queue, ring_ent);

	fuse_free_req(req);

	return res;
}

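/*
 * Send a reply given as an iovec array; iov[0] holds the header, the
 * remaining entries are copied into the ring entry's payload buffer.
 */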
int fuse_send_msg_uring(fuse_req_t req, struct iovec *iov, int count)
{
	struct fuse_ring_ent *ring_ent =
		container_of(req, struct fuse_ring_ent, req);

	struct fuse_ring_queue *queue = ring_ent->ring_queue;
	struct fuse_ring_pool *ring_pool = queue->ring_pool;
	struct fuse_uring_req_header *rrh = ring_ent->req_header;
	struct fuse_out_header *out = (struct fuse_out_header *)&rrh->in_out;
	struct fuse_uring_ent_in_out *ent_in_out =
		(struct fuse_uring_ent_in_out *)&rrh->ring_ent_in_out;
	size_t max_buf = ring_pool->max_req_payload_sz;
	size_t len = 0;
	int res = 0;

	/* copy iov into the payload, idx=0 is the header section */
	for (int idx = 1; idx < count; idx++) {
		struct iovec *cur = &iov[idx];

		if (len + cur->iov_len > max_buf) {
			fuse_log(FUSE_LOG_ERR,
				 "iov[%d] exceeds buffer size %zu\n",
				 idx, max_buf);
			res = -EINVAL; /* Gracefully handle this? */
			break;
		}

		memcpy((char *)ring_ent->op_payload + len, cur->iov_base,
		       cur->iov_len);
		len += cur->iov_len;
	}

	ent_in_out->payload_sz = len;

	out->error = res;
	out->unique = req->unique;
	out->len = len;

	return fuse_uring_commit_sqe(ring_pool, queue, ring_ent);
}

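/*
 * Set up the io_uring instance of one queue. IORING_SETUP_SQE128 provides the
 * large command data area needed for struct fuse_uring_cmd_req, the CQ is
 * oversized to avoid overflow, and the fuse device fd (and the queue's
 * eventfd) are registered as fixed files.
 */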
static int fuse_queue_setup_io_uring(struct io_uring *ring, size_t qid,
				     size_t depth, int fd, int evfd)
{
	int rc;
	struct io_uring_params params = {0};
	int files[2] = { fd, evfd };

	depth += 1; /* for the eventfd poll SQE */

	params.flags = IORING_SETUP_SQE128;

	/* Avoid cq overflow */
	params.flags |= IORING_SETUP_CQSIZE;
	params.cq_entries = depth * 2;

	/* These flags should help to increase performance, but actually
	 * make it a bit slower - the reason should be investigated.
	 */
	if (0) {
		/* Has the main slow down effect */
		params.flags |= IORING_SETUP_SINGLE_ISSUER;

		// params.flags |= IORING_SETUP_DEFER_TASKRUN;
		params.flags |= IORING_SETUP_TASKRUN_FLAG;

		/* Second main effect to make it slower */
		params.flags |= IORING_SETUP_COOP_TASKRUN;
	}

	rc = io_uring_queue_init_params(depth, ring, &params);
	if (rc != 0) {
		fuse_log(FUSE_LOG_ERR, "Failed to setup qid %zu: %d (%s)\n",
			 qid, rc, strerror(-rc));
		return rc;
	}

	rc = io_uring_register_files(ring, files, 2);
	if (rc != 0) {
		fuse_log(FUSE_LOG_ERR,
			 "Failed to register files for ring idx %zu: %s\n",
			 qid, strerror(-rc));
		return rc;
	}

	return 0;
}

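/*
 * Tear down all queues: wake each queue thread through its eventfd, cancel
 * and join it, then release the io_uring instance, the per-entry buffers and
 * the locks.
 */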
static void fuse_session_destruct_uring(struct fuse_ring_pool *fuse_ring)
{
	for (size_t qid = 0; qid < fuse_ring->nr_queues; qid++) {
		struct fuse_ring_queue *queue =
			fuse_uring_get_queue(fuse_ring, qid);

		if (queue->tid != 0) {
			uint64_t value = 1ULL;
			int rc;

			rc = write(queue->eventfd, &value, sizeof(value));
			if (rc != sizeof(value))
				fprintf(stderr,
					"Writing to eventfd=%d failed: %s (rc=%d)\n",
					queue->eventfd, strerror(errno), rc);
			pthread_cancel(queue->tid);
			pthread_join(queue->tid, NULL);
			queue->tid = 0;
		}

		if (queue->eventfd >= 0) {
			close(queue->eventfd);
			queue->eventfd = -1;
		}

		if (queue->ring.ring_fd != -1)
			io_uring_queue_exit(&queue->ring);

		for (size_t idx = 0; idx < fuse_ring->queue_depth; idx++) {
			struct fuse_ring_ent *ent = &queue->ent[idx];

			numa_free(ent->op_payload, ent->req_payload_sz);
			numa_free(ent->req_header, queue->req_header_sz);
		}

		pthread_mutex_destroy(&queue->ring_lock);
	}

	free(fuse_ring->queues);
	pthread_cond_destroy(&fuse_ring->thread_start_cond);
	pthread_mutex_destroy(&fuse_ring->thread_start_mutex);
	free(fuse_ring);
}

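/*
 * Prepare (without submitting) a FUSE_IO_URING_CMD_REGISTER SQE that hands
 * the entry's header and payload buffers over to the kernel.
 */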
static int fuse_uring_register_ent(struct fuse_ring_queue *queue,
				   struct fuse_ring_ent *ent)
{
	struct io_uring_sqe *sqe;

	sqe = io_uring_get_sqe(&queue->ring);
	if (sqe == NULL) {
		/*
		 * All SQEs are idle here - no good reason this
		 * could fail
		 */
		fuse_log(FUSE_LOG_ERR, "Failed to get all ring SQEs\n");
		return -EIO;
	}

	ent->last_cmd = FUSE_IO_URING_CMD_REGISTER;
	fuse_uring_sqe_prepare(sqe, ent, ent->last_cmd);

	/* only needed for fetch */
	ent->iov[0].iov_base = ent->req_header;
	ent->iov[0].iov_len = queue->req_header_sz;

	ent->iov[1].iov_base = ent->op_payload;
	ent->iov[1].iov_len = ent->req_payload_sz;

	sqe->addr = (uint64_t)(ent->iov);
	sqe->len = 2;

	/* this is a fetch, kernel does not read commit id */
	fuse_uring_sqe_set_req_data(fuse_uring_get_sqe_cmd(sqe), queue->qid, 0);

	return 0;
}

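/*
 * Queue one REGISTER SQE per ring entry plus a poll SQE on the eventfd that
 * wakes the thread on teardown. Only preparation happens here; the queue
 * thread submits everything once FUSE_INIT has been answered.
 */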
static int fuse_uring_register_queue(struct fuse_ring_queue *queue)
{
	struct fuse_ring_pool *ring_pool = queue->ring_pool;
	unsigned int sq_ready;
	struct io_uring_sqe *sqe;
	int res;

	for (size_t idx = 0; idx < ring_pool->queue_depth; idx++) {
		struct fuse_ring_ent *ent = &queue->ent[idx];

		res = fuse_uring_register_ent(queue, ent);
		if (res != 0)
			return res;
	}

	sq_ready = io_uring_sq_ready(&queue->ring);
	if (sq_ready != ring_pool->queue_depth) {
		fuse_log(FUSE_LOG_ERR,
			 "SQE ready mismatch, expected %zu got %u\n",
			 ring_pool->queue_depth, sq_ready);
		return -EINVAL;
	}

	/* Poll SQE for the eventfd to wake up on teardown */
	sqe = io_uring_get_sqe(&queue->ring);
	if (sqe == NULL) {
		fuse_log(FUSE_LOG_ERR, "Failed to get eventfd SQE\n");
		return -EIO;
	}

	io_uring_prep_poll_add(sqe, queue->eventfd, POLLIN);
	io_uring_sqe_set_data(sqe, (void *)(uintptr_t)queue->eventfd);

	/* Only preparation until here, no submission yet */

	return 0;
}

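/*
 * Allocate the ring pool and one queue per CPU. Queues only get a minimal,
 * failure-free initialization here so that cleanup stays simple if a later
 * step fails.
 */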
static struct fuse_ring_pool *fuse_create_ring(struct fuse_session *se)
{
	struct fuse_ring_pool *fuse_ring = NULL;
	const size_t nr_queues = get_nprocs_conf();
	size_t payload_sz = se->bufsize - FUSE_BUFFER_HEADER_SIZE;
	size_t queue_sz;

	if (se->debug)
		fuse_log(FUSE_LOG_DEBUG, "starting io-uring q-depth=%d\n",
			 se->uring.q_depth);

	fuse_ring = calloc(1, sizeof(*fuse_ring));
	if (fuse_ring == NULL) {
		fuse_log(FUSE_LOG_ERR, "Allocating the ring failed\n");
		goto err;
	}

	queue_sz = fuse_ring_queue_size(se->uring.q_depth);
	fuse_ring->queues = calloc(1, queue_sz * nr_queues);
	if (fuse_ring->queues == NULL) {
		fuse_log(FUSE_LOG_ERR, "Allocating the queues failed\n");
		goto err;
	}

	fuse_ring->se = se;
	fuse_ring->nr_queues = nr_queues;
	fuse_ring->queue_depth = se->uring.q_depth;
	fuse_ring->max_req_payload_sz = payload_sz;
	fuse_ring->queue_mem_size = queue_sz;

	/*
	 * Very basic queue initialization that cannot fail and allows
	 * easy cleanup if something (like mmap) fails further below.
	 */
	for (size_t qid = 0; qid < nr_queues; qid++) {
		struct fuse_ring_queue *queue =
			fuse_uring_get_queue(fuse_ring, qid);

		queue->ring.ring_fd = -1;
		queue->numa_node = numa_node_of_cpu(qid);
		queue->qid = qid;
		queue->ring_pool = fuse_ring;
		queue->eventfd = -1;
		pthread_mutex_init(&queue->ring_lock, NULL);
	}

	pthread_cond_init(&fuse_ring->thread_start_cond, NULL);
	pthread_mutex_init(&fuse_ring->thread_start_mutex, NULL);
	sem_init(&fuse_ring->init_sem, 0, 0);

	return fuse_ring;

err:
	if (fuse_ring)
		fuse_session_destruct_uring(fuse_ring);

	return NULL;
}

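/*
 * Re-queue the last command of a ring entry after the kernel completed it
 * with -EAGAIN or -EINTR; the caller submits the SQE.
 */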
static void fuse_uring_resubmit(struct fuse_ring_queue *queue,
				struct fuse_ring_ent *ent)
{
	struct io_uring_sqe *sqe;

	sqe = io_uring_get_sqe(&queue->ring);
	if (sqe == NULL) {
		/* This is an impossible condition, unless there is a bug.
		 * The kernel sent back a CQE for an SQE that is assigned to
		 * a request. There is no way to run out of SQEs, as their
		 * number matches the number of requests.
		 */

		queue->ring_pool->se->error = -EIO;
		fuse_log(FUSE_LOG_ERR, "Failed to get a ring SQE\n");

		return;
	}

	fuse_uring_sqe_prepare(sqe, ent, ent->last_cmd);

	switch (ent->last_cmd) {
	case FUSE_IO_URING_CMD_REGISTER:
		sqe->addr = (uint64_t)(ent->iov);
		sqe->len = 2;
		fuse_uring_sqe_set_req_data(fuse_uring_get_sqe_cmd(sqe),
					    queue->qid, 0);
		break;
	case FUSE_IO_URING_CMD_COMMIT_AND_FETCH:
		fuse_uring_sqe_set_req_data(fuse_uring_get_sqe_cmd(sqe),
					    queue->qid, ent->req_commit_id);
		break;
	default:
		fuse_log(FUSE_LOG_ERR, "Unknown command type: %d\n",
			 ent->last_cmd);
		queue->ring_pool->se->error = -EINVAL;
		break;
	}

	/* caller submits */
}

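/*
 * A successful CQE carries a new fuse request: remember the commit id for the
 * later reply, reset the embedded fuse_req and hand it over to the session
 * for processing.
 */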
static void fuse_uring_handle_cqe(struct fuse_ring_queue *queue,
				  struct io_uring_cqe *cqe)
{
	struct fuse_ring_ent *ent = io_uring_cqe_get_data(cqe);

	if (!ent) {
		fuse_log(FUSE_LOG_ERR,
			 "cqe=%p io_uring_cqe_get_data returned NULL\n", cqe);
		return;
	}

	struct fuse_req *req = &ent->req;
	struct fuse_ring_pool *fuse_ring = queue->ring_pool;
	struct fuse_uring_req_header *rrh = ent->req_header;

	struct fuse_in_header *in = (struct fuse_in_header *)&rrh->in_out;
	struct fuse_uring_ent_in_out *ent_in_out = &rrh->ring_ent_in_out;

	ent->req_commit_id = ent_in_out->commit_id;
	if (unlikely(ent->req_commit_id == 0)) {
		/*
		 * If this happens the kernel will not find the reply and the
		 * request would be stuck forever - better to abort
		 * immediately.
		 */
		fuse_log(FUSE_LOG_ERR, "Received invalid commit_id=0\n");
		abort();
	}

	memset(&req->flags, 0, sizeof(req->flags));
	memset(&req->u, 0, sizeof(req->u));
	req->flags.is_uring = 1;
	req->ref_cnt++;
	req->ch = NULL; /* not needed for uring */
	req->interrupted = 0;
	list_init_req(req);

	fuse_session_process_uring_cqe(fuse_ring->se, req, in, &rrh->op_in,
				       ent->op_payload, ent_in_out->payload_sz);
}

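/*
 * Drain all pending CQEs of a queue. A completion for the eventfd poll
 * signals teardown, -EAGAIN/-EINTR lead to a resubmission of the command,
 * other errors are recorded in the session. Returns a negative error code or
 * the number of processed CQEs.
 */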
static int fuse_uring_queue_handle_cqes(struct fuse_ring_queue *queue)
{
	struct fuse_ring_pool *ring_pool = queue->ring_pool;
	struct fuse_session *se = ring_pool->se;
	size_t num_completed = 0;
	struct io_uring_cqe *cqe;
	unsigned int head;
	struct fuse_ring_ent *ent;
	int ret = 0;

	io_uring_for_each_cqe(&queue->ring, head, cqe) {
		int err = 0;

		num_completed++;

		err = cqe->res;
		if (unlikely(err != 0)) {
			if (err > 0 && ((uintptr_t)io_uring_cqe_get_data(cqe) ==
					(uintptr_t)queue->eventfd)) {
				/* teardown from eventfd */
				return -ENOTCONN;
			}

			switch (err) {
			case -EAGAIN:
			case -EINTR:
				ent = io_uring_cqe_get_data(cqe);
				fuse_uring_resubmit(queue, ent);
				continue;
			default:
				break;
			}

			/* -ENOTCONN is ok on umount */
			if (err != -ENOTCONN) {
				se->error = cqe->res;

				/* return first error */
				if (ret == 0)
					ret = err;
			}
		} else {
			fuse_uring_handle_cqe(queue, cqe);
		}
	}

	if (num_completed)
		io_uring_cq_advance(&queue->ring, num_completed);

	return ret == 0 ? (int)num_completed : ret;
}

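/*
 * Pin the queue thread to the CPU that matches its qid; optionally (currently
 * disabled) drop its scheduling priority so that it does not push the
 * submitting application off the core.
 */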
static void fuse_uring_set_thread_core(int qid)
{
	cpu_set_t mask;
	int rc;

	CPU_ZERO(&mask);
	CPU_SET(qid, &mask);
	rc = sched_setaffinity(0, sizeof(cpu_set_t), &mask);
	if (rc != 0)
		fuse_log(FUSE_LOG_ERR, "Failed to bind qid=%d to its core: %s\n",
			 qid, strerror(errno));

	if (0) {
		const int policy = SCHED_IDLE;
		const struct sched_param param = {
			.sched_priority = sched_get_priority_min(policy),
		};

		/* Set the lowest possible priority, so that the application
		 * submitting requests is not moved away from the current core.
		 */
		rc = sched_setscheduler(0, policy, &param);
		if (rc != 0)
			fuse_log(FUSE_LOG_ERR, "Failed to set scheduler: %s\n",
				 strerror(errno));
	}
}

/*
 * @return negative error code or io-uring file descriptor
 */
static int fuse_uring_init_queue(struct fuse_ring_queue *queue)
{
	struct fuse_ring_pool *ring = queue->ring_pool;
	struct fuse_session *se = ring->se;
	int res;
	size_t page_sz = sysconf(_SC_PAGESIZE);

	queue->eventfd = eventfd(0, EFD_CLOEXEC);
	if (queue->eventfd < 0) {
		res = -errno;
		fuse_log(FUSE_LOG_ERR,
			 "Failed to create eventfd for qid %d: %s\n",
			 queue->qid, strerror(errno));
		return res;
	}

	res = fuse_queue_setup_io_uring(&queue->ring, queue->qid,
					ring->queue_depth, se->fd,
					queue->eventfd);
	if (res != 0) {
		fuse_log(FUSE_LOG_ERR, "qid=%d io_uring init failed\n",
			 queue->qid);
		goto err;
	}

	queue->req_header_sz = ROUND_UP(sizeof(struct fuse_uring_req_header),
					page_sz);

	for (size_t idx = 0; idx < ring->queue_depth; idx++) {
		struct fuse_ring_ent *ring_ent = &queue->ent[idx];
		struct fuse_req *req = &ring_ent->req;

		ring_ent->ring_queue = queue;

		/*
		 * Also allocate the header to have it page aligned, which
		 * is a requirement for page pinning
		 */
		ring_ent->req_header =
			numa_alloc_local(queue->req_header_sz);
		ring_ent->req_payload_sz = ring->max_req_payload_sz;

		ring_ent->op_payload =
			numa_alloc_local(ring_ent->req_payload_sz);

		req->se = se;
		pthread_mutex_init(&req->lock, NULL);
		req->flags.is_uring = 1;
		req->ref_cnt = 1; /* extra ref to avoid destruction */
		list_init_req(req);
	}

	res = fuse_uring_register_queue(queue);
	if (res != 0) {
		fuse_log(
			FUSE_LOG_ERR,
			"Grave fuse-uring error on preparing SQEs, aborting\n");
		se->error = -EIO;
	}

	return queue->ring.ring_fd;

err:
	close(queue->eventfd);
	queue->eventfd = -1;
	return res;
}

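/*
 * Per-queue thread: binds itself to its CPU, sets up the queue, reports
 * success or failure to the starter, waits until the FUSE_INIT reply has been
 * sent and then loops submitting SQEs and handling CQEs until session exit.
 */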
static void *fuse_uring_thread(void *arg)
{
	struct fuse_ring_queue *queue = arg;
	struct fuse_ring_pool *ring_pool = queue->ring_pool;
	struct fuse_session *se = ring_pool->se;
	int err;
	char thread_name[16] = { 0 };

	snprintf(thread_name, 16, "fuse-ring-%d", queue->qid);
	thread_name[15] = '\0';
	fuse_set_thread_name(thread_name);

	fuse_uring_set_thread_core(queue->qid);

	err = fuse_uring_init_queue(queue);
	pthread_mutex_lock(&ring_pool->thread_start_mutex);
	if (err < 0)
		ring_pool->failed_threads++;
	ring_pool->started_threads++;
	pthread_cond_broadcast(&ring_pool->thread_start_cond);
	pthread_mutex_unlock(&ring_pool->thread_start_mutex);

	if (err < 0) {
		fuse_log(FUSE_LOG_ERR, "qid=%d queue setup failed\n",
			 queue->qid);
		goto err_non_fatal;
	}

	sem_wait(&ring_pool->init_sem);

	/* Not using fuse_session_exited(se), as that cannot be inlined */
	while (!atomic_load_explicit(&se->mt_exited, memory_order_relaxed)) {
		io_uring_submit_and_wait(&queue->ring, 1);

		pthread_mutex_lock(&queue->ring_lock);
		queue->cqe_processing = true;
		err = fuse_uring_queue_handle_cqes(queue);
		queue->cqe_processing = false;
		pthread_mutex_unlock(&queue->ring_lock);
		if (err < 0)
			goto err;
	}

	return NULL;

err:
	fuse_session_exit(se);
err_non_fatal:
	return NULL;
}

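/* Spawn one handler thread per queue. */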
static int fuse_uring_start_ring_threads(struct fuse_ring_pool *ring)
{
	int rc = 0;

	for (size_t qid = 0; qid < ring->nr_queues; qid++) {
		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);

		rc = pthread_create(&queue->tid, NULL, fuse_uring_thread, queue);
		if (rc != 0)
			break;
	}

	return rc;
}

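/*
 * Validate the configuration and the compile-time assumption that the uring
 * command fits into the SQE128 command data area.
 */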
static int fuse_uring_sanity_check(struct fuse_session *se)
{
	if (se->uring.q_depth == 0) {
		fuse_log(FUSE_LOG_ERR, "io-uring queue depth must be > 0\n");
		return -EINVAL;
	}

	_Static_assert(sizeof(struct fuse_uring_cmd_req) <=
			       FUSE_URING_MAX_SQE128_CMD_DATA,
		       "SQE128_CMD_DATA has 80B cmd data");

	return 0;
}

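/*
 * Create the ring pool, start one thread per queue and wait until every
 * thread has either set up its queue or reported failure.
 */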
int fuse_uring_start(struct fuse_session *se)
{
	int err = 0;
	struct fuse_ring_pool *fuse_ring;

	err = fuse_uring_sanity_check(se);
	if (err != 0)
		return err;

	fuse_ring = fuse_create_ring(se);
	if (fuse_ring == NULL) {
		err = -EADDRNOTAVAIL;
		goto err;
	}

	se->uring.pool = fuse_ring;

	/*
	 * The queue threads are held off from sending fuse ring entries
	 * (SQEs) until init_sem is posted after the FUSE_INIT reply.
	 */
	err = fuse_uring_start_ring_threads(fuse_ring);
	if (err)
		goto err;

	/*
	 * Wait for all threads to start or to fail
	 */
	pthread_mutex_lock(&fuse_ring->thread_start_mutex);
	while (fuse_ring->started_threads < fuse_ring->nr_queues)
		pthread_cond_wait(&fuse_ring->thread_start_cond,
				  &fuse_ring->thread_start_mutex);

	if (fuse_ring->failed_threads != 0)
		err = -EADDRNOTAVAIL;
	pthread_mutex_unlock(&fuse_ring->thread_start_mutex);

err:
	if (err) {
		/* Note: all started threads have reported in by now, so they
		 * can be joined safely */
		if (fuse_ring)
			fuse_session_destruct_uring(fuse_ring);
		se->uring.pool = NULL;
	}
	return err;
}

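/* Tear down the ring pool, if io-uring was enabled for this session. */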
int fuse_uring_stop(struct fuse_session *se)
{
	struct fuse_ring_pool *ring = se->uring.pool;

	if (ring == NULL)
		return 0;

	fuse_session_destruct_uring(ring);
	se->uring.pool = NULL;

	return 0;
}

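/*
 * Called after the FUSE_INIT reply has been sent; lets the queue threads
 * submit their prepared SQEs.
 */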
void fuse_uring_wake_ring_threads(struct fuse_session *se)
{
	struct fuse_ring_pool *ring = se->uring.pool;

	if (ring == NULL)
		return;

	/* Wake up the threads to let them send SQEs */
	for (size_t qid = 0; qid < ring->nr_queues; qid++)
		sem_post(&ring->init_sem);
}