123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- use crate::{fuse_fs::FuseFs, PathExt, DEFAULT_CONFIG};
- use btfproto::server::FsProvider;
- use btlib::{BlockPath, Result};
- use fuse_backend_rs::{
- api::server::Server,
- transport::{self, FuseChannel, FuseSession},
- };
- use futures::future::FutureExt;
- use log::error;
- use std::path::PathBuf;
- use std::{
- ffi::{c_char, c_int, CString},
- fs::File,
- os::{fd::FromRawFd, unix::ffi::OsStrExt},
- path::Path,
- sync::Arc,
- };
- use tokio::task::JoinSet;
- pub use private::FuseDaemon;
- mod private {
- use super::*;
- #[link(name = "fuse3")]
- extern "C" {
-
- fn fuse_open_channel(mountpoint: *const c_char, options: *const c_char) -> c_int;
- }
-
-
- fn mount_at<P: AsRef<Path>>(mnt_point: P) -> File {
- let mountpoint = CString::new(mnt_point.as_ref().as_os_str().as_bytes()).unwrap();
- let options = CString::new(DEFAULT_CONFIG.mnt_options).unwrap();
- let raw_fd = unsafe { fuse_open_channel(mountpoint.as_ptr(), options.as_ptr()) };
- unsafe { File::from_raw_fd(raw_fd) }
- }
- pub struct FuseDaemon {
- set: JoinSet<()>,
- }
- impl FuseDaemon {
- const FSNAME: &str = "btfuse";
- const FSTYPE: &str = "bt";
- pub fn new<P: 'static + FsProvider>(
- mnt_path: PathBuf,
- num_threads: usize,
- fallback_path: Arc<BlockPath>,
- provider: P,
- ) -> Result<Self> {
- let server = Arc::new(Server::new(FuseFs::new(provider, fallback_path)));
- let session = Self::session(mnt_path)?;
- let mut set = JoinSet::new();
- for _ in 0..num_threads {
- let server = server.clone();
- let channel = session.new_channel()?;
- let future =
- tokio::task::spawn_blocking(move || Self::server_loop(server, channel));
- let future = future.map(|result| {
- if let Err(err) = result {
- error!("server_loop produced an error: {err}");
- }
- });
- set.spawn(future);
- }
- Ok(Self { set })
- }
- fn session<T: AsRef<Path>>(mnt_path: T) -> Result<FuseSession> {
- mnt_path.try_create_dir()?;
- let mut session =
- FuseSession::new(mnt_path.as_ref(), Self::FSNAME, Self::FSTYPE, false)?;
- session.set_fuse_file(mount_at(mnt_path));
- Ok(session)
- }
-
- fn server_loop<P: 'static + FsProvider>(
- server: Arc<Server<FuseFs<P>>>,
- mut channel: FuseChannel,
- ) {
- loop {
- match channel.get_request() {
- Ok(Some((reader, writer))) => {
-
-
- if let Err(err) = server.handle_message(reader, writer.into(), None, None) {
- error!("error while handling FUSE message: {err}");
- }
- }
- Ok(None) => break,
- Err(err) => {
- match err {
-
- transport::Error::SessionFailure(_) => break,
- _ => error!("unexpected error from Channel::get_request: {err}"),
- }
- }
- }
- }
- }
- pub async fn finished(&mut self) {
- while self.set.join_next().await.is_some() {}
- }
- }
- }
|