Переглянути джерело

Modified btfuse to allow a remote FS to be mounted.

Matthew Carr 1 рік тому
батько
коміт
8ad6024669

+ 2 - 1
.vscode/settings.json

@@ -24,5 +24,6 @@
     "editor.rulers": [
         100
     ],
-    "cmake.configureOnOpen": false
+    "cmake.configureOnOpen": false,
+    "editor.inlayHints.enabled": "off",
 }

+ 1 - 0
Cargo.lock

@@ -234,6 +234,7 @@ dependencies = [
  "btfproto",
  "btfproto-tests",
  "btlib",
+ "btmsg",
  "btserde",
  "ctor",
  "env_logger",

+ 65 - 80
crates/btfproto/src/client.rs

@@ -1,13 +1,13 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
-use crate::{msg::*, Handle, Inode, server::FsProvider};
+use crate::{msg::*, server::FsProvider, Handle, Inode};
 
-use btlib::{bterr, crypto::ConcretePub, IssuedProcRec, Result, BlockPath};
+use btlib::{bterr, crypto::ConcretePub, BlockPath, IssuedProcRec, Result};
 use btmsg::{DeserCallback, Transmitter};
 
 use core::future::{ready, Future, Ready};
-use std::{sync::Arc, ops::Deref};
-use paste::paste;
 use futures::FutureExt;
+use paste::paste;
+use std::sync::Arc;
 
 macro_rules! extractor {
     ($variant:ident) => {
@@ -144,140 +144,125 @@ impl<T> FsClient<T> {
 impl<T: Send + Sync + Transmitter> FsProvider for FsClient<T> {
     type LookupFut<'c> = impl 'c  + Send + Future<Output = Result<LookupReply>> where T: 'c;
     fn lookup<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Lookup<'c>) -> Self::LookupFut<'c> {
-        self.tx.call(FsMsg::Lookup(msg), extractor!(Lookup)).map(|e| e?)
+        self.tx
+            .call(FsMsg::Lookup(msg), extractor!(Lookup))
+            .map(|e| e?)
     }
 
-    type CreateFut<'c> = impl Send + Future<Output = Result<CreateReply>> where T: 'c;
-    fn create<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Create<'c>) -> Self::CreateFut<'c> {
-        async move {
-            todo!()
-        }
+    type CreateFut<'c> = impl 'c + Send + Future<Output = Result<CreateReply>> where T: 'c;
+    fn create<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Create<'c>) -> Self::CreateFut<'c> {
+        self.tx
+            .call(FsMsg::Create(msg), extractor!(Create))
+            .map(|e| e?)
     }
 
-    type OpenFut<'c> = impl Send + Future<Output = Result<OpenReply>> where T: 'c;
-    fn open<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Open) -> Self::OpenFut<'c> {
-        async move {
-            todo!()
-        }
+    type OpenFut<'c> = impl 'c + Send + Future<Output = Result<OpenReply>> where T: 'c;
+    fn open<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Open) -> Self::OpenFut<'c> {
+        self.tx.call(FsMsg::Open(msg), extractor!(Open)).map(|e| e?)
     }
 
     type ReadGuard = Vec<u8>;
     type ReadFut<'c> = impl 'c + Send + Future<Output = Result<Self::ReadGuard>> where T: 'c;
-    /// Reads from the file specified in the given message.
-    /// ### WARNING
-    /// The returned guard must be dropped before another method is called on this provider.
-    /// Otherwise deadlock _will_ occur.
-    fn read<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Read) -> Self::ReadFut<'c> {
-        async move {
-            todo!()
-        }
+    fn read<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Read) -> Self::ReadFut<'c> {
+        let callback = ExtractRead::new(|reply: ReadReply<'_>| {
+            // TODO: Use a pool of buffers rather than allocating a new one each read.
+            ready(reply.data.to_vec())
+        });
+        self.tx.call(FsMsg::Read(msg), callback).map(|e| e?)
     }
 
     type WriteFut<'r> = impl 'r + Send + Future<Output = Result<WriteReply>> where T: 'r;
-    fn write<'c>(&'c self, from: &'c Arc<BlockPath>, write: Write<&'c [u8]>) -> Self::WriteFut<'c> {
-        async move {
-            todo!()
-        }
+    fn write<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Write<&'c [u8]>) -> Self::WriteFut<'c> {
+        self.tx
+            .call(FsMsg::Write(msg), extractor!(Write))
+            .map(|e| e?)
     }
 
     type FlushFut<'c> = impl 'c + Send + Future<Output = Result<()>> where T: 'c;
-    fn flush<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Flush) -> Self::FlushFut<'c> {
-        async move {
-            todo!()
-        }
+    fn flush<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Flush) -> Self::FlushFut<'c> {
+        self.tx.call(FsMsg::Flush(msg), AckCallback).map(|e| e?)
     }
 
     type ReadDirFut<'c> = impl 'c + Send + Future<Output = Result<ReadDirReply>> where T: 'c;
