fuse_daemon.rs 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. use crate::{fuse_fs::FuseFs, PathExt, DEFAULT_CONFIG};
  2. use btfproto::server::FsProvider;
  3. use btlib::{BlockPath, Result};
  4. use fuse_backend_rs::{
  5. api::server::Server,
  6. transport::{self, FuseChannel, FuseSession},
  7. };
  8. use futures::future::FutureExt;
  9. use log::error;
  10. use std::path::PathBuf;
  11. use std::{
  12. ffi::{c_char, c_int, CString},
  13. fs::File,
  14. os::{fd::FromRawFd, unix::ffi::OsStrExt},
  15. path::Path,
  16. sync::Arc,
  17. };
  18. use tokio::task::JoinSet;
  19. pub use private::FuseDaemon;
  20. mod private {
  21. use super::*;
  // Foreign binding into libfuse3; the library is linked under the name `fuse3`.
  #[link(name = "fuse3")]
  extern "C" {
      /// Opens a channel to the kernel for the file system mounted at `mountpoint`.
      ///
      /// `mountpoint` and `options` are passed as C string pointers, and the result is a C
      /// `int` which the caller in this module treats as a raw file descriptor.
      fn fuse_open_channel(mountpoint: *const c_char, options: *const c_char) -> c_int;
  }
  27. /// Calls into libfuse3 to mount this file system at the given path. The file descriptor to use
  28. /// to communicate with the kernel is returned.
  29. fn mount_at<P: AsRef<Path>>(mnt_point: P) -> File {
  30. let mountpoint = CString::new(mnt_point.as_ref().as_os_str().as_bytes()).unwrap();
  31. let options = CString::new(DEFAULT_CONFIG.mnt_options).unwrap();
  32. let raw_fd = unsafe { fuse_open_channel(mountpoint.as_ptr(), options.as_ptr()) };
  33. unsafe { File::from_raw_fd(raw_fd) }
  34. }
  35. pub struct FuseDaemon {
  36. set: JoinSet<()>,
  37. }
  38. impl FuseDaemon {
  /// File system name handed to `FuseSession::new` when the session is created.
  const FSNAME: &'static str = "btfuse";
  /// File system type handed to `FuseSession::new` when the session is created.
  const FSTYPE: &'static str = "bt";
  /// Number of server loops (and therefore kernel channels) started by `new`.
  const NUM_THREADS: usize = 1;
  42. pub fn new<P: 'static + FsProvider>(
  43. mnt_path: PathBuf,
  44. fallback_path: Arc<BlockPath>,
  45. provider: P,
  46. ) -> Result<Self> {
  47. let server = Arc::new(Server::new(FuseFs::new(provider, fallback_path)));
  48. let session = Self::session(mnt_path)?;
  49. let mut set = JoinSet::new();
  50. for _ in 0..Self::NUM_THREADS {
  51. let server = server.clone();
  52. let channel = session.new_channel()?;
  53. let future =
  54. tokio::task::spawn_blocking(move || Self::server_loop(server, channel));
  55. let future = future.map(|result| {
  56. if let Err(err) = result {
  57. error!("server_loop produced an error: {err}");
  58. }
  59. });
  60. set.spawn(future);
  61. }
  62. Ok(Self { set })
  63. }
  64. fn session<T: AsRef<Path>>(mnt_path: T) -> Result<FuseSession> {
  65. mnt_path.try_create_dir()?;
  66. let mut session =
  67. FuseSession::new(mnt_path.as_ref(), Self::FSNAME, Self::FSTYPE, false)?;
  68. session.set_fuse_file(mount_at(mnt_path));
  69. Ok(session)
  70. }
  71. /// Opens a channel to the kernel and processes messages received in an infinite loop.
  72. ///
  73. /// # Warning
  74. /// Because the future returned by this method is passed to `unsafe_cast`, you must not
  75. /// hold any type which is not `Send` and `Sync` across an `await` point.
  76. fn server_loop<P: 'static + FsProvider>(
  77. server: Arc<Server<FuseFs<P>>>,
  78. mut channel: FuseChannel,
  79. ) {
  80. loop {
  81. match channel.get_request() {
  82. Ok(Some((reader, writer))) => {
  83. // Safety: reader and writer are not mutated while the future returned from
  84. // async_handle_message is alive.
  85. if let Err(err) = server.handle_message(reader, writer.into(), None, None) {
  86. error!("error while handling FUSE message: {err}");
  87. }
  88. }
  89. Ok(None) => break,
  90. Err(err) => {
  91. match err {
  92. // Occurs when the file system is unmounted.
  93. transport::Error::SessionFailure(_) => break,
  94. _ => error!("unexpected error from Channel::get_request: {err}"),
  95. }
  96. }
  97. }
  98. }
  99. }
  100. pub async fn finished(&mut self) {
  101. while self.set.join_next().await.is_some() {}
  102. }
  103. }
  104. }