123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- // SPDX-License-Identifier: AGPL-3.0-or-later
- use crate::fuse_fs::FuseFs;
- use btfproto::server::FsProvider;
- use btlib::{bterr, BlockPath, Result};
- use fuse_backend_rs::{
- api::server::Server,
- transport::{self, FuseChannel, FuseSession},
- };
- use futures::future::FutureExt;
- use log::{debug, error};
- use std::path::PathBuf;
- use std::{
- ffi::{c_char, c_int, CString},
- fs::File,
- os::{fd::FromRawFd, unix::ffi::OsStrExt},
- path::Path,
- sync::Arc,
- };
- use tokio::{sync::oneshot, task::JoinSet};
- pub use private::FuseDaemon;
- mod private {
- use std::{fs::DirBuilder, num::NonZeroUsize};
- use super::*;
- #[link(name = "fuse3")]
- extern "C" {
- /// Opens a channel to the kernel.
- fn fuse_open_channel(mountpoint: *const c_char, options: *const c_char) -> c_int;
- }
- /// Calls into libfuse3 to mount this file system at the given path. The file descriptor to use
- /// to communicate with the kernel is returned.
- fn mount_at<P: AsRef<Path>>(mnt_point: P, mnt_options: &str) -> Result<File> {
- let mountpoint = CString::new(mnt_point.as_ref().as_os_str().as_bytes())?;
- let options = CString::new(mnt_options)?;
- // Safety: mountpoint and options are both valid C strings.
- let raw_fd = unsafe { fuse_open_channel(mountpoint.as_ptr(), options.as_ptr()) };
- // According to the fuse3 docs only -1 will be returned on error.
- // source: http://libfuse.github.io/doxygen/fuse_8h.html#a9e8c9af40b22631f9f2636019cd073b6
- if raw_fd < 0 {
- return Err(bterr!("an error occurred in fuse_open_channel"));
- }
- // Safety: raw_rd is positive, indicating that fuse_open_channel succeeded.
- let file = unsafe { File::from_raw_fd(raw_fd) };
- Ok(file)
- }
- pub struct FuseDaemon {
- set: JoinSet<()>,
- session: FuseSession,
- }
- impl FuseDaemon {
- const FSNAME: &str = "btfuse";
- const FSTYPE: &str = "bt";
- pub fn new<P: 'static + FsProvider>(
- mntdir: PathBuf,
- mntoptions: &str,
- num_tasks: Option<NonZeroUsize>,
- fallback_path: Arc<BlockPath>,
- mounted_signal: Option<oneshot::Sender<()>>,
- provider: P,
- ) -> Result<Self> {
- let server = Arc::new(Server::new(FuseFs::new(provider, fallback_path)));
- let session = Self::session(mntdir, mntoptions)?;
- if let Some(tx) = mounted_signal {
- tx.send(())
- .map_err(|_| bterr!("failed to send mounted signal"))?;
- }
- let mut set = JoinSet::new();
- let num_tasks = if let Some(num_tasks) = num_tasks {
- num_tasks
- } else {
- std::thread::available_parallelism()?
- };
- log::debug!("spawning {num_tasks} blocking tasks");
- for task_num in 0..num_tasks.get() {
- let server = server.clone();
- let channel = session.new_channel()?;
- let future = tokio::task::spawn_blocking(move || {
- Self::server_loop(task_num, server, channel)
- });
- let future = future.map(|result| {
- if let Err(err) = result {
- error!("server_loop produced an error: {err}");
- }
- });
- set.spawn(future);
- }
- Ok(Self { set, session })
- }
- fn session<T: AsRef<Path>>(mntdir: T, mntoptions: &str) -> Result<FuseSession> {
- DirBuilder::new()
- .recursive(true)
- .create(mntdir.as_ref())
- .map_err(|err| {
- bterr!(err).context(format!(
- "failed to create mntdir: '{}'",
- mntdir.as_ref().display()
- ))
- })?;
- let mut session = FuseSession::new(mntdir.as_ref(), Self::FSNAME, Self::FSTYPE, false)?;
- session.set_fuse_file(mount_at(mntdir, mntoptions)?);
- Ok(session)
- }
- /// Opens a channel to the kernel and processes messages received in an infinite loop.
- fn server_loop<P: 'static + FsProvider>(
- task_num: usize,
- server: Arc<Server<FuseFs<P>>>,
- mut channel: FuseChannel,
- ) {
- loop {
- match channel.get_request() {
- Ok(Some((reader, writer))) => {
- // Safety: reader and writer are not mutated while the future returned from
- // async_handle_message is alive.
- if let Err(err) = server.handle_message(reader, writer.into(), None, None) {
- error!("error while handling FUSE message: {err}");
- }
- }
- Ok(None) => break,
- Err(err) => {
- match err {
- // Occurs when the file system is unmounted.
- transport::Error::SessionFailure(_) => break,
- _ => error!("unexpected error from Channel::get_request: {err}"),
- }
- }
- }
- }
- debug!("server_loop {task_num} exiting");
- }
- pub async fn finished(&mut self) {
- while self.set.join_next().await.is_some() {}
- }
- }
- impl Drop for FuseDaemon {
- fn drop(&mut self) {
- if let Err(err) = self.session.wake() {
- error!("failed to wake FuseSession: {err}");
- }
- }
- }
- }
|