// SPDX-License-Identifier: AGPL-3.0-or-later use crate::{fuse_fs::FuseFs, PathExt, DEFAULT_CONFIG}; use btfproto::server::FsProvider; use btlib::{BlockPath, Result}; use fuse_backend_rs::{ api::server::Server, transport::{self, FuseChannel, FuseSession}, }; use futures::future::FutureExt; use log::error; use std::path::PathBuf; use std::{ ffi::{c_char, c_int, CString}, fs::File, os::{fd::FromRawFd, unix::ffi::OsStrExt}, path::Path, sync::Arc, }; use tokio::task::JoinSet; pub use private::FuseDaemon; mod private { use super::*; #[link(name = "fuse3")] extern "C" { /// Opens a channel to the kernel. fn fuse_open_channel(mountpoint: *const c_char, options: *const c_char) -> c_int; } /// Calls into libfuse3 to mount this file system at the given path. The file descriptor to use /// to communicate with the kernel is returned. fn mount_at>(mnt_point: P) -> File { let mountpoint = CString::new(mnt_point.as_ref().as_os_str().as_bytes()).unwrap(); let options = CString::new(DEFAULT_CONFIG.mnt_options).unwrap(); let raw_fd = unsafe { fuse_open_channel(mountpoint.as_ptr(), options.as_ptr()) }; unsafe { File::from_raw_fd(raw_fd) } } pub struct FuseDaemon { set: JoinSet<()>, } impl FuseDaemon { const FSNAME: &str = "btfuse"; const FSTYPE: &str = "bt"; pub fn new( mnt_path: PathBuf, num_threads: usize, fallback_path: Arc, provider: P, ) -> Result { let server = Arc::new(Server::new(FuseFs::new(provider, fallback_path))); let session = Self::session(mnt_path)?; let mut set = JoinSet::new(); for _ in 0..num_threads { let server = server.clone(); let channel = session.new_channel()?; let future = tokio::task::spawn_blocking(move || Self::server_loop(server, channel)); let future = future.map(|result| { if let Err(err) = result { error!("server_loop produced an error: {err}"); } }); set.spawn(future); } Ok(Self { set }) } fn session>(mnt_path: T) -> Result { mnt_path.try_create_dir()?; let mut session = FuseSession::new(mnt_path.as_ref(), Self::FSNAME, Self::FSTYPE, false)?; session.set_fuse_file(mount_at(mnt_path)); Ok(session) } /// Opens a channel to the kernel and processes messages received in an infinite loop. fn server_loop( server: Arc>>, mut channel: FuseChannel, ) { loop { match channel.get_request() { Ok(Some((reader, writer))) => { // Safety: reader and writer are not mutated while the future returned from // async_handle_message is alive. if let Err(err) = server.handle_message(reader, writer.into(), None, None) { error!("error while handling FUSE message: {err}"); } } Ok(None) => break, Err(err) => { match err { // Occurs when the file system is unmounted. transport::Error::SessionFailure(_) => break, _ => error!("unexpected error from Channel::get_request: {err}"), } } } } } pub async fn finished(&mut self) { while self.set.join_next().await.is_some() {} } } }