fuse_daemon.rs 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. // SPDX-License-Identifier: AGPL-3.0-or-later
  2. use crate::fuse_fs::FuseFs;
  3. use btfproto::server::FsProvider;
  4. use btlib::{bterr, BlockPath, Result};
  5. use fuse_backend_rs::{
  6. api::server::Server,
  7. transport::{self, FuseChannel, FuseSession},
  8. };
  9. use futures::future::FutureExt;
  10. use log::{debug, error};
  11. use std::path::PathBuf;
  12. use std::{
  13. ffi::{c_char, c_int, CString},
  14. fs::File,
  15. os::{fd::FromRawFd, unix::ffi::OsStrExt},
  16. path::Path,
  17. sync::Arc,
  18. };
  19. use tokio::{sync::oneshot, task::JoinSet};
  20. pub use private::FuseDaemon;
  21. mod private {
  22. use std::{fs::DirBuilder, num::NonZeroUsize};
  23. use super::*;
  24. #[link(name = "fuse3")]
  25. extern "C" {
  26. /// Opens a channel to the kernel.
  27. fn fuse_open_channel(mountpoint: *const c_char, options: *const c_char) -> c_int;
  28. }
  29. /// Calls into libfuse3 to mount this file system at the given path. The file descriptor to use
  30. /// to communicate with the kernel is returned.
  31. fn mount_at<P: AsRef<Path>>(mnt_point: P, mnt_options: &str) -> Result<File> {
  32. let mountpoint = CString::new(mnt_point.as_ref().as_os_str().as_bytes())?;
  33. let options = CString::new(mnt_options)?;
  34. // Safety: mountpoint and options are both valid C strings.
  35. let raw_fd = unsafe { fuse_open_channel(mountpoint.as_ptr(), options.as_ptr()) };
  36. // According to the fuse3 docs only -1 will be returned on error.
  37. // source: http://libfuse.github.io/doxygen/fuse_8h.html#a9e8c9af40b22631f9f2636019cd073b6
  38. if raw_fd < 0 {
  39. return Err(bterr!("an error occurred in fuse_open_channel"));
  40. }
  41. // Safety: raw_rd is positive, indicating that fuse_open_channel succeeded.
  42. let file = unsafe { File::from_raw_fd(raw_fd) };
  43. Ok(file)
  44. }
  45. pub struct FuseDaemon {
  46. set: JoinSet<()>,
  47. session: FuseSession,
  48. }
  49. impl FuseDaemon {
  /// The file system name handed to `FuseSession::new` in `session`.
  const FSNAME: &'static str = "btfuse";
  /// The file system type handed to `FuseSession::new` in `session`.
  const FSTYPE: &'static str = "bt";
  52. pub fn new<P: 'static + FsProvider>(
  53. mntdir: PathBuf,
  54. mntoptions: &str,
  55. num_tasks: Option<NonZeroUsize>,
  56. fallback_path: Arc<BlockPath>,
  57. mounted_signal: Option<oneshot::Sender<()>>,
  58. provider: P,
  59. ) -> Result<Self> {
  60. let server = Arc::new(Server::new(FuseFs::new(provider, fallback_path)));
  61. let session = Self::session(mntdir, mntoptions)?;
  62. if let Some(tx) = mounted_signal {
  63. tx.send(())
  64. .map_err(|_| bterr!("failed to send mounted signal"))?;
  65. }
  66. let mut set = JoinSet::new();
  67. let num_tasks = if let Some(num_tasks) = num_tasks {
  68. num_tasks
  69. } else {
  70. std::thread::available_parallelism()?
  71. };
  72. log::debug!("spawning {num_tasks} blocking tasks");
  73. for task_num in 0..num_tasks.get() {
  74. let server = server.clone();
  75. let channel = session.new_channel()?;
  76. let future = tokio::task::spawn_blocking(move || {
  77. Self::server_loop(task_num, server, channel)
  78. });
  79. let future = future.map(|result| {
  80. if let Err(err) = result {
  81. error!("server_loop produced an error: {err}");
  82. }
  83. });
  84. set.spawn(future);
  85. }
  86. Ok(Self { set, session })
  87. }
  88. fn session<T: AsRef<Path>>(mntdir: T, mntoptions: &str) -> Result<FuseSession> {
  89. DirBuilder::new()
  90. .recursive(true)
  91. .create(mntdir.as_ref())
  92. .map_err(|err| {
  93. bterr!(err).context(format!(
  94. "failed to create mntdir: '{}'",
  95. mntdir.as_ref().display()
  96. ))
  97. })?;
  98. let mut session = FuseSession::new(mntdir.as_ref(), Self::FSNAME, Self::FSTYPE, false)?;
  99. session.set_fuse_file(mount_at(mntdir, mntoptions)?);
  100. Ok(session)
  101. }
  102. /// Opens a channel to the kernel and processes messages received in an infinite loop.
  103. fn server_loop<P: 'static + FsProvider>(
  104. task_num: usize,
  105. server: Arc<Server<FuseFs<P>>>,
  106. mut channel: FuseChannel,
  107. ) {
  108. loop {
  109. match channel.get_request() {
  110. Ok(Some((reader, writer))) => {
  111. // Safety: reader and writer are not mutated while the future returned from
  112. // async_handle_message is alive.
  113. if let Err(err) = server.handle_message(reader, writer.into(), None, None) {
  114. error!("error while handling FUSE message: {err}");
  115. }
  116. }
  117. Ok(None) => break,
  118. Err(err) => {
  119. match err {
  120. // Occurs when the file system is unmounted.
  121. transport::Error::SessionFailure(_) => break,
  122. _ => error!("unexpected error from Channel::get_request: {err}"),
  123. }
  124. }
  125. }
  126. }
  127. debug!("server_loop {task_num} exiting");
  128. }
  129. pub async fn finished(&mut self) {
  130. while self.set.join_next().await.is_some() {}
  131. }
  132. }
  133. impl Drop for FuseDaemon {
  134. fn drop(&mut self) {
  135. if let Err(err) = self.session.wake() {
  136. error!("failed to wake FuseSession: {err}");
  137. }
  138. }
  139. }
  140. }