-    fn read_dir<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadDir) -> Self::ReadDirFut<'c> {
-        async move {
-            todo!()
-        }
+    fn read_dir<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: ReadDir) -> Self::ReadDirFut<'c> {
+        self.tx
+            .call(FsMsg::ReadDir(msg), extractor!(ReadDir))
+            .map(|e| e?)
     }
 
     type LinkFut<'c> = impl 'c + Send + Future<Output = Result<LinkReply>> where T: 'c;
-    fn link<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Link<'c>) -> Self::LinkFut<'c> {
-        async move {
-            todo!()
-        }
+    fn link<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Link<'c>) -> Self::LinkFut<'c> {
+        self.tx.call(FsMsg::Link(msg), extractor!(Link)).map(|e| e?)
     }
 
     type UnlinkFut<'c> = impl 'c +  Send + Future<Output = Result<()>> where T: 'c;
-    fn unlink<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlink<'c>) -> Self::UnlinkFut<'c> {
-        async move {
-            todo!()
-        }
+    fn unlink<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Unlink<'c>) -> Self::UnlinkFut<'c> {
+        self.tx.call(FsMsg::Unlink(msg), AckCallback).map(|e| e?)
     }
 
     type ReadMetaFut<'c> = impl 'c + Send + Future<Output = Result<ReadMetaReply>> where T: 'c;
-    fn read_meta<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMeta) -> Self::ReadMetaFut<'c> {
-        async move {
-            todo!()
-        }
+    fn read_meta<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: ReadMeta) -> Self::ReadMetaFut<'c> {
+        self.tx
+            .call(FsMsg::ReadMeta(msg), extractor!(ReadMeta))
+            .map(|e| e?)
     }
 
     type WriteMetaFut<'c> = impl 'c + Send + Future<Output = Result<WriteMetaReply>> where T: 'c;
-    fn write_meta<'c>(&'c self, from: &'c Arc<BlockPath>, msg: WriteMeta)
-        -> Self::WriteMetaFut<'c> {
-        async move {
-            todo!()
-        }
+    fn write_meta<'c>(
+        &'c self,
+        _from: &'c Arc<BlockPath>,
+        msg: WriteMeta,
+    ) -> Self::WriteMetaFut<'c> {
+        self.tx
+            .call(FsMsg::WriteMeta(msg), extractor!(WriteMeta))
+            .map(|e| e?)
     }
 
     type AllocateFut<'c> = impl 'c + Send + Future<Output = Result<()>> where T: 'c;
-    fn allocate<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Allocate) -> Self::AllocateFut<'c> {
-        async move {
-            todo!()
-        }
+    fn allocate<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Allocate) -> Self::AllocateFut<'c> {
+        self.tx.call(FsMsg::Allocate(msg), AckCallback).map(|e| e?)
     }
 
     type CloseFut<'c> = impl 'c + Send + Future<Output = Result<()>> where T: 'c;
-    fn close<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Close) -> Self::CloseFut<'c> {
-        async move {
-            todo!()
-        }
+    fn close<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Close) -> Self::CloseFut<'c> {
+        self.tx.call(FsMsg::Close(msg), AckCallback).map(|e| e?)
     }
 
     type ForgetFut<'c> = impl 'c + Send + Future<Output = Result<()>> where T: 'c;
-    fn forget<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Forget) -> Self::ForgetFut<'c> {
-        async move {
-            todo!()
-        }
+    fn forget<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Forget) -> Self::ForgetFut<'c> {
+        self.tx.call(FsMsg::Forget(msg), AckCallback).map(|e| e?)
     }
 
     type LockFut<'c> = impl 'c + Send + Future<Output = Result<()>> where T: 'c;
-    fn lock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Lock) -> Self::LockFut<'c> {
-        async move {
-            todo!()
-        }
+    fn lock<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Lock) -> Self::LockFut<'c> {
+        self.tx.call(FsMsg::Lock(msg), AckCallback).map(|e| e?)
     }
 
     type UnlockFut<'c> = impl 'c + Send + Future<Output = Result<()>> where T: 'c;
-    fn unlock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlock) -> Self::UnlockFut<'c> {
-        async move {
-            todo!()
-        }
+    fn unlock<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Unlock) -> Self::UnlockFut<'c> {
+        self.tx.call(FsMsg::Unlock(msg), AckCallback).map(|e| e?)
     }
 
     type AddReacapFut<'c> = impl 'c + Send + Future<Output = Result<()>> where T: 'c;
     fn add_readcap<'c>(
         &'c self,
-        from: &'c Arc<BlockPath>,
+        _from: &'c Arc<BlockPath>,
         msg: AddReadcap,
     ) -> Self::AddReacapFut<'c> {
