fuse_daemon.rs 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. // SPDX-License-Identifier: AGPL-3.0-or-later
  2. use crate::{fuse_fs::FuseFs, PathExt, DEFAULT_CONFIG};
  3. use btfproto::server::FsProvider;
  4. use btlib::{BlockPath, Result};
  5. use fuse_backend_rs::{
  6. api::server::Server,
  7. transport::{self, FuseChannel, FuseSession},
  8. };
  9. use futures::future::FutureExt;
  10. use log::error;
  11. use std::path::PathBuf;
  12. use std::{
  13. ffi::{c_char, c_int, CString},
  14. fs::File,
  15. os::{fd::FromRawFd, unix::ffi::OsStrExt},
  16. path::Path,
  17. sync::Arc,
  18. };
  19. use tokio::task::JoinSet;
  20. pub use private::FuseDaemon;
  21. mod private {
  22. use super::*;
  23. #[link(name = "fuse3")]
  24. extern "C" {
  25. /// Opens a channel to the kernel.
  26. fn fuse_open_channel(mountpoint: *const c_char, options: *const c_char) -> c_int;
  27. }
  28. /// Calls into libfuse3 to mount this file system at the given path. The file descriptor to use
  29. /// to communicate with the kernel is returned.
  30. fn mount_at<P: AsRef<Path>>(mnt_point: P) -> File {
  31. let mountpoint = CString::new(mnt_point.as_ref().as_os_str().as_bytes()).unwrap();
  32. let options = CString::new(DEFAULT_CONFIG.mnt_options).unwrap();
  33. let raw_fd = unsafe { fuse_open_channel(mountpoint.as_ptr(), options.as_ptr()) };
  34. unsafe { File::from_raw_fd(raw_fd) }
  35. }
  36. pub struct FuseDaemon {
  37. set: JoinSet<()>,
  38. }
  39. impl FuseDaemon {
  40. const FSNAME: &str = "btfuse";
  41. const FSTYPE: &str = "bt";
  42. pub fn new<P: 'static + FsProvider>(
  43. mnt_path: PathBuf,
  44. num_threads: usize,
  45. fallback_path: Arc<BlockPath>,
  46. provider: P,
  47. ) -> Result<Self> {
  48. let server = Arc::new(Server::new(FuseFs::new(provider, fallback_path)));
  49. let session = Self::session(mnt_path)?;
  50. let mut set = JoinSet::new();
  51. for _ in 0..num_threads {
  52. let server = server.clone();
  53. let channel = session.new_channel()?;
  54. let future =
  55. tokio::task::spawn_blocking(move || Self::server_loop(server, channel));
  56. let future = future.map(|result| {
  57. if let Err(err) = result {
  58. error!("server_loop produced an error: {err}");
  59. }
  60. });
  61. set.spawn(future);
  62. }
  63. Ok(Self { set })
  64. }
  65. fn session<T: AsRef<Path>>(mnt_path: T) -> Result<FuseSession> {
  66. mnt_path.try_create_dir()?;
  67. let mut session =
  68. FuseSession::new(mnt_path.as_ref(), Self::FSNAME, Self::FSTYPE, false)?;
  69. session.set_fuse_file(mount_at(mnt_path));
  70. Ok(session)
  71. }
  72. /// Opens a channel to the kernel and processes messages received in an infinite loop.
  73. fn server_loop<P: 'static + FsProvider>(
  74. server: Arc<Server<FuseFs<P>>>,
  75. mut channel: FuseChannel,
  76. ) {
  77. loop {
  78. match channel.get_request() {
  79. Ok(Some((reader, writer))) => {
  80. // Safety: reader and writer are not mutated while the future returned from
  81. // async_handle_message is alive.
  82. if let Err(err) = server.handle_message(reader, writer.into(), None, None) {
  83. error!("error while handling FUSE message: {err}");
  84. }
  85. }
  86. Ok(None) => break,
  87. Err(err) => {
  88. match err {
  89. // Occurs when the file system is unmounted.
  90. transport::Error::SessionFailure(_) => break,
  91. _ => error!("unexpected error from Channel::get_request: {err}"),
  92. }
  93. }
  94. }
  95. }
  96. }
  97. pub async fn finished(&mut self) {
  98. while self.set.join_next().await.is_some() {}
  99. }
  100. }
  101. }