libfuse
fuse_loop_mt.c
1 /*
2  FUSE: Filesystem in Userspace
3  Copyright (C) 2001-2007 Miklos Szeredi <miklos@szeredi.hu>
4 
5  Implementation of the multi-threaded FUSE session loop.
6 
7  This program can be distributed under the terms of the GNU LGPLv2.
8  See the file COPYING.LIB.
9 */
10 
11 #include "fuse_config.h"
12 #include "fuse_lowlevel.h"
13 #include "fuse_misc.h"
14 #include "fuse_kernel.h"
15 #include "fuse_i.h"
16 
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <string.h>
20 #include <unistd.h>
21 #include <signal.h>
22 #include <semaphore.h>
23 #include <errno.h>
24 #include <sys/time.h>
25 #include <sys/ioctl.h>
26 #include <assert.h>
27 #include <limits.h>
28 
29 /* Environment var controlling the thread stack size */
30 #define ENVNAME_THREAD_STACK "FUSE_THREAD_STACK"
31 
32 #define FUSE_LOOP_MT_V2_IDENTIFIER INT_MAX - 2
33 #define FUSE_LOOP_MT_DEF_CLONE_FD 0
34 #define FUSE_LOOP_MT_DEF_MAX_THREADS 10
35 #define FUSE_LOOP_MT_DEF_IDLE_THREADS -1 /* thread destruction is disabled
36  * by default */
37 
38 /* an arbitrary large value that cannot be valid */
39 #define FUSE_LOOP_MT_MAX_THREADS (100U * 1000)
40 
41 struct fuse_worker {
42  struct fuse_worker *prev;
43  struct fuse_worker *next;
44  pthread_t thread_id;
45 
46  // We need to include fuse_buf so that we can properly free
47  // it when a thread is terminated by pthread_cancel().
48  struct fuse_buf fbuf;
49  struct fuse_chan *ch;
50  struct fuse_mt *mt;
51 };
52 
53 struct fuse_mt {
54  pthread_mutex_t lock;
55  int numworker;
56  int numavail;
57  struct fuse_session *se;
58  struct fuse_worker main;
59  sem_t finish;
60  int exit;
61  int error;
62  int clone_fd;
63  int max_idle;
64  int max_threads;
65 };
66 
67 static struct fuse_chan *fuse_chan_new(int fd)
68 {
69  struct fuse_chan *ch = (struct fuse_chan *) malloc(sizeof(*ch));
70  if (ch == NULL) {
71  fuse_log(FUSE_LOG_ERR, "fuse: failed to allocate channel\n");
72  return NULL;
73  }
74 
75  memset(ch, 0, sizeof(*ch));
76  ch->fd = fd;
77  ch->ctr = 1;
78  pthread_mutex_init(&ch->lock, NULL);
79 
80  return ch;
81 }
82 
83 struct fuse_chan *fuse_chan_get(struct fuse_chan *ch)
84 {
85  assert(ch->ctr > 0);
86  pthread_mutex_lock(&ch->lock);
87  ch->ctr++;
88  pthread_mutex_unlock(&ch->lock);
89 
90  return ch;
91 }
92 
93 void fuse_chan_put(struct fuse_chan *ch)
94 {
95  if (ch == NULL)
96  return;
97  pthread_mutex_lock(&ch->lock);
98  ch->ctr--;
99  if (!ch->ctr) {
100  pthread_mutex_unlock(&ch->lock);
101  close(ch->fd);
102  pthread_mutex_destroy(&ch->lock);
103  free(ch);
104  } else
105  pthread_mutex_unlock(&ch->lock);
106 }
107 
108 static void list_add_worker(struct fuse_worker *w, struct fuse_worker *next)
109 {
110  struct fuse_worker *prev = next->prev;
111  w->next = next;
112  w->prev = prev;
113  prev->next = w;
114  next->prev = w;
115 }
116 
117 static void list_del_worker(struct fuse_worker *w)
118 {
119  struct fuse_worker *prev = w->prev;
120  struct fuse_worker *next = w->next;
121  prev->next = next;
122  next->prev = prev;
123 }
124 
125 static int fuse_loop_start_thread(struct fuse_mt *mt);
126 
127 static void *fuse_do_work(void *data)
128 {
129  struct fuse_worker *w = (struct fuse_worker *) data;
130  struct fuse_mt *mt = w->mt;
131 
132  while (!fuse_session_exited(mt->se)) {
133  int isforget = 0;
134  int res;
135 
136  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
137  res = fuse_session_receive_buf_int(mt->se, &w->fbuf, w->ch);
138  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
139  if (res == -EINTR)
140  continue;
141  if (res <= 0) {
142  if (res < 0) {
143  fuse_session_exit(mt->se);
144  mt->error = res;
145  }
146  break;
147  }
148 
149  pthread_mutex_lock(&mt->lock);
150  if (mt->exit) {
151  pthread_mutex_unlock(&mt->lock);
152  return NULL;
153  }
154 
155  /*
156  * This disgusting hack is needed so that zillions of threads
157  * are not created on a burst of FORGET messages
158  */
159  if (!(w->fbuf.flags & FUSE_BUF_IS_FD)) {
160  struct fuse_in_header *in = w->fbuf.mem;
161 
162  if (in->opcode == FUSE_FORGET ||
163  in->opcode == FUSE_BATCH_FORGET)
164  isforget = 1;
165  }
166 
167  if (!isforget)
168  mt->numavail--;
169  if (mt->numavail == 0 && mt->numworker < mt->max_threads)
170  fuse_loop_start_thread(mt);
171  pthread_mutex_unlock(&mt->lock);
172 
173  fuse_session_process_buf_int(mt->se, &w->fbuf, w->ch);
174 
175  pthread_mutex_lock(&mt->lock);
176  if (!isforget)
177  mt->numavail++;
178 
179  /* creating and destroying threads is rather expensive - and there is
180  * not much gain from destroying existing threads. It is therefore
181  * discouraged to set max_idle to anything else than -1. If there
182  * is indeed a good reason to destruct threads it should be done
183  * delayed, a moving average might be useful for that.
184  */
185  if (mt->max_idle != -1 && mt->numavail > mt->max_idle && mt->numworker > 1) {
186  if (mt->exit) {
187  pthread_mutex_unlock(&mt->lock);
188  return NULL;
189  }
190  list_del_worker(w);
191  mt->numavail--;
192  mt->numworker--;
193  pthread_mutex_unlock(&mt->lock);
194 
195  pthread_detach(w->thread_id);
196  free(w->fbuf.mem);
197  fuse_chan_put(w->ch);
198  free(w);
199  return NULL;
200  }
201  pthread_mutex_unlock(&mt->lock);
202  }
203 
204  sem_post(&mt->finish);
205 
206  return NULL;
207 }
208 
209 int fuse_start_thread(pthread_t *thread_id, void *(*func)(void *), void *arg)
210 {
211  sigset_t oldset;
212  sigset_t newset;
213  int res;
214  pthread_attr_t attr;
215  char *stack_size;
216 
217  /* Override default stack size
218  * XXX: This should ideally be a parameter option. It is rather
219  * well hidden here.
220  */
221  pthread_attr_init(&attr);
222  stack_size = getenv(ENVNAME_THREAD_STACK);
223  if (stack_size && pthread_attr_setstacksize(&attr, atoi(stack_size)))
224  fuse_log(FUSE_LOG_ERR, "fuse: invalid stack size: %s\n", stack_size);
225 
226  /* Disallow signal reception in worker threads */
227  sigemptyset(&newset);
228  sigaddset(&newset, SIGTERM);
229  sigaddset(&newset, SIGINT);
230  sigaddset(&newset, SIGHUP);
231  sigaddset(&newset, SIGQUIT);
232  pthread_sigmask(SIG_BLOCK, &newset, &oldset);
233  res = pthread_create(thread_id, &attr, func, arg);
234  pthread_sigmask(SIG_SETMASK, &oldset, NULL);
235  pthread_attr_destroy(&attr);
236  if (res != 0) {
237  fuse_log(FUSE_LOG_ERR, "fuse: error creating thread: %s\n",
238  strerror(res));
239  return -1;
240  }
241 
242  return 0;
243 }
244 
245 static struct fuse_chan *fuse_clone_chan(struct fuse_mt *mt)
246 {
247  int res;
248  int clonefd;
249  uint32_t masterfd;
250  struct fuse_chan *newch;
251  const char *devname = "/dev/fuse";
252 
253 #ifndef O_CLOEXEC
254 #define O_CLOEXEC 0
255 #endif
256  clonefd = open(devname, O_RDWR | O_CLOEXEC);
257  if (clonefd == -1) {
258  fuse_log(FUSE_LOG_ERR, "fuse: failed to open %s: %s\n", devname,
259  strerror(errno));
260  return NULL;
261  }
262  fcntl(clonefd, F_SETFD, FD_CLOEXEC);
263 
264  masterfd = mt->se->fd;
265  res = ioctl(clonefd, FUSE_DEV_IOC_CLONE, &masterfd);
266  if (res == -1) {
267  fuse_log(FUSE_LOG_ERR, "fuse: failed to clone device fd: %s\n",
268  strerror(errno));
269  close(clonefd);
270  return NULL;
271  }
272  newch = fuse_chan_new(clonefd);
273  if (newch == NULL)
274  close(clonefd);
275 
276  return newch;
277 }
278 
279 static int fuse_loop_start_thread(struct fuse_mt *mt)
280 {
281  int res;
282 
283  struct fuse_worker *w = malloc(sizeof(struct fuse_worker));
284  if (!w) {
285  fuse_log(FUSE_LOG_ERR, "fuse: failed to allocate worker structure\n");
286  return -1;
287  }
288  memset(w, 0, sizeof(struct fuse_worker));
289  w->fbuf.mem = NULL;
290  w->mt = mt;
291 
292  w->ch = NULL;
293  if (mt->clone_fd) {
294  w->ch = fuse_clone_chan(mt);
295  if(!w->ch) {
296  /* Don't attempt this again */
297  fuse_log(FUSE_LOG_ERR, "fuse: trying to continue "
298  "without -o clone_fd.\n");
299  mt->clone_fd = 0;
300  }
301  }
302 
303  res = fuse_start_thread(&w->thread_id, fuse_do_work, w);
304  if (res == -1) {
305  fuse_chan_put(w->ch);
306  free(w);
307  return -1;
308  }
309  list_add_worker(w, &mt->main);
310  mt->numavail ++;
311  mt->numworker ++;
312 
313  return 0;
314 }
315 
316 static void fuse_join_worker(struct fuse_mt *mt, struct fuse_worker *w)
317 {
318  pthread_join(w->thread_id, NULL);
319  pthread_mutex_lock(&mt->lock);
320  list_del_worker(w);
321  pthread_mutex_unlock(&mt->lock);
322  free(w->fbuf.mem);
323  fuse_chan_put(w->ch);
324  free(w);
325 }
326 
327 int fuse_session_loop_mt_312(struct fuse_session *se, struct fuse_loop_config *config);
328 FUSE_SYMVER("fuse_session_loop_mt_312", "fuse_session_loop_mt@@FUSE_3.12")
329 int fuse_session_loop_mt_312(struct fuse_session *se, struct fuse_loop_config *config)
330 {
331 int err;
332  struct fuse_mt mt;
333  struct fuse_worker *w;
334  int created_config = 0;
335 
336  if (config) {
337  err = fuse_loop_cfg_verify(config);
338  if (err)
339  return err;
340  } else {
341  /* The caller does not care about parameters - use the default */
342  config = fuse_loop_cfg_create();
343  created_config = 1;
344  }
345 
346 
347  memset(&mt, 0, sizeof(struct fuse_mt));
348  mt.se = se;
349  mt.clone_fd = config->clone_fd;
350  mt.error = 0;
351  mt.numworker = 0;
352  mt.numavail = 0;
353  mt.max_idle = config->max_idle_threads;
354  mt.max_threads = config->max_threads;
355  mt.main.thread_id = pthread_self();
356  mt.main.prev = mt.main.next = &mt.main;
357  sem_init(&mt.finish, 0, 0);
358  pthread_mutex_init(&mt.lock, NULL);
359 
360  pthread_mutex_lock(&mt.lock);
361  err = fuse_loop_start_thread(&mt);
362  pthread_mutex_unlock(&mt.lock);
363  if (!err) {
364  /* sem_wait() is interruptible */
365  while (!fuse_session_exited(se))
366  sem_wait(&mt.finish);
367 
368  pthread_mutex_lock(&mt.lock);
369  for (w = mt.main.next; w != &mt.main; w = w->next)
370  pthread_cancel(w->thread_id);
371  mt.exit = 1;
372  pthread_mutex_unlock(&mt.lock);
373 
374  while (mt.main.next != &mt.main)
375  fuse_join_worker(&mt, mt.main.next);
376 
377  err = mt.error;
378  }
379 
380  pthread_mutex_destroy(&mt.lock);
381  sem_destroy(&mt.finish);
382  if(se->error != 0)
383  err = se->error;
384  fuse_session_reset(se);
385 
386  if (created_config) {
387  fuse_loop_cfg_destroy(config);
388  config = NULL;
389  }
390 
391  return err;
392 }
393 
394 int fuse_session_loop_mt_32(struct fuse_session *se, struct fuse_loop_config_v1 *config_v1);
395 FUSE_SYMVER("fuse_session_loop_mt_32", "fuse_session_loop_mt@FUSE_3.2")
396 int fuse_session_loop_mt_32(struct fuse_session *se, struct fuse_loop_config_v1 *config_v1)
397 {
398  int err;
399  struct fuse_loop_config *config = NULL;
400 
401  if (config_v1 != NULL) {
402  /* convert the given v1 config */
403  config = fuse_loop_cfg_create();
404  if (config == NULL)
405  return ENOMEM;
406 
407  fuse_loop_cfg_convert(config, config_v1);
408  }
409 
410  err = fuse_session_loop_mt_312(se, config);
411 
412  fuse_loop_cfg_destroy(config);
413 
414  return err;
415 }
416 
417 
418 int fuse_session_loop_mt_31(struct fuse_session *se, int clone_fd);
419 FUSE_SYMVER("fuse_session_loop_mt_31", "fuse_session_loop_mt@FUSE_3.0")
420 int fuse_session_loop_mt_31(struct fuse_session *se, int clone_fd)
421 {
422  struct fuse_loop_config *config = fuse_loop_cfg_create();
423  if (clone_fd > 0)
425  return fuse_session_loop_mt_312(se, config);
426 }
427 
429 {
430  struct fuse_loop_config *config = calloc(1, sizeof(*config));
431  if (config == NULL)
432  return NULL;
433 
434  config->version_id = FUSE_LOOP_MT_V2_IDENTIFIER;
435  config->max_idle_threads = FUSE_LOOP_MT_DEF_IDLE_THREADS;
436  config->max_threads = FUSE_LOOP_MT_DEF_MAX_THREADS;
437  config->clone_fd = FUSE_LOOP_MT_DEF_CLONE_FD;
438 
439  return config;
440 }
441 
442 void fuse_loop_cfg_destroy(struct fuse_loop_config *config)
443 {
444  free(config);
445 }
446 
447 int fuse_loop_cfg_verify(struct fuse_loop_config *config)
448 {
449  if (config->version_id != FUSE_LOOP_MT_V2_IDENTIFIER)
450  return -EINVAL;
451 
452  return 0;
453 }
454 
455 void fuse_loop_cfg_convert(struct fuse_loop_config *config,
456  struct fuse_loop_config_v1 *v1_conf)
457 {
458  fuse_loop_cfg_set_idle_threads(config, v1_conf->max_idle_threads);
459 
460  fuse_loop_cfg_set_clone_fd(config, v1_conf->clone_fd);
461 }
462 
464  unsigned int value)
465 {
466  if (value > FUSE_LOOP_MT_MAX_THREADS) {
467  if (value != UINT_MAX)
468  fuse_log(FUSE_LOG_ERR,
469  "Ignoring invalid max threads value "
470  "%u > max (%u).\n", value,
471  FUSE_LOOP_MT_MAX_THREADS);
472  return;
473  }
474  config->max_idle_threads = value;
475 }
476 
478  unsigned int value)
479 {
480  config->max_threads = value;
481 }
482 
483 void fuse_loop_cfg_set_clone_fd(struct fuse_loop_config *config,
484  unsigned int value)
485 {
486  config->clone_fd = value;
487 }
488 
void fuse_loop_cfg_convert(struct fuse_loop_config *config, struct fuse_loop_config_v1 *v1_conf)
Definition: fuse_loop_mt.c:454
void fuse_loop_cfg_set_idle_threads(struct fuse_loop_config *config, unsigned int value)
Definition: fuse_loop_mt.c:462
@ FUSE_BUF_IS_FD
Definition: fuse_common.h:672
struct fuse_loop_config * fuse_loop_cfg_create(void)
Definition: fuse_loop_mt.c:427
void fuse_loop_cfg_set_clone_fd(struct fuse_loop_config *config, unsigned int value)
Definition: fuse_loop_mt.c:482
void fuse_loop_cfg_destroy(struct fuse_loop_config *config)
Definition: fuse_loop_mt.c:441
void fuse_loop_cfg_set_max_threads(struct fuse_loop_config *config, unsigned int value)
Definition: fuse_loop_mt.c:476
void fuse_log(enum fuse_log_level level, const char *fmt,...)
Definition: fuse_log.c:33
void fuse_session_exit(struct fuse_session *se)
int fuse_session_exited(struct fuse_session *se)
void fuse_session_reset(struct fuse_session *se)
unsigned int max_threads
Definition: fuse_i.h:138
unsigned int max_idle_threads
Definition: fuse_common.h:146