-        async move {
-            todo!()
-        }
+        self.tx
+            .call(FsMsg::AddReadcap(msg), AckCallback)
+            .map(|e| e?)
     }
 
     type GrantAccessFut<'c> = impl 'c + Send + Future<Output = Result<()>> where T: 'c;
     fn grant_access<'c>(
         &'c self,
-        from: &'c Arc<BlockPath>,
+        _from: &'c Arc<BlockPath>,
         msg: GrantAccess,
     ) -> Self::GrantAccessFut<'c> {
-        async move {
-            todo!()
-        }
+        self.tx
+            .call(FsMsg::GrantAccess(msg), AckCallback)
+            .map(|e| e?)
     }
 }
 

+ 1 - 0
crates/btfuse/Cargo.toml

@@ -10,6 +10,7 @@ btlib = { path = "../btlib" }
 btserde = { path = "../btserde" }
 swtpm-harness = { path = "../swtpm-harness" }
 btfproto = { path = "../btfproto" }
+btmsg = { path = "../btmsg" }
 tokio = { version = "1.24.2", features = ["rt", "rt-multi-thread"] }
 fuse-backend-rs = { version = "0.9.6", features = ["async-io"] }
 log = "0.4.17"

+ 47 - 5
crates/btfuse/src/config.rs

@@ -1,11 +1,24 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
 use super::DEFAULT_CONFIG;
 
