#include "fuse_kernel.h"
#include "fuse_uring_i.h"

#include <errno.h>
#include <liburing.h>
#include <numa.h>
#include <string.h>
#include <sys/sysinfo.h>
#include <linux/sched.h>
#include <sys/eventfd.h>
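/*
 * With IORING_SETUP_SQE128 an SQE carries 80 bytes of free command data;
 * fuse_uring_sanity_check() asserts that the FUSE uring command fits into it.
 */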
#define FUSE_URING_MAX_SQE128_CMD_DATA 80
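/*
 * One ring entry: a request slot registered with the kernel that then cycles
 * through commit-and-fetch. It owns the header and payload buffers (iov[0]
 * and iov[1]) and embeds the struct fuse_req handed to the request handlers.
 */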
struct fuse_ring_ent {
	struct fuse_ring_queue *ring_queue;
	/* ... */
	size_t req_payload_sz;
	/* ... */
	uint64_t req_commit_id;
	enum fuse_uring_cmd last_cmd;
	/* ... */
};
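/*
 * Per-CPU queue: its own io_uring, an eventfd for shutdown wake-ups and a
 * flexible array of ring entries sized by the configured queue depth.
 */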
struct fuse_ring_queue {
	/* ... */
	pthread_mutex_t ring_lock;
	/* ... */
	struct fuse_ring_ent ent[];
};
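/*
 * The ring pool ties all queues to the session and tracks thread startup, so
 * that fuse_uring_start() can wait until every queue thread has reported in.
 */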
struct fuse_ring_pool {
	struct fuse_session *se;
	/* ... */
	size_t max_req_payload_sz;
	/* ... */
	size_t queue_mem_size;

	unsigned int started_threads;
	unsigned int failed_threads;
	/* ... */
	pthread_cond_t thread_start_cond;
	pthread_mutex_t thread_start_mutex;
	/* ... */
	struct fuse_ring_queue *queues;
};
static size_t fuse_ring_queue_size(const size_t q_depth)
{
	const size_t req_size = sizeof(struct fuse_ring_ent) * q_depth;

	return sizeof(struct fuse_ring_queue) + req_size;
}
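/*
 * Queues are allocated as one block; each queue occupies queue_mem_size bytes
 * (the struct plus queue_depth ring entries), hence the byte arithmetic.
 */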
static struct fuse_ring_queue *
fuse_uring_get_queue(struct fuse_ring_pool *fuse_ring, size_t qid)
{
	return (struct fuse_ring_queue *)(((char *)fuse_ring->queues) +
					  (qid * fuse_ring->queue_mem_size));
}
static void *fuse_uring_get_sqe_cmd(struct io_uring_sqe *sqe)
{
	return (void *)&sqe->cmd[0];
}
static void fuse_uring_sqe_set_req_data(struct fuse_uring_cmd_req *req,
					const unsigned int qid, const uint64_t commit_id)
{
	req->qid = qid;
	req->commit_id = commit_id;
}
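/*
 * Common SQE setup for FUSE_IO_URING_CMD_* requests on the fuse fd, which is
 * registered as fixed file index 0.
 */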
static void fuse_uring_sqe_prepare(struct io_uring_sqe *sqe,
				   struct fuse_ring_ent *req, __u32 cmd_op)
{
	sqe->opcode = IORING_OP_URING_CMD;

	/* index 0 of the registered files is the fuse fd */
	sqe->fd = 0;
	sqe->flags = IOSQE_FIXED_FILE;

	io_uring_sqe_set_data(sqe, req);
	sqe->cmd_op = cmd_op;
}
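/*
 * Send a completed request back to the kernel and fetch the next one in a
 * single FUSE_IO_URING_CMD_COMMIT_AND_FETCH command. Called either from the
 * queue thread (which already holds ring_lock while processing CQEs) or from
 * other threads, which take the lock here.
 */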
static int fuse_uring_commit_sqe(struct fuse_ring_pool *ring_pool,
				 struct fuse_ring_queue *queue,
				 struct fuse_ring_ent *ring_ent)
{
	struct fuse_session *se = ring_pool->se;
	struct fuse_uring_req_header *rrh = ring_ent->req_header;
	struct fuse_out_header *out = (struct fuse_out_header *)&rrh->in_out;
	struct fuse_uring_ent_in_out *ent_in_out =
		(struct fuse_uring_ent_in_out *)&rrh->ring_ent_in_out;
	struct io_uring_sqe *sqe;

	if (pthread_self() != queue->tid)
		pthread_mutex_lock(&queue->ring_lock);

	sqe = io_uring_get_sqe(&queue->ring);
	if (sqe == NULL) {
		se->error = -EIO;
		fuse_log(FUSE_LOG_ERR, "Failed to get a ring SQE\n");
		if (pthread_self() != queue->tid)
			pthread_mutex_unlock(&queue->ring_lock);
		return -EIO;
	}

	ring_ent->last_cmd = FUSE_IO_URING_CMD_COMMIT_AND_FETCH;
	fuse_uring_sqe_prepare(sqe, ring_ent, ring_ent->last_cmd);
	fuse_uring_sqe_set_req_data(fuse_uring_get_sqe_cmd(sqe), queue->qid,
				    ring_ent->req_commit_id);

	fuse_log(FUSE_LOG_DEBUG, " unique: %" PRIu64 ", result=%d\n",
		 out->unique, ent_in_out->payload_sz);

	/* the queue thread submits itself after CQE processing */
	if (!queue->cqe_processing)
		io_uring_submit(&queue->ring);

	if (pthread_self() != queue->tid)
		pthread_mutex_unlock(&queue->ring_lock);

	return 0;
}
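/*
 * Give request handlers direct access to the ring entry's payload buffer, so
 * that replies can be written in place and the memcpy in send_reply_uring()
 * is avoided.
 */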
int fuse_req_get_payload(fuse_req_t req, char **payload, size_t *payload_sz,
			 void **mr)
{
	struct fuse_ring_ent *ring_ent;

	if (!req->flags.is_uring)
		return -EINVAL;

	ring_ent = container_of(req, struct fuse_ring_ent, req);
	*payload = ring_ent->op_payload;
	*payload_sz = ring_ent->req_payload_sz;
	return 0;
}
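/*
 * fuse_reply_*() path for io-uring requests: copy the reply payload into the
 * ring entry buffer (unless it already lives there), fill the out header and
 * commit.
 */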
int send_reply_uring(fuse_req_t req, int error, const void *arg, size_t argsize)
{
	struct fuse_ring_ent *ring_ent =
		container_of(req, struct fuse_ring_ent, req);
	struct fuse_ring_queue *queue = ring_ent->ring_queue;
	struct fuse_ring_pool *ring_pool = queue->ring_pool;
	struct fuse_uring_req_header *rrh = ring_ent->req_header;
	struct fuse_out_header *out = (struct fuse_out_header *)&rrh->in_out;
	struct fuse_uring_ent_in_out *ent_in_out =
		(struct fuse_uring_ent_in_out *)&rrh->ring_ent_in_out;
	size_t max_payload_sz = ring_pool->max_req_payload_sz;
	int res;

	if (argsize > max_payload_sz) {
		fuse_log(FUSE_LOG_ERR, "argsize %zu exceeds buffer size %zu\n",
			 argsize, max_payload_sz);
		error = -EINVAL;
		argsize = 0;
	} else if (argsize) {
		/* fuse_req_get_payload() users already filled op_payload in place */
		if (arg != ring_ent->op_payload)
			memcpy(ring_ent->op_payload, arg, argsize);
	}

	ent_in_out->payload_sz = argsize;

	out->error = error;
	out->unique = req->unique;

	res = fuse_uring_commit_sqe(ring_pool, queue, ring_ent);

	return res;
}
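/*
 * fuse_reply_data() path for io-uring requests: the buffer vector is copied
 * into the ring entry payload via fuse_buf_copy() and then committed. Only
 * the body survives in this extract; the function name and parameter list
 * below are an assumption.
 */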
int fuse_reply_data_uring(fuse_req_t req, struct fuse_bufvec *bufv,
			  enum fuse_buf_copy_flags flags)
{
	struct fuse_ring_ent *ring_ent =
		container_of(req, struct fuse_ring_ent, req);
	struct fuse_ring_queue *queue = ring_ent->ring_queue;
	struct fuse_ring_pool *ring_pool = queue->ring_pool;
	struct fuse_uring_req_header *rrh = ring_ent->req_header;
	struct fuse_out_header *out = (struct fuse_out_header *)&rrh->in_out;
	struct fuse_uring_ent_in_out *ent_in_out =
		(struct fuse_uring_ent_in_out *)&rrh->ring_ent_in_out;
	size_t max_payload_sz = ring_ent->req_payload_sz;
	struct fuse_bufvec dest_vec = FUSE_BUFVEC_INIT(max_payload_sz);
	ssize_t res;

	dest_vec.buf[0].mem = ring_ent->op_payload;
	dest_vec.buf[0].size = max_payload_sz;

	res = fuse_buf_copy(&dest_vec, bufv, flags);

	out->error = res < 0 ? res : 0;
	out->unique = req->unique;

	ent_in_out->payload_sz = res > 0 ? res : 0;

	res = fuse_uring_commit_sqe(ring_pool, queue, ring_ent);

	return res;
}
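/*
 * iovec reply path: iov[0] carries the fuse_out_header, the remaining entries
 * are gathered into the ring entry payload and committed. Only the body
 * survives in this extract; the name and signature below are an assumption.
 */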
int fuse_send_msg_uring(fuse_req_t req, struct iovec *iov, int count)
{
	struct fuse_ring_ent *ring_ent =
		container_of(req, struct fuse_ring_ent, req);
	struct fuse_ring_queue *queue = ring_ent->ring_queue;
	struct fuse_ring_pool *ring_pool = queue->ring_pool;
	struct fuse_uring_req_header *rrh = ring_ent->req_header;
	struct fuse_out_header *out = (struct fuse_out_header *)&rrh->in_out;
	struct fuse_uring_ent_in_out *ent_in_out =
		(struct fuse_uring_ent_in_out *)&rrh->ring_ent_in_out;
	size_t max_buf = ring_pool->max_req_payload_sz;
	size_t len = 0;

	/* iov[0] is the fuse_out_header, payload data starts at iov[1] */
	for (int idx = 1; idx < count; idx++) {
		struct iovec *cur = &iov[idx];

		if (len + cur->iov_len > max_buf) {
			fuse_log(FUSE_LOG_ERR,
				 "iov[%d] exceeds buffer size %zu\n",
				 idx, max_buf);
			return -EINVAL;
		}

		memcpy(ring_ent->op_payload + len, cur->iov_base, cur->iov_len);
		len += cur->iov_len;
	}

	ent_in_out->payload_sz = len;

	out->unique = req->unique;

	return fuse_uring_commit_sqe(ring_pool, queue, ring_ent);
}
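/*
 * Per-queue io_uring setup. SQE128 is needed because FUSE uring commands
 * carry their parameters in the SQE command area; the CQ is sized at twice
 * the queue depth. Only the fuse fd gets registered as a fixed file.
 */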
static int fuse_queue_setup_io_uring(struct io_uring *ring, size_t qid,
				     size_t depth, int fd, int evfd)
{
	struct io_uring_params params = {0};
	int files[2] = { fd, evfd };
	int rc;

	params.flags = IORING_SETUP_SQE128;

	params.flags |= IORING_SETUP_CQSIZE;
	params.cq_entries = depth * 2;

	params.flags |= IORING_SETUP_SINGLE_ISSUER;

	params.flags |= IORING_SETUP_TASKRUN_FLAG;
	params.flags |= IORING_SETUP_COOP_TASKRUN;

	rc = io_uring_queue_init_params(depth, ring, &params);
	if (rc != 0) {
		fuse_log(FUSE_LOG_ERR, "Failed to setup qid %zu: %d (%s)\n",
			 qid, rc, strerror(-rc));
		return rc;
	}

	rc = io_uring_register_files(ring, files, 1);
	if (rc != 0) {
		fuse_log(FUSE_LOG_ERR,
			 "Failed to register files for ring idx %zu: %s\n",
			 qid, strerror(-rc));
		return rc;
	}

	return 0;
}
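/*
 * Tear down all queues: wake each queue thread through its eventfd, join it,
 * then release the io_uring, the NUMA buffers and the locking primitives.
 */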
static void fuse_session_destruct_uring(struct fuse_ring_pool *fuse_ring)
{
	for (size_t qid = 0; qid < fuse_ring->nr_queues; qid++) {
		struct fuse_ring_queue *queue =
			fuse_uring_get_queue(fuse_ring, qid);

		if (queue->tid != 0) {
			uint64_t value = 1ULL;
			ssize_t rc;

			/* wake the thread out of io_uring_submit_and_wait() */
			rc = write(queue->eventfd, &value, sizeof(value));
			if (rc != sizeof(value))
				fuse_log(FUSE_LOG_ERR,
					 "Write to eventfd=%d failed err=%s: rc=%zd\n",
					 queue->eventfd, strerror(errno), rc);
			pthread_cancel(queue->tid);
			pthread_join(queue->tid, NULL);
		}

		if (queue->eventfd >= 0)
			close(queue->eventfd);

		if (queue->ring.ring_fd != -1)
			io_uring_queue_exit(&queue->ring);

		for (size_t idx = 0; idx < fuse_ring->queue_depth; idx++) {
			struct fuse_ring_ent *ent = &queue->ent[idx];

			numa_free(ent->op_payload, ent->req_payload_sz);
			numa_free(ent->req_header, queue->req_header_sz);
		}

		pthread_mutex_destroy(&queue->ring_lock);
	}

	free(fuse_ring->queues);
	pthread_cond_destroy(&fuse_ring->thread_start_cond);
	pthread_mutex_destroy(&fuse_ring->thread_start_mutex);
}
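/*
 * Queue one FUSE_IO_URING_CMD_REGISTER SQE for an entry: the kernel learns
 * about the header and payload buffers through the two-element iovec.
 */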
static int fuse_uring_register_ent(struct fuse_ring_queue *queue,
				   struct fuse_ring_ent *ent)
{
	struct io_uring_sqe *sqe;

	sqe = io_uring_get_sqe(&queue->ring);
	if (sqe == NULL) {
		fuse_log(FUSE_LOG_ERR, "Failed to get all ring SQEs\n");
		return -EIO;
	}

	ent->last_cmd = FUSE_IO_URING_CMD_REGISTER;
	fuse_uring_sqe_prepare(sqe, ent, ent->last_cmd);

	/* iov[0]: request header, iov[1]: payload buffer */
	ent->iov[0].iov_base = ent->req_header;
	ent->iov[0].iov_len = queue->req_header_sz;

	ent->iov[1].iov_base = ent->op_payload;
	ent->iov[1].iov_len = ent->req_payload_sz;

	sqe->addr = (uint64_t)(ent->iov);
	sqe->len = 2;

	/* commit_id 0: nothing to commit yet, only register and fetch */
	fuse_uring_sqe_set_req_data(fuse_uring_get_sqe_cmd(sqe), queue->qid, 0);

	return 0;
}
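/*
 * Prepare all register SQEs for a queue plus one poll SQE on the eventfd,
 * which is used to wake the queue thread on shutdown.
 */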
static int fuse_uring_register_queue(struct fuse_ring_queue *queue)
{
	struct fuse_ring_pool *ring_pool = queue->ring_pool;
	unsigned int sq_ready;
	struct io_uring_sqe *sqe;
	int res;

	for (size_t idx = 0; idx < ring_pool->queue_depth; idx++) {
		struct fuse_ring_ent *ent = &queue->ent[idx];

		res = fuse_uring_register_ent(queue, ent);
		if (res != 0)
			return res;
	}

	sq_ready = io_uring_sq_ready(&queue->ring);
	if (sq_ready != ring_pool->queue_depth) {
		fuse_log(FUSE_LOG_ERR,
			 "SQE ready mismatch, expected %zu got %u\n",
			 ring_pool->queue_depth, sq_ready);
		return -EINVAL;
	}

	sqe = io_uring_get_sqe(&queue->ring);
	if (sqe == NULL) {
		fuse_log(FUSE_LOG_ERR, "Failed to get eventfd SQE\n");
		return -EIO;
	}

	io_uring_prep_poll_add(sqe, queue->eventfd, POLLIN);
	io_uring_sqe_set_data(sqe, (void *)(uintptr_t)queue->eventfd);

	return 0;
}
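/*
 * Allocate the ring pool with one queue per CPU (get_nprocs_conf()); the
 * payload buffer size is derived from the session buffer size.
 */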
static struct fuse_ring_pool *fuse_create_ring(struct fuse_session *se)
{
	struct fuse_ring_pool *fuse_ring;
	const size_t nr_queues = get_nprocs_conf();
	size_t payload_sz = se->bufsize - FUSE_BUFFER_HEADER_SIZE;
	size_t queue_sz;

	fuse_log(FUSE_LOG_DEBUG, "starting io-uring q-depth=%d\n",
		 se->uring.q_depth);

	fuse_ring = calloc(1, sizeof(*fuse_ring));
	if (fuse_ring == NULL) {
		fuse_log(FUSE_LOG_ERR, "Allocating the ring failed\n");
		return NULL;
	}

	queue_sz = fuse_ring_queue_size(se->uring.q_depth);
	fuse_ring->queues = calloc(1, queue_sz * nr_queues);
	if (fuse_ring->queues == NULL) {
		fuse_log(FUSE_LOG_ERR, "Allocating the queues failed\n");
		goto err;
	}

	fuse_ring->se = se;
	fuse_ring->nr_queues = nr_queues;
	fuse_ring->queue_depth = se->uring.q_depth;
	fuse_ring->max_req_payload_sz = payload_sz;
	fuse_ring->queue_mem_size = queue_sz;

	for (size_t qid = 0; qid < nr_queues; qid++) {
		struct fuse_ring_queue *queue =
			fuse_uring_get_queue(fuse_ring, qid);

		queue->eventfd = -1;
		queue->ring.ring_fd = -1;
		queue->numa_node = numa_node_of_cpu(qid);
		queue->qid = qid;
		queue->ring_pool = fuse_ring;

		pthread_mutex_init(&queue->ring_lock, NULL);
	}

	pthread_cond_init(&fuse_ring->thread_start_cond, NULL);
	pthread_mutex_init(&fuse_ring->thread_start_mutex, NULL);
	sem_init(&fuse_ring->init_sem, 0, 0);

	return fuse_ring;

err:
	fuse_session_destruct_uring(fuse_ring);
	return NULL;
}
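/*
 * Re-queue an entry's last command (register or commit-and-fetch) so that it
 * is sent to the kernel again.
 */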
static void fuse_uring_resubmit(struct fuse_ring_queue *queue,
				struct fuse_ring_ent *ent)
{
	struct io_uring_sqe *sqe;

	sqe = io_uring_get_sqe(&queue->ring);
	if (sqe == NULL) {
		queue->ring_pool->se->error = -EIO;
		fuse_log(FUSE_LOG_ERR, "Failed to get a ring SQE\n");
		return;
	}

	fuse_uring_sqe_prepare(sqe, ent, ent->last_cmd);

	switch (ent->last_cmd) {
	case FUSE_IO_URING_CMD_REGISTER:
		sqe->addr = (uint64_t)(ent->iov);
		sqe->len = 2;
		fuse_uring_sqe_set_req_data(fuse_uring_get_sqe_cmd(sqe),
					    queue->qid, 0);
		break;
	case FUSE_IO_URING_CMD_COMMIT_AND_FETCH:
		fuse_uring_sqe_set_req_data(fuse_uring_get_sqe_cmd(sqe),
					    queue->qid, ent->req_commit_id);
		break;
	default:
		fuse_log(FUSE_LOG_ERR, "Unknown command type: %d\n",
			 ent->last_cmd);
		queue->ring_pool->se->error = -EINVAL;
		break;
	}
}
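/*
 * Turn a fetch completion into a FUSE request: remember the commit_id the
 * kernel expects back and hand the request to the session dispatcher.
 */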
static void fuse_uring_handle_cqe(struct fuse_ring_queue *queue,
				  struct io_uring_cqe *cqe)
{
	struct fuse_ring_pool *fuse_ring = queue->ring_pool;
	struct fuse_ring_ent *ent = io_uring_cqe_get_data(cqe);

	if (ent == NULL) {
		fuse_log(FUSE_LOG_ERR,
			 "cqe=%p io_uring_cqe_get_data returned NULL\n", cqe);
		return;
	}

	struct fuse_req *req = &ent->req;
	struct fuse_uring_req_header *rrh = ent->req_header;
	struct fuse_in_header *in = (struct fuse_in_header *)&rrh->in_out;
	struct fuse_uring_ent_in_out *ent_in_out = &rrh->ring_ent_in_out;

	/* the kernel passes the commit_id needed for the next commit+fetch */
	ent->req_commit_id = ent_in_out->commit_id;
	if (unlikely(ent->req_commit_id == 0)) {
		fuse_log(FUSE_LOG_ERR, "Received invalid commit_id=0\n");
		return;
	}

	memset(&req->flags, 0, sizeof(req->flags));
	memset(&req->u, 0, sizeof(req->u));
	req->flags.is_uring = 1;

	req->interrupted = 0;

	fuse_session_process_uring_cqe(fuse_ring->se, req, in, &rrh->op_in,
				       ent->op_payload, ent_in_out->payload_sz);
}
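/*
 * Drain the completion queue: eventfd wake-ups and errors are filtered here,
 * everything else is dispatched as a FUSE request.
 */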
static int fuse_uring_queue_handle_cqes(struct fuse_ring_queue *queue)
{
	struct fuse_ring_pool *ring_pool = queue->ring_pool;
	struct fuse_session *se = ring_pool->se;
	size_t num_completed = 0;
	struct io_uring_cqe *cqe;
	unsigned int head;
	struct fuse_ring_ent *ent;
	int ret = 0;

	io_uring_for_each_cqe(&queue->ring, head, cqe) {
		int err = cqe->res;

		num_completed++;

		if (unlikely(err != 0)) {
			/* wake-up through the eventfd, not a FUSE request */
			if (err > 0 && ((uintptr_t)io_uring_cqe_get_data(cqe) ==
					(unsigned int)queue->eventfd))
				continue;

			/* transient failure, queue the last command again */
			if (err == -EAGAIN) {
				ent = io_uring_cqe_get_data(cqe);
				fuse_uring_resubmit(queue, ent);
				continue;
			}

			if (err != -ENOTCONN) {
				se->error = cqe->res;
				fuse_session_exit(se);
			}
			ret = err;
			continue;
		}

		fuse_uring_handle_cqe(queue, cqe);
	}

	io_uring_cq_advance(&queue->ring, num_completed);

	return ret == 0 ? 0 : num_completed;
}
static void fuse_uring_set_thread_core(int qid)
{
	cpu_set_t mask;
	int rc;

	/* queue qid is served from CPU qid */
	CPU_ZERO(&mask);
	CPU_SET(qid, &mask);

	rc = sched_setaffinity(0, sizeof(cpu_set_t), &mask);
	if (rc == -1)
		fuse_log(FUSE_LOG_ERR, "Failed to bind qid=%d to its core: %s\n",
			 qid, strerror(errno));

	const int policy = SCHED_IDLE;
	const struct sched_param param = {
		.sched_priority = sched_get_priority_min(policy),
	};

	rc = sched_setscheduler(0, policy, &param);
	if (rc == -1)
		fuse_log(FUSE_LOG_ERR, "Failed to set scheduler: %s\n",
			 strerror(errno));
}
static int fuse_uring_init_queue(struct fuse_ring_queue *queue)
{
	struct fuse_ring_pool *ring = queue->ring_pool;
	struct fuse_session *se = ring->se;
	size_t page_sz = sysconf(_SC_PAGESIZE);
	int res;

	queue->eventfd = eventfd(0, EFD_CLOEXEC);
	if (queue->eventfd < 0) {
		fuse_log(FUSE_LOG_ERR,
			 "Failed to create eventfd for qid %d: %s\n",
			 queue->qid, strerror(errno));
		return -errno;
	}

	res = fuse_queue_setup_io_uring(&queue->ring, queue->qid,
					ring->queue_depth, se->fd,
					queue->eventfd);
	if (res != 0) {
		fuse_log(FUSE_LOG_ERR, "qid=%d io_uring init failed\n",
			 queue->qid);
		goto err;
	}

	queue->req_header_sz = ROUND_UP(sizeof(struct fuse_ring_ent), page_sz);

	for (size_t idx = 0; idx < ring->queue_depth; idx++) {
		struct fuse_ring_ent *ring_ent = &queue->ent[idx];
		struct fuse_req *req = &ring_ent->req;

		ring_ent->ring_queue = queue;

		ring_ent->req_header =
			numa_alloc_local(queue->req_header_sz);
		ring_ent->req_payload_sz = ring->max_req_payload_sz;
		ring_ent->op_payload =
			numa_alloc_local(ring_ent->req_payload_sz);

		pthread_mutex_init(&req->lock, NULL);
		req->flags.is_uring = 1;
	}

	res = fuse_uring_register_queue(queue);
	if (res != 0) {
		fuse_log(FUSE_LOG_ERR,
			 "Grave fuse-uring error on preparing SQEs, aborting\n");
		goto err;
	}

	return queue->ring.ring_fd;

err:
	close(queue->eventfd);
	return res;
}
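/*
 * Queue thread: bind to its CPU, set up the queue, report the result to
 * fuse_uring_start(), wait until the session is ready and then loop on
 * submit-and-wait / CQE processing until the session exits.
 */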
static void *fuse_uring_thread(void *arg)
{
	struct fuse_ring_queue *queue = arg;
	struct fuse_ring_pool *ring_pool = queue->ring_pool;
	struct fuse_session *se = ring_pool->se;
	char thread_name[16] = { 0 };
	int err;

	snprintf(thread_name, 16, "fuse-ring-%d", queue->qid);
	thread_name[15] = '\0';
	fuse_set_thread_name(thread_name);

	fuse_uring_set_thread_core(queue->qid);

	err = fuse_uring_init_queue(queue);

	pthread_mutex_lock(&ring_pool->thread_start_mutex);
	if (err < 0)
		ring_pool->failed_threads++;
	ring_pool->started_threads++;
	pthread_cond_broadcast(&ring_pool->thread_start_cond);
	pthread_mutex_unlock(&ring_pool->thread_start_mutex);

	if (err < 0) {
		fuse_log(FUSE_LOG_ERR, "qid=%d queue setup failed\n",
			 queue->qid);
		return NULL;
	}

	/* released by fuse_uring_wake_ring_threads() */
	sem_wait(&ring_pool->init_sem);

	while (!atomic_load_explicit(&se->mt_exited, memory_order_relaxed)) {
		io_uring_submit_and_wait(&queue->ring, 1);

		pthread_mutex_lock(&queue->ring_lock);
		queue->cqe_processing = true;
		err = fuse_uring_queue_handle_cqes(queue);
		queue->cqe_processing = false;
		pthread_mutex_unlock(&queue->ring_lock);
		/* ... */
	}

	return NULL;
}
static int fuse_uring_start_ring_threads(struct fuse_ring_pool *ring)
{
	int rc = 0;

	for (size_t qid = 0; qid < ring->nr_queues; qid++) {
		struct fuse_ring_queue *queue = fuse_uring_get_queue(ring, qid);

		rc = pthread_create(&queue->tid, NULL, fuse_uring_thread, queue);
		if (rc != 0)
			break;
	}
	return rc;
}
static int fuse_uring_sanity_check(struct fuse_session *se)
{
	if (se->uring.q_depth == 0) {
		fuse_log(FUSE_LOG_ERR, "io-uring queue depth must be > 0\n");
		return -EINVAL;
	}

	_Static_assert(sizeof(struct fuse_uring_cmd_req) <=
			       FUSE_URING_MAX_SQE128_CMD_DATA,
		       "SQE128_CMD_DATA has 80B cmd data");

	return 0;
}
int fuse_uring_start(struct fuse_session *se)
{
	struct fuse_ring_pool *fuse_ring;
	int err = 0;

	fuse_uring_sanity_check(se);

	fuse_ring = fuse_create_ring(se);
	if (fuse_ring == NULL) {
		err = -EADDRNOTAVAIL;
		return err;
	}

	se->uring.pool = fuse_ring;

	sem_init(&fuse_ring->init_sem, 0, 0);
	pthread_cond_init(&fuse_ring->thread_start_cond, NULL);
	pthread_mutex_init(&fuse_ring->thread_start_mutex, NULL);

	err = fuse_uring_start_ring_threads(fuse_ring);

	/* wait for all queue threads to report their setup result */
	pthread_mutex_lock(&fuse_ring->thread_start_mutex);
	while (fuse_ring->started_threads < fuse_ring->nr_queues)
		pthread_cond_wait(&fuse_ring->thread_start_cond,
				  &fuse_ring->thread_start_mutex);
	if (fuse_ring->failed_threads != 0)
		err = -EADDRNOTAVAIL;
	pthread_mutex_unlock(&fuse_ring->thread_start_mutex);

	if (err != 0) {
		fuse_session_destruct_uring(fuse_ring);
		se->uring.pool = fuse_ring;
	}

	return err;
}
int fuse_uring_stop(struct fuse_session *se)
{
	struct fuse_ring_pool *ring = se->uring.pool;

	if (ring != NULL)
		fuse_session_destruct_uring(ring);
	return 0;
}
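/*
 * Release the queue threads blocked on init_sem once the session is ready to
 * process requests.
 */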
void fuse_uring_wake_ring_threads(struct fuse_session *se)
{
	struct fuse_ring_pool *ring = se->uring.pool;

	for (size_t qid = 0; qid < ring->nr_queues; qid++)
		sem_post(&ring->init_sem);
}