// SPDX-License-Identifier: AGPL-3.0-or-later use crate::fuse_fs::FuseFs; use btfproto::server::FsProvider; use btlib::{bterr, BlockPath, Result}; use fuse_backend_rs::{ api::server::Server, transport::{self, FuseChannel, FuseSession}, }; use futures::future::FutureExt; use log::{debug, error}; use std::path::PathBuf; use std::{ ffi::{c_char, c_int, CString}, fs::File, os::{fd::FromRawFd, unix::ffi::OsStrExt}, path::Path, sync::Arc, }; use tokio::{sync::oneshot, task::JoinSet}; pub use private::FuseDaemon; mod private { use std::{fs::DirBuilder, num::NonZeroUsize}; use super::*; #[link(name = "fuse3")] extern "C" { /// Opens a channel to the kernel. fn fuse_open_channel(mountpoint: *const c_char, options: *const c_char) -> c_int; } /// Calls into libfuse3 to mount this file system at the given path. The file descriptor to use /// to communicate with the kernel is returned. fn mount_at>(mnt_point: P, mnt_options: &str) -> Result { let mountpoint = CString::new(mnt_point.as_ref().as_os_str().as_bytes())?; let options = CString::new(mnt_options)?; // Safety: mountpoint and options are both valid C strings. let raw_fd = unsafe { fuse_open_channel(mountpoint.as_ptr(), options.as_ptr()) }; // According to the fuse3 docs only -1 will be returned on error. // source: http://libfuse.github.io/doxygen/fuse_8h.html#a9e8c9af40b22631f9f2636019cd073b6 if raw_fd < 0 { return Err(bterr!("an error occurred in fuse_open_channel")); } // Safety: raw_rd is positive, indicating that fuse_open_channel succeeded. let file = unsafe { File::from_raw_fd(raw_fd) }; Ok(file) } pub struct FuseDaemon { set: JoinSet<()>, session: FuseSession, } impl FuseDaemon { const FSNAME: &str = "btfuse"; const FSTYPE: &str = "bt"; pub fn new( mntdir: PathBuf, mntoptions: &str, num_tasks: Option, fallback_path: Arc, mounted_signal: Option>, provider: P, ) -> Result { let server = Arc::new(Server::new(FuseFs::new(provider, fallback_path))); let session = Self::session(mntdir, mntoptions)?; if let Some(tx) = mounted_signal { tx.send(()) .map_err(|_| bterr!("failed to send mounted signal"))?; } let mut set = JoinSet::new(); let num_tasks = if let Some(num_tasks) = num_tasks { num_tasks } else { std::thread::available_parallelism()? }; log::debug!("spawning {num_tasks} blocking tasks"); for task_num in 0..num_tasks.get() { let server = server.clone(); let channel = session.new_channel()?; let future = tokio::task::spawn_blocking(move || { Self::server_loop(task_num, server, channel) }); let future = future.map(|result| { if let Err(err) = result { error!("server_loop produced an error: {err}"); } }); set.spawn(future); } Ok(Self { set, session }) } fn session>(mntdir: T, mntoptions: &str) -> Result { DirBuilder::new() .recursive(true) .create(mntdir.as_ref()) .map_err(|err| { bterr!(err).context(format!( "failed to create mntdir: '{}'", mntdir.as_ref().display() )) })?; let mut session = FuseSession::new(mntdir.as_ref(), Self::FSNAME, Self::FSTYPE, false)?; session.set_fuse_file(mount_at(mntdir, mntoptions)?); Ok(session) } /// Opens a channel to the kernel and processes messages received in an infinite loop. fn server_loop( task_num: usize, server: Arc>>, mut channel: FuseChannel, ) { loop { match channel.get_request() { Ok(Some((reader, writer))) => { // Safety: reader and writer are not mutated while the future returned from // async_handle_message is alive. if let Err(err) = server.handle_message(reader, writer.into(), None, None) { error!("error while handling FUSE message: {err}"); } } Ok(None) => break, Err(err) => { match err { // Occurs when the file system is unmounted. transport::Error::SessionFailure(_) => break, _ => error!("unexpected error from Channel::get_request: {err}"), } } } } debug!("server_loop {task_num} exiting"); } pub async fn finished(&mut self) { while self.set.join_next().await.is_some() {} } } impl Drop for FuseDaemon { fn drop(&mut self) { if let Err(err) = self.session.wake() { error!("failed to wake FuseSession: {err}"); } } } }