-use std::path::{Path, PathBuf};
+use btlib::BlockPath;
+use btmsg::BlockAddr;
+
+use std::{
+    net::IpAddr,
+    path::{Path, PathBuf},
+    sync::Arc,
+};
+
+#[derive(PartialEq, Eq, Clone)]
+pub enum FsKind {
+    Local(PathBuf),
+    Remote(BlockAddr),
+}
 
 #[derive(PartialEq, Eq, Clone)]
 pub struct Config {
-    pub block_dir: PathBuf,
+    pub fs_kind: FsKind,
     pub mnt_dir: PathBuf,
     pub tpm_state_file: PathBuf,
     pub tabrmd: String,
@@ -32,6 +45,8 @@ pub struct ConfigRef<'a> {
 pub struct ConfigBuilder {
     pub block_dir: Option<PathBuf>,
     pub mnt_dir: Option<PathBuf>,
+    pub remote_ip: Option<String>,
+    pub remote_path: Option<String>,
     pub tpm_state_file: Option<PathBuf>,
     pub tabrmd: Option<String>,
     pub mnt_options: Option<String>,
@@ -69,6 +84,16 @@ impl ConfigBuilder {
         self
     }
 
+    pub fn with_remote_ip(mut self, remote_ip: Option<String>) -> Self {
+        self.remote_ip = remote_ip;
+        self
+    }
+
+    pub fn with_remote_path(mut self, remote_path: Option<String>) -> Self {
+        self.remote_path = remote_path;
+        self
+    }
+
     #[allow(dead_code)]
     pub fn with_threads(mut self, threads: Option<usize>) -> Self {
         self.threads = threads;
@@ -76,10 +101,24 @@ impl ConfigBuilder {
     }
 
     pub fn build(self) -> Config {
-        Config {
-            block_dir: self
+        let remote_addr = self
+            .remote_ip
+            .zip(self.remote_path)
+            .map(|(ip_str, path_str)| {
+                let ip: IpAddr = ip_str.parse().unwrap();
+                let path = BlockPath::try_from(path_str.as_str()).unwrap();
+                BlockAddr::new(ip, Arc::new(path))
+            });
+        let fs_kind = if let Some(remote_addr) = remote_addr {
+            FsKind::Remote(remote_addr)
+        } else {
+            let block_dir = self
                 .block_dir
-                .unwrap_or_else(|| Path::new(DEFAULT_CONFIG.block_dir).to_owned()),
+                .unwrap_or_else(|| Path::new(DEFAULT_CONFIG.block_dir).to_owned());
+            FsKind::Local(block_dir)
+        };
+        Config {
+            fs_kind,
             mnt_dir: self
                 .mnt_dir
                 .unwrap_or_else(|| Path::new(DEFAULT_CONFIG.mnt_dir).to_owned()),
@@ -100,6 +139,9 @@ impl ConfigBuilder {
 }
 
 pub struct EnvVars {
+    pub block_dir: &'static str,
     pub tabrmd: &'static str,
     pub mnt_options: &'static str,
+    pub remote_ip: &'static str,
+    pub remote_path: &'static str,
 }

+ 173 - 105
crates/btfuse/src/main.rs

@@ -4,9 +4,9 @@ use fuse_daemon::FuseDaemon;
 mod config;
 mod fuse_fs;
 
-use config::{Config, ConfigRef, EnvVars};
+use config::{Config, ConfigRef, EnvVars, FsKind};
 
-use btfproto::{local_fs::LocalFs, server::FsProvider};
+use btfproto::{client::FsClient, local_fs::LocalFs, server::FsProvider};
 use btlib::{
     config_helpers::from_envvar,
     crypto::{
@@ -15,6 +15,7 @@ use btlib::{
     },
     Result,
 };
+use btmsg::{transmitter, BlockAddr};
 use std::{
     fs::{self},
     io,
@@ -24,8 +25,11 @@ use std::{
 use tokio::sync::oneshot;
 
 const ENVVARS: EnvVars = EnvVars {
+    block_dir: "BT_BLOCKDIR",
     tabrmd: "BT_TABRMD",
     mnt_options: "BT_MNTOPTS",
+    remote_ip: "BT_REMOTEIP",
+    remote_path: "BT_REMOTEPATH",
 };
 
 const DEFAULT_CONFIG: ConfigRef<'static> = ConfigRef {
@@ -58,7 +62,7 @@ fn node_creds(state_file: PathBuf, tabrmd_cfg: &str) -> Result<TpmCreds> {
     cred_store.node_creds()
 }
 
-async fn provider<C: 'static + Creds + Send + Sync>(
+async fn local_provider<C: 'static + Creds + Send + Sync>(
     btdir: PathBuf,
     node_creds: C,
 ) -> Result<impl FsProvider> {
@@ -71,6 +75,15 @@ async fn provider<C: 'static + Creds + Send + Sync>(
     }
 }
 
+async fn remote_provider<C: 'static + Creds + Send + Sync>(
+    remote_addr: BlockAddr,
+    node_creds: C,
+) -> Result<impl FsProvider> {
+    let tx = transmitter(Arc::new(remote_addr), Arc::new(node_creds)).await?;
+    let client = FsClient::new(tx);
+    Ok(client)
+}
+
 async fn run_daemon(config: Config, mounted_signal: Option<oneshot::Sender<()>>) {
     let node_creds =
         node_creds(config.tpm_state_file, &config.tabrmd).expect("failed to get node creds");
@@ -81,17 +94,32 @@ async fn run_daemon(config: Config, mounted_signal: Option<oneshot::Sender<()>>)
             .unwrap();
         Arc::new(writecap.bind_path())
     };
-    let provider = provider(config.block_dir, node_creds)
-        .await
-        .expect("failed to create FS provider");
-
-    let mut daemon = FuseDaemon::new(
-        config.mnt_dir,
-        config.threads,
-        fallback_path,
-        mounted_signal,
-        provider,
-    )
+    let mut daemon = match config.fs_kind {
+        FsKind::Local(btdir) => {
+            let provider = local_provider(btdir, node_creds)
+                .await
+                .expect("failed to create local provider");
+            FuseDaemon::new(
+                config.mnt_dir,
+                config.threads,
+                fallback_path,
+                mounted_signal,
+                provider,
+            )
+        }
+        FsKind::Remote(remote_addr) => {
+            let provider = remote_provider(remote_addr, node_creds)
+                .await
+                .expect("failed to create remote provider");
+            FuseDaemon::new(
+                config.mnt_dir,
+                config.threads,
+                fallback_path,
+                mounted_signal,
+                provider,
+            )
+        }
+    }
     .expect("failed to create FUSE daemon");
     daemon.finished().await
 }
@@ -101,7 +129,9 @@ async fn main() {
     env_logger::init();
     let mut args = std::env::args_os().skip(1).map(PathBuf::from);
     let builder = Config::builder()
-        .with_block_dir(args.next())
+        .with_block_dir(from_envvar(ENVVARS.block_dir).unwrap().map(PathBuf::from))
+        .with_remote_ip(from_envvar(ENVVARS.remote_ip).unwrap())
+        .with_remote_path(from_envvar(ENVVARS.remote_path).unwrap())
         .with_mnt_dir(args.next())
         .with_tabrmd(from_envvar(ENVVARS.tabrmd).unwrap())
         .with_mnt_options(from_envvar(ENVVARS.mnt_options).unwrap());
@@ -113,7 +143,9 @@ async fn main() {
 mod test {
     use super::*;
 
+    use btfproto::{local_fs::ModeAuthorizer, server::new_fs_server};
     use btlib::{crypto::Creds, log::BuilderExt, Epoch, Principaled};
+    use btmsg::Receiver;
     use ctor::ctor;
     use std::{
         ffi::{OsStr, OsString},
@@ -122,6 +154,7 @@ mod test {
             set_permissions, write, OpenOptions, Permissions, ReadDir,
         },
         io::{Read, Seek, SeekFrom, Write},
+        net::{IpAddr, Ipv6Addr},
         os::unix::fs::PermissionsExt,
         thread::JoinHandle,
         time::Duration,
@@ -130,7 +163,7 @@ mod test {
     use tempdir::TempDir;
     use tokio::sync::oneshot::error::TryRecvError;
 
-    /// An optional timeout to wait for the FUSE daemon to start in tests.
+    /// An optional timeout to limit the time spent waiting for the FUSE daemon to start in tests.
     const TIMEOUT: Option<Duration> = Some(Duration::from_millis(1000));
 
     /// The log level to use when running tests.
@@ -168,63 +201,84 @@ mod test {
 
     const ROOT_PASSWD: &str = "password";
 
-    struct TestCase {
+    struct TestCase<R> {
         config: Config,
         handle: Option<JoinHandle<()>>,
         node_principal: OsString,
         // Note that the drop order of these fields is significant.
+        _receiver: Option<R>,
         _cred_store: TpmCredStore,
         _swtpm: SwtpmHarness,
         _temp_dir: TempDir,
     }
 
-    impl TestCase {
-        fn new() -> TestCase {
-            let tmp = TempDir::new("btfuse").unwrap();
-            let (mounted_tx, mut mounted_rx) = oneshot::channel();
-            let (swtpm, cred_store) = Self::swtpm();
-            let config = Config::builder()
-                .with_block_dir(Some(tmp.path().join("bt")))
-                .with_mnt_dir(Some(tmp.path().join("mnt")))
-                .with_tpm_state_file(Some(swtpm.state_path().to_owned().into()))
-                .with_tabrmd(Some(swtpm.tabrmd_config().to_owned()))
-                .build();
-            let config_clone = config.clone();
-            let handle = std::thread::spawn(move || Self::run(mounted_tx, config_clone));
-            match TIMEOUT {
-                Some(duration) => {
-                    let deadline = std::time::Instant::now() + duration;
-                    loop {
-                        if std::time::Instant::now() > deadline {
-                            panic!("timed out waiting for the mounted signal");
-                        }
-                        match mounted_rx.try_recv() {
-                            Ok(_) => break,
-                            Err(err) => match err {
-                                TryRecvError::Empty => {
-                                    std::thread::sleep(Duration::from_millis(10));
-                                }
-                                TryRecvError::Closed => {
-                                    panic!("channel was closed before mounted signal was sent")
-                                }
-                            },
-                        }
+    async fn new_local() -> TestCase<impl Receiver> {
+        new(false).await
+    }
+
+    async fn new_remote() -> TestCase<impl Receiver> {
+        new(true).await
+    }
+
+    async fn new(remote: bool) -> TestCase<impl Receiver> {
+        let tmp = TempDir::new("btfuse").unwrap();
+        let (mounted_tx, mut mounted_rx) = oneshot::channel();
+        let (swtpm, cred_store) = TestCase::<()>::swtpm();
+        let builder = Config::builder()
+            .with_mnt_dir(Some(tmp.path().join("mnt")))
+            .with_tpm_state_file(Some(swtpm.state_path().to_owned().into()))
+            .with_tabrmd(Some(swtpm.tabrmd_config().to_owned()));
+        let block_dir = tmp.path().join("bt");
+        let (config, receiver) = if remote {
+            let node_creds = Arc::new(cred_store.node_creds().unwrap());
+            let local_fs = LocalFs::new_empty(block_dir, 0, node_creds.clone(), ModeAuthorizer)
+                .await
+                .unwrap();
+            let ip_addr = IpAddr::V6(Ipv6Addr::LOCALHOST);
+            let receiver = new_fs_server(ip_addr, node_creds, Arc::new(local_fs)).unwrap();
+            let builder = builder.with_remote_ip(Some(ip_addr.to_string()));
+            (builder.build(), Some(receiver))
+        } else {
+            (builder.with_block_dir(Some(block_dir)).build(), None)
+        };
+        let config_clone = config.clone();
+        let handle = std::thread::spawn(move || TestCase::<()>::run(mounted_tx, config_clone));
+        match TIMEOUT {
+            Some(duration) => {
+                let deadline = std::time::Instant::now() + duration;
+                loop {
+                    if std::time::Instant::now() > deadline {
+                        panic!("timed out waiting for the mounted signal");
+                    }
+                    match mounted_rx.try_recv() {
+                        Ok(_) => break,
+                        Err(err) => match err {
+                            TryRecvError::Empty => {
+                                std::thread::sleep(Duration::from_millis(10));
+                            }
+                            TryRecvError::Closed => {
+                                panic!("channel was closed before mounted signal was sent")
+                            }
+                        },
                     }
                 }
-                None => mounted_rx.blocking_recv().unwrap(),
-            };
-            let node_principal =
-                OsString::from(cred_store.node_creds().unwrap().principal().to_string());
-            Self {
-                config,
-                handle: Some(handle),
-                node_principal,
-                _temp_dir: tmp,
-                _swtpm: swtpm,
-                _cred_store: cred_store,
             }
+            None => mounted_rx.blocking_recv().unwrap(),
+        };
+        let node_principal =
+            OsString::from(cred_store.node_creds().unwrap().principal().to_string());
+        TestCase {
+            config,
+            handle: Some(handle),
+            node_principal,
+            _receiver: receiver,
+            _temp_dir: tmp,
+            _swtpm: swtpm,
+            _cred_store: cred_store,
         }
+    }
 
+    impl<R> TestCase<R> {
         fn run(mounted_tx: oneshot::Sender<()>, config: Config) {
             let runtime = tokio::runtime::Builder::new_current_thread()
                 .build()
@@ -284,7 +338,7 @@ mod test {
         }
     }
 
-    impl Drop for TestCase {
+    impl<R> Drop for TestCase<R> {
         fn drop(&mut self) {
             self.unmount_and_wait()
         }
@@ -292,18 +346,32 @@ mod test {
 
     /// Creates a new file system and mounts it at `/tmp/btfuse.<random>/mnt` so it can be
     /// tested manually.
-    //#[test]
+    //#[tokio::test]
     #[allow(dead_code)]
-    fn manual_test() {
-        let mut case = TestCase::new();
+    async fn manual_test() {
+        let mut case = new_local().await;
         case.wait()
     }
 
-    #[test]
-    fn write_read() -> Result<()> {
+    #[tokio::test]
+    async fn write_read() -> Result<()> {
+        const EXPECTED: &[u8] =
+            b"The paths to failure are uncountable, yet to success there is but one.";
+        let case = new_local().await;
+        let file_path = case.mnt_dir().join("file");
+
+        write(&file_path, EXPECTED)?;
+
+        let actual = read(&file_path)?;
+        assert_eq!(EXPECTED, actual);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn write_read_remote() -> Result<()> {
         const EXPECTED: &[u8] =
             b"The paths to failure are uncountable, yet to success there is but one.";
-        let case = TestCase::new();
+        let case = new_remote().await;
         let file_path = case.mnt_dir().join("file");
 
         write(&file_path, EXPECTED)?;
@@ -313,11 +381,11 @@ mod test {
         Ok(())
     }
 
-    #[test]
-    fn create_file_then_readdir() {
+    #[tokio::test]
+    async fn create_file_then_readdir() {
         const DATA: &[u8] = b"Au revoir Shoshanna!";
         let file_name = OsStr::new("landa_dialog.txt");
-        let case = TestCase::new();
+        let case = new_local().await;
         let mut expected = case.initial_contents();
         expected.push(file_name);
         let mnt_path = case.mnt_dir();
@@ -331,11 +399,11 @@ mod test {
         assert!(second.eq(expected));
     }
 
-    #[test]
-    fn create_then_delete_file() {
+    #[tokio::test]
+    async fn create_then_delete_file() {
         const DATA: &[u8] = b"The universe is hostile, so impersonal. Devour to survive";
         let file_name = OsStr::new("tool_lyrics.txt");
-        let case = TestCase::new();
+        let case = new_local().await;
         let mnt_path = case.mnt_dir();
         let file_path = mnt_path.join(file_name);
         write(&file_path, DATA).expect("write failed");
@@ -347,12 +415,12 @@ mod test {
         assert!(actual.eq(expected));
     }
 
-    #[test]
-    fn hard_link_then_remove() {
+    #[tokio::test]
+    async fn hard_link_then_remove() {
         const EXPECTED: &[u8] = b"And the lives we've reclaimed";
         let name1 = OsStr::new("refugee_lyrics.txt");
         let name2 = OsStr::new("rise_against_lyrics.txt");
-        let case = TestCase::new();
+        let case = new_local().await;
         let mnt_path = case.mnt_dir();
         let path1 = mnt_path.join(name1);
         let path2 = mnt_path.join(name2);
@@ -365,12 +433,12 @@ mod test {
         assert_eq!(EXPECTED, actual);
     }
 
-    #[test]
-    fn hard_link_then_remove_both() {
+    #[tokio::test]
+    async fn hard_link_then_remove_both() {
         const EXPECTED: &[u8] = b"And the lives we've reclaimed";
         let name1 = OsStr::new("refugee_lyrics.txt");
         let name2 = OsStr::new("rise_against_lyrics.txt");
-        let case = TestCase::new();
+        let case = new_local().await;
         let mnt_path = case.mnt_dir();
         let path1 = mnt_path.join(name1);
         let path2 = mnt_path.join(name2);
@@ -384,10 +452,10 @@ mod test {
         assert!(file_names(read_dir(&mnt_path).expect("read_dir failed")).eq(expected));
     }
 
-    #[test]
-    fn set_mode_bits() {
+    #[tokio::test]
+    async fn set_mode_bits() {
         const EXPECTED: u32 = libc::S_IFREG | 0o777;
-        let case = TestCase::new();
+        let case = new_local().await;
         let file_path = case.mnt_dir().join("bagobits");
         write(&file_path, []).expect("write failed");
         let original = metadata(&file_path)
@@ -406,10 +474,10 @@ mod test {
         assert_eq!(EXPECTED, actual);
     }
 
-    #[test]
-    fn create_directory() {
+    #[tokio::test]
+    async fn create_directory() {
         const EXPECTED: &str = "etc";
-        let case = TestCase::new();
+        let case = new_local().await;
         let mnt_path = case.mnt_dir();
         let dir_path = mnt_path.join(EXPECTED);
         let mut expected = case.initial_contents();
@@ -421,11 +489,11 @@ mod test {
         assert!(actual.eq(expected));
     }
 
-    #[test]
-    fn create_file_under_new_directory() {
+    #[tokio::test]
+    async fn create_file_under_new_directory() {
         const DIR_NAME: &str = "etc";
         const FILE_NAME: &str = "file";
-        let case = TestCase::new();
+        let case = new_local().await;
         let mnt_path = case.mnt_dir();
         let dir_path = mnt_path.join(DIR_NAME);
         let file_path = dir_path.join(FILE_NAME);
@@ -437,10 +505,10 @@ mod test {
         assert!(actual.eq([FILE_NAME]));
     }
 
-    #[test]
-    fn create_then_remove_directory() {
+    #[tokio::test]
+    async fn create_then_remove_directory() {
         const DIR_NAME: &str = "etc";
-        let case = TestCase::new();
+        let case = new_local().await;
         let mnt_path = case.mnt_dir();
         let dir_path = mnt_path.join(DIR_NAME);
 
@@ -451,10 +519,10 @@ mod test {
         assert!(actual.eq(case.initial_contents()));
     }
 
-    #[test]
-    fn read_only_dir_cant_create_subdir() {
+    #[tokio::test]
+    async fn read_only_dir_cant_create_subdir() {
         const DIR_NAME: &str = "etc";
-        let case = TestCase::new();
+        let case = new_local().await;
         let dir_path = case.mnt_dir().join(DIR_NAME);
         create_dir(&dir_path).expect("create_dir failed");
         set_permissions(&dir_path, Permissions::from_mode(libc::S_IFDIR | 0o444))
@@ -467,10 +535,10 @@ mod test {
         assert_eq!(os_err, libc::EACCES);
     }
 
-    #[test]
-    fn read_only_dir_cant_remove_subdir() {
+    #[tokio::test]
+    async fn read_only_dir_cant_remove_subdir() {
         const DIR_NAME: &str = "etc";
-        let case = TestCase::new();
+        let case = new_local().await;
         let dir_path = case.mnt_dir().join(DIR_NAME);
         let sub_path = dir_path.join("sub");
         create_dir(&dir_path).expect("create_dir failed");
@@ -485,11 +553,11 @@ mod test {
         assert_eq!(os_err, libc::EACCES);
     }
 
-    #[test]
-    fn rename_file() {
+    #[tokio::test]
+    async fn rename_file() {
         const FILE_NAME: &str = "parabola.txt";
         const EXPECTED: &[u8] = b"We are eternal all this pain is an illusion";
-        let case = TestCase::new();
+        let case = new_local().await;
         let src_path = case.mnt_dir().join(FILE_NAME);
         let dst_path = case.mnt_dir().join("parabola_lyrics.txt");
 
@@ -500,8 +568,8 @@ mod test {
         assert_eq!(EXPECTED, actual)
     }
 
-    #[test]
-    fn write_read_with_file_struct() {
+    #[tokio::test]
+    async fn write_read_with_file_struct() {
         const FILE_NAME: &str = "big.dat";
         const LEN: usize = btlib::SECTOR_SZ_DEFAULT + 1;
         fn fill(buf: &mut Vec<u8>, value: u8) {
@@ -509,7 +577,7 @@ mod test {
             buf.extend(std::iter::repeat(value).take(buf.capacity()));
         }
 
-        let case = TestCase::new();
+        let case = new_local().await;
         let file_path = case.mnt_dir().join(FILE_NAME);
         let mut buf = vec![1u8; LEN];
         let mut file = OpenOptions::new()
@@ -530,17 +598,17 @@ mod test {
         assert_eq!(buf, actual);
     }
 
-    //#[test]
+    //#[tokio::test]
     #[allow(dead_code)]
     /// KMC: This test is currently not working, and I've not been able to figure out why, nor
     /// reproduce it at a lower layer of the stack.
-    fn read_more_than_whats_buffered() {
+    async fn read_more_than_whats_buffered() {
         const FILE_NAME: &str = "big.dat";
         const SECT_SZ: usize = btlib::SECTOR_SZ_DEFAULT;
         const DIVISOR: usize = 8;
         const READ_SZ: usize = SECT_SZ / DIVISOR;
 
-        let case = TestCase::new();
+        let case = new_local().await;
         let file_path = case.mnt_dir().join(FILE_NAME);
         let mut file = OpenOptions::new()
             .create(true)

+ 54 - 4
crates/btlib/src/crypto.rs

@@ -37,6 +37,8 @@ use std::{
     fmt::Display,
     io::{Read, Write},
     marker::PhantomData,
+    ops::Deref,
+    sync::Arc,
 };
 use strum_macros::{Display, EnumDiscriminants, FromRepr};
 use zeroize::ZeroizeOnDrop;
@@ -1798,9 +1800,9 @@ pub trait Encrypter {
     fn encrypt(&self, slice: &[u8]) -> Result<Vec<u8>>;
 }
 
-impl<T: Encrypter> Encrypter for &T {
+impl<T: Deref<Target = C>, C: Encrypter> Encrypter for T {
     fn encrypt(&self, slice: &[u8]) -> Result<Vec<u8>> {
-        (*self).encrypt(slice)
+        self.deref().encrypt(slice)
     }
 }
 
@@ -1820,9 +1822,9 @@ pub trait Decrypter {
     fn decrypt(&self, slice: &[u8]) -> Result<Vec<u8>>;
 }
 
-impl<T: Decrypter> Decrypter for &T {
+impl<T: Deref<Target = C>, C: Decrypter> Decrypter for T {
     fn decrypt(&self, slice: &[u8]) -> Result<Vec<u8>> {
-        (*self).decrypt(slice)
+        self.deref().decrypt(slice)
     }
 }
 
@@ -1960,6 +1962,22 @@ impl<T: Signer> Signer for &T {
     }
 }
 
+impl<T: Signer> Signer for Arc<T> {
+    type Op<'s> = T::Op<'s> where Self: 's;
+
+    fn init_sign(&self) -> Result<Self::Op<'_>> {
+        self.deref().init_sign()
+    }
+
+    fn sign<'a, I: Iterator<Item = &'a [u8]>>(&self, parts: I) -> Result<Signature> {
+        self.deref().sign(parts)
+    }
+
+    fn kind(&self) -> Sign {
+        self.deref().kind()
+    }
+}
+
 pub trait VerifyOp: Sized {
     type Arg;
 
@@ -2084,6 +2102,22 @@ impl<T: Verifier> Verifier for &T {
     }
 }
 
+impl<T: Verifier> Verifier for Arc<T> {
+    type Op<'v> = T::Op<'v> where Self: 'v;
+
+    fn init_verify(&self) -> Result<Self::Op<'_>> {
+        self.deref().init_verify()
+    }
+
+    fn verify<'a, I: Iterator<Item = &'a [u8]>>(&self, parts: I, signature: &[u8]) -> Result<()> {
+        self.deref().verify(parts, signature)
+    }
+
+    fn kind(&self) -> Sign {
+        self.deref().kind()
+    }
+}
+
 /// Trait for types which can be used as public credentials.
 pub trait CredsPub: Verifier + Encrypter + Principaled {
     /// Returns a reference to the public signing key which can be used to verify signatures.
@@ -2106,6 +2140,16 @@ impl<T: CredsPub> CredsPub for &T {
     }
 }
 
+impl<T: CredsPub> CredsPub for Arc<T> {
+    fn public_sign(&self) -> &AsymKey<Public, Sign> {
+        self.deref().public_sign()
+    }
+
+    fn concrete_pub(&self) -> ConcretePub {
+        self.deref().concrete_pub()
+    }
+}
+
 /// Trait for types which contain private credentials.
 pub trait CredsPriv: Decrypter + Signer {
     /// Returns a reference to the writecap associated with these credentials, if one has been
@@ -2123,6 +2167,12 @@ impl<T: CredsPriv> CredsPriv for &T {
     }
 }
 
+impl<T: CredsPriv> CredsPriv for Arc<T> {
+    fn writecap(&self) -> Option<&Writecap> {
+        self.deref().writecap()
+    }
+}
+
 /// Trait for types which contain both public and private credentials.
 pub trait Creds: CredsPriv + CredsPub + Clone {
     fn issue_writecap(

+ 3 - 3
crates/btlib/src/lib.rs

@@ -36,7 +36,7 @@ use std::{
     hash::Hash as Hashable,
     io::{self, Read, Seek, SeekFrom, Write},
     net::IpAddr,
-    ops::{Add, Sub},
+    ops::{Add, Deref, Sub},
     os::unix::prelude::MetadataExt,
     time::{Duration, SystemTime},
 };
@@ -1375,9 +1375,9 @@ pub trait Principaled {
     }
 }
 
-impl<T: Principaled> Principaled for &T {
+impl<T: Deref<Target = C>, C: Principaled> Principaled for T {
     fn principal_of_kind(&self, kind: HashKind) -> Principal {
-        (*self).principal_of_kind(kind)
+        self.deref().principal_of_kind(kind)
     }
 }
 

+ 1 - 1
crates/btlib/src/sectored_buf.rs

@@ -176,7 +176,7 @@ mod private {
                 pos: 0,
             };
             sectored.buf.resize(sect_sz, 0);
-            sectored.inner.seek(SeekFrom::Start(0))?;
+            sectored.inner.rewind()?;
             sectored.fill_internal_buf()?;
             Ok(sectored)
         }