Quellcode durchsuchen

Resolved most of the issues encountered when mounting a remote FS.
There are still issues with some of the tests causing them to hang
intermittently.

Matthew Carr vor 1 Jahr
Ursprung
Commit
7d82aa7c40

+ 3 - 1
crates/btfproto/src/local_fs.rs

@@ -1191,12 +1191,14 @@ mod private {
             inode: Inode,
             handle: Handle,
             offset: u64,
-            size: u64,
+            mut size: u64,
         ) -> Result<BufGuard<C>> {
             let table = table.read_owned().await;
             let entry = table.get(&inode).ok_or(Error::NotOpen(inode))?;
             let inode_guard = {
                 let inode_guard = entry.clone().read_owned().await;
+                let total_size = inode_guard.block().meta_body().secrets()?.size;
+                size = size.min(total_size.saturating_sub(offset));
                 let mut handle_guard = inode_guard.handle_guard(from, handle).await?;
                 handle_guard.flags.assert_readable()?;
                 let pos = handle_guard.pos() as u64;

+ 2 - 2
crates/btfproto/src/msg.rs

@@ -399,7 +399,7 @@ impl BitOrAssign<Self> for AttrsSet {
     }
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize)]
 pub struct Entry {
     pub attr: BlockMetaSecrets,
     pub attr_timeout: Duration,
@@ -412,7 +412,7 @@ pub struct Lookup<'a> {
     pub name: &'a str,
 }
 
-#[derive(Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize)]
 pub struct LookupReply {
     pub inode: Inode,
     pub generation: u64,

+ 37 - 0
crates/btfsd/src/main.rs

@@ -187,6 +187,43 @@ mod tests {
         assert_eq!(EXPECTED, &actual);
     }
 
+    #[tokio::test]
+    async fn read_full_sector() {
+        const FILENAME: &str = "file.txt";
+        const EXPECTED: &[u8] = b"prawn crisps";
+        let case = new_case().await;
+        let client = case.client;
+
+        let CreateReply {
+            inode,
+            handle,
+            entry,
+            ..
+        } = client
+            .create(
+                SpecInodes::RootDir.into(),
+                FILENAME,
+                FlagValue::ReadWrite.into(),
+                0o644,
+                0,
+            )
+            .await
+            .unwrap();
+        let WriteReply { written, .. } = client.write(inode, handle, 0, EXPECTED).await.unwrap();
+        assert_eq!(EXPECTED.len() as u64, written);
+        assert!(entry.attr.sect_sz > EXPECTED.len() as u64);
+        let actual = client
+            .read(inode, handle, 0, entry.attr.sect_sz, |reply| {
+                let mut buf = Vec::with_capacity(EXPECTED.len());
+                buf.extend_from_slice(reply.data);
+                ready(buf)
+            })
+            .await
+            .unwrap();
+
+        assert!(EXPECTED.eq(&actual));
+    }
+
     #[tokio::test]
     async fn read_from_different_instance() {
         const FILENAME: &str = "file.txt";

+ 11 - 7
crates/btfuse/src/config.rs

@@ -8,6 +8,7 @@ use std::{
     net::IpAddr,
     path::{Path, PathBuf},
     sync::Arc,
+    thread::available_parallelism, num::NonZeroUsize,
 };
 
 #[derive(PartialEq, Eq, Clone)]
@@ -23,7 +24,7 @@ pub struct Config {
     pub tpm_state_file: PathBuf,
     pub tabrmd: String,
     pub mnt_options: String,
-    pub threads: usize,
+    pub threads: NonZeroUsize,
 }
 
 impl Config {
@@ -38,7 +39,7 @@ pub struct ConfigRef<'a> {
     pub tpm_state_file: &'a str,
     pub tabrmd: &'a str,
     pub mnt_options: &'a str,
-    pub threads: usize,
+    pub threads: Option<NonZeroUsize>,
 }
 
 #[derive(Default, PartialEq, Eq, Clone)]
@@ -50,7 +51,7 @@ pub struct ConfigBuilder {
     pub tpm_state_file: Option<PathBuf>,
     pub tabrmd: Option<String>,
     pub mnt_options: Option<String>,
-    pub threads: Option<usize>,
+    pub threads: Option<NonZeroUsize>,
 }
 
 impl ConfigBuilder {
@@ -95,7 +96,7 @@ impl ConfigBuilder {
     }
 
     #[allow(dead_code)]
-    pub fn with_threads(mut self, threads: Option<usize>) -> Self {
+    pub fn with_threads(mut self, threads: Option<NonZeroUsize>) -> Self {
         self.threads = threads;
         self
     }
@@ -131,9 +132,12 @@ impl ConfigBuilder {
             mnt_options: self
                 .mnt_options
                 .unwrap_or_else(|| DEFAULT_CONFIG.mnt_options.to_owned()),
-            threads: self
-                .threads
-                .unwrap_or_else(|| DEFAULT_CONFIG.threads.to_owned()),
+            threads: self.threads.unwrap_or_else(|| {
+                DEFAULT_CONFIG.threads.unwrap_or_else(|| {
+                    let threads = available_parallelism().unwrap().get() / 2;
+                    NonZeroUsize::new(threads.max(1)).unwrap()
+                })
+            }),
         }
     }
 }

+ 21 - 6
crates/btfuse/src/fuse_daemon.rs

@@ -8,7 +8,7 @@ use fuse_backend_rs::{
     transport::{self, FuseChannel, FuseSession},
 };
 use futures::future::FutureExt;
-use log::error;
+use log::{debug, error};
 use std::path::PathBuf;
 use std::{
     ffi::{c_char, c_int, CString},
@@ -22,6 +22,8 @@ use tokio::{sync::oneshot, task::JoinSet};
 pub use private::FuseDaemon;
 
 mod private {
+    use std::num::NonZeroUsize;
+
     use super::*;
 
     #[link(name = "fuse3")]
@@ -41,6 +43,7 @@ mod private {
 
     pub struct FuseDaemon {
         set: JoinSet<()>,
+        session: FuseSession,
     }
 
     impl FuseDaemon {
@@ -49,7 +52,7 @@ mod private {
 
         pub fn new<P: 'static + FsProvider>(
             mnt_path: PathBuf,
-            num_threads: usize,
+            num_tasks: NonZeroUsize,
             fallback_path: Arc<BlockPath>,
             mounted_signal: Option<oneshot::Sender<()>>,
             provider: P,
@@ -61,11 +64,13 @@ mod private {
                     .map_err(|_| bterr!("failed to send mounted signal"))?;
             }
             let mut set = JoinSet::new();
-            for _ in 0..num_threads {
+            log::debug!("spawning {num_tasks} blocking tasks");
+            for task_num in 0..num_tasks.get() {
                 let server = server.clone();
                 let channel = session.new_channel()?;
-                let future =
-                    tokio::task::spawn_blocking(move || Self::server_loop(server, channel));
+                let future = tokio::task::spawn_blocking(move || {
+                    Self::server_loop(task_num, server, channel)
+                });
                 let future = future.map(|result| {
                     if let Err(err) = result {
                         error!("server_loop produced an error: {err}");
@@ -73,7 +78,7 @@ mod private {
                 });
                 set.spawn(future);
             }
-            Ok(Self { set })
+            Ok(Self { set, session })
         }
 
         fn session<T: AsRef<Path>>(mnt_path: T) -> Result<FuseSession> {
@@ -86,6 +91,7 @@ mod private {
 
         /// Opens a channel to the kernel and processes messages received in an infinite loop.
         fn server_loop<P: 'static + FsProvider>(
+            task_num: usize,
             server: Arc<Server<FuseFs<P>>>,
             mut channel: FuseChannel,
         ) {
@@ -108,10 +114,19 @@ mod private {
                     }
                 }
             }
+            debug!("server_loop {task_num} exiting");
         }
 
         pub async fn finished(&mut self) {
             while self.set.join_next().await.is_some() {}
         }
     }
+
+    impl Drop for FuseDaemon {
+        fn drop(&mut self) {
+            if let Err(err) = self.session.wake() {
+                error!("failed to wake FuseSession: {err}");
+            }
+        }
+    }
 }

+ 9 - 2
crates/btfuse/src/fuse_fs.rs

@@ -153,6 +153,7 @@ mod private {
         type Handle = btfproto::Handle;
 
         fn init(&self, _capable: FsOptions) -> io::Result<FsOptions> {
+            log::debug!("init called");
             let provider_ref = &self.provider;
             let default_path = self.path_by_luid.k2v().default();
             let default_path_clone = default_path.clone();
@@ -170,6 +171,7 @@ mod private {
         }
 
         fn lookup(&self, ctx: &Context, parent: Self::Inode, name: &CStr) -> IoResult<Entry> {
+            log::debug!("lookup called");
             block_on(async move {
                 let path = self.path_from_luid(ctx.uid);
                 let name = name.to_str().display_err()?;
@@ -182,7 +184,10 @@ mod private {
                                 debug!("lookup returned io::Error: {err}");
                                 Err(err)
                             }
-                            Err(err) => Err(io::Error::new(io::ErrorKind::Other, err.to_string())),
+                            Err(err) => {
+                                debug!("lookup returned an unknown error: {err}");
+                                Err(io::Error::new(io::ErrorKind::Other, err.to_string()))
+                            }
                         }
                     }
                 };
@@ -311,7 +316,7 @@ mod private {
             handle: Self::Handle,
             w: &mut dyn ZeroCopyWriter,
             size: u32,
-            offset: u64,
+            mut offset: u64,
             _lock_owner: Option<u64>,
             _flags: u32,
         ) -> IoResult<usize> {
@@ -333,6 +338,8 @@ mod private {
                     }
                     w.write_all(&guard)?;
                     total_written += slice.len();
+                    let len: u64 = slice.len().try_into().display_err()?;
+                    offset += len;
                 }
                 Ok(total_written)
             })

+ 296 - 181
crates/btfuse/src/main.rs

@@ -38,7 +38,7 @@ const DEFAULT_CONFIG: ConfigRef<'static> = ConfigRef {
     tpm_state_file: "./tpm_state",
     tabrmd: "bus_type=session",
     mnt_options: "default_permissions",
-    threads: 8,
+    threads: None,
 };
 
 trait PathExt {
@@ -84,7 +84,11 @@ async fn remote_provider<C: 'static + Creds + Send + Sync>(
     Ok(client)
 }
 
-async fn run_daemon(config: Config, mounted_signal: Option<oneshot::Sender<()>>) {
+async fn run_daemon(
+    config: Config,
+    mounted_signal: Option<oneshot::Sender<()>>,
+    stop_signal: Option<oneshot::Receiver<()>>,
+) {
     let node_creds =
         node_creds(config.tpm_state_file, &config.tabrmd).expect("failed to get node creds");
     let fallback_path = {
@@ -96,6 +100,7 @@ async fn run_daemon(config: Config, mounted_signal: Option<oneshot::Sender<()>>)
     };
     let mut daemon = match config.fs_kind {
         FsKind::Local(btdir) => {
+            log::info!("starting daemon with local provider using {:?}", btdir);
             let provider = local_provider(btdir, node_creds)
                 .await
                 .expect("failed to create local provider");
@@ -108,6 +113,10 @@ async fn run_daemon(config: Config, mounted_signal: Option<oneshot::Sender<()>>)
             )
         }
         FsKind::Remote(remote_addr) => {
+            log::info!(
+                "starting daemon with remote provider using {:?}",
+                remote_addr.socket_addr()
+            );
             let provider = remote_provider(remote_addr, node_creds)
                 .await
                 .expect("failed to create remote provider");
@@ -121,7 +130,15 @@ async fn run_daemon(config: Config, mounted_signal: Option<oneshot::Sender<()>>)
         }
     }
     .expect("failed to create FUSE daemon");
-    daemon.finished().await
+
+    if let Some(stop_signal) = stop_signal {
+        tokio::select! {
+            _  = daemon.finished() => (),
+            _ = stop_signal => (),
+        };
+    } else {
+        daemon.finished().await;
+    }
 }
 
 #[tokio::main]
@@ -136,7 +153,7 @@ async fn main() {
         .with_tabrmd(from_envvar(ENVVARS.tabrmd).unwrap())
         .with_mnt_options(from_envvar(ENVVARS.mnt_options).unwrap());
     let config = builder.build();
-    run_daemon(config, None).await;
+    run_daemon(config, None, None).await;
 }
 
 #[cfg(test)]
@@ -149,19 +166,22 @@ mod test {
     use ctor::ctor;
     use std::{
         ffi::{OsStr, OsString},
-        fs::{
-            create_dir, hard_link, metadata, read, read_dir, remove_dir, remove_file, rename,
-            set_permissions, write, OpenOptions, Permissions, ReadDir,
-        },
-        io::{Read, Seek, SeekFrom, Write},
+        fs::Permissions,
+        io::SeekFrom,
         net::{IpAddr, Ipv6Addr},
         os::unix::fs::PermissionsExt,
-        thread::JoinHandle,
         time::Duration,
     };
     use swtpm_harness::SwtpmHarness;
     use tempdir::TempDir;
-    use tokio::sync::oneshot::error::TryRecvError;
+    use tokio::{
+        fs::{
+            create_dir, hard_link, metadata, read, read_dir, remove_dir, remove_file, rename,
+            set_permissions, write, OpenOptions, ReadDir,
+        },
+        io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
+        task::JoinHandle,
+    };
 
     /// An optional timeout to limit the time spent waiting for the FUSE daemon to start in tests.
     const TIMEOUT: Option<Duration> = Some(Duration::from_millis(1000));
@@ -195,16 +215,21 @@ mod test {
         }
     }
 
-    fn file_names(read_dir: ReadDir) -> impl Iterator<Item = OsString> {
-        read_dir.map(|entry| entry.unwrap().file_name())
+    async fn file_names(mut read_dir: ReadDir) -> Vec<OsString> {
+        let mut output = Vec::new();
+        while let Some(entry) = read_dir.next_entry().await.unwrap() {
+            output.push(entry.file_name());
+        }
+        output
     }
 
     const ROOT_PASSWD: &str = "password";
 
-    struct TestCase<R> {
+    struct TestCase<R: Receiver> {
         config: Config,
         handle: Option<JoinHandle<()>>,
         node_principal: OsString,
+        stop_tx: Option<oneshot::Sender<()>>,
         // Note that the drop order of these fields is significant.
         _receiver: Option<R>,
         _cred_store: TpmCredStore,
@@ -222,8 +247,9 @@ mod test {
 
     async fn new(remote: bool) -> TestCase<impl Receiver> {
         let tmp = TempDir::new("btfuse").unwrap();
-        let (mounted_tx, mut mounted_rx) = oneshot::channel();
-        let (swtpm, cred_store) = TestCase::<()>::swtpm();
+        let (mounted_tx, mounted_rx) = oneshot::channel();
+        let (stop_tx, stop_rx) = oneshot::channel();
+        let (swtpm, cred_store) = swtpm();
         let builder = Config::builder()
             .with_mnt_dir(Some(tmp.path().join("mnt")))
             .with_tpm_state_file(Some(swtpm.state_path().to_owned().into()))
@@ -231,46 +257,40 @@ mod test {
         let block_dir = tmp.path().join("bt");
         let (config, receiver) = if remote {
             let node_creds = Arc::new(cred_store.node_creds().unwrap());
+            let bind_path = node_creds.bind_path().unwrap();
+            block_dir.try_create_dir().unwrap();
             let local_fs = LocalFs::new_empty(block_dir, 0, node_creds.clone(), ModeAuthorizer)
                 .await
                 .unwrap();
             let ip_addr = IpAddr::V6(Ipv6Addr::LOCALHOST);
-            let receiver = new_fs_server(ip_addr, node_creds, Arc::new(local_fs)).unwrap();
-            let builder = builder.with_remote_ip(Some(ip_addr.to_string()));
+            let receiver = new_fs_server(ip_addr, node_creds.clone(), Arc::new(local_fs)).unwrap();
+            let builder = builder
+                .with_remote_ip(Some(ip_addr.to_string()))
+                .with_remote_path(Some(bind_path.to_string()));
             (builder.build(), Some(receiver))
         } else {
             (builder.with_block_dir(Some(block_dir)).build(), None)
         };
         let config_clone = config.clone();
-        let handle = std::thread::spawn(move || TestCase::<()>::run(mounted_tx, config_clone));
-        match TIMEOUT {
-            Some(duration) => {
-                let deadline = std::time::Instant::now() + duration;
-                loop {
-                    if std::time::Instant::now() > deadline {
-                        panic!("timed out waiting for the mounted signal");
-                    }
-                    match mounted_rx.try_recv() {
-                        Ok(_) => break,
-                        Err(err) => match err {
-                            TryRecvError::Empty => {
-                                std::thread::sleep(Duration::from_millis(10));
-                            }
-                            TryRecvError::Closed => {
-                                panic!("channel was closed before mounted signal was sent")
-                            }
-                        },
-                    }
-                }
-            }
-            None => mounted_rx.blocking_recv().unwrap(),
-        };
+        let handle =
+            tokio::spawn(
+                async move { run_daemon(config_clone, Some(mounted_tx), Some(stop_rx)).await },
+            );
+        if let Some(timeout) = TIMEOUT {
+            tokio::time::timeout(timeout, mounted_rx)
+                .await
+                .unwrap()
+                .unwrap();
+        } else {
+            mounted_rx.await.unwrap();
+        }
         let node_principal =
             OsString::from(cred_store.node_creds().unwrap().principal().to_string());
         TestCase {
             config,
             handle: Some(handle),
             node_principal,
+            stop_tx: Some(stop_tx),
             _receiver: receiver,
             _temp_dir: tmp,
             _swtpm: swtpm,
@@ -278,59 +298,34 @@ mod test {
         }
     }
 
-    impl<R> TestCase<R> {
-        fn run(mounted_tx: oneshot::Sender<()>, config: Config) {
-            let runtime = tokio::runtime::Builder::new_current_thread()
-                .build()
-                .unwrap();
-            // run_daemon can only be called in the context of a runtime, hence the need to call
-            // spawn_blocking.
-            let started = runtime.spawn_blocking(move || run_daemon(config, Some(mounted_tx)));
-            let future = runtime.block_on(started).unwrap();
-
-            runtime.block_on(future);
-        }
-
-        fn swtpm() -> (SwtpmHarness, TpmCredStore) {
-            let swtpm = SwtpmHarness::new().unwrap();
-            let state_path: PathBuf = swtpm.state_path().to_owned();
-            let cred_store = {
-                let context = swtpm.context().unwrap();
-                TpmCredStore::from_context(context, state_path.clone()).unwrap()
-            };
-            let root_creds = cred_store.gen_root_creds(ROOT_PASSWD).unwrap();
-            let mut node_creds = cred_store.node_creds().unwrap();
-            let expires = Epoch::now() + Duration::from_secs(3600);
-            let writecap = root_creds
-                .issue_writecap(node_creds.principal(), vec![], expires)
-                .unwrap();
-            cred_store
-                .assign_node_writecap(&mut node_creds, writecap)
-                .unwrap();
-            (swtpm, cred_store)
-        }
+    fn swtpm() -> (SwtpmHarness, TpmCredStore) {
+        let swtpm = SwtpmHarness::new().unwrap();
+        let state_path: PathBuf = swtpm.state_path().to_owned();
+        let cred_store = {
+            let context = swtpm.context().unwrap();
+            TpmCredStore::from_context(context, state_path.clone()).unwrap()
+        };
+        let root_creds = cred_store.gen_root_creds(ROOT_PASSWD).unwrap();
+        let mut node_creds = cred_store.node_creds().unwrap();
+        let expires = Epoch::now() + Duration::from_secs(3600);
+        let writecap = root_creds
+            .issue_writecap(node_creds.principal(), vec![], expires)
+            .unwrap();
+        cred_store
+            .assign_node_writecap(&mut node_creds, writecap)
+            .unwrap();
+        (swtpm, cred_store)
+    }
 
+    impl<R: Receiver> TestCase<R> {
         fn mnt_dir(&self) -> &Path {
             &self.config.mnt_dir
         }
 
-        fn wait(&mut self) {
+        async fn finished(&mut self) {
             if let Some(handle) = self.handle.take() {
-                handle.join().expect("join failed");
-            }
-        }
-
-        fn unmount_and_wait(&mut self) {
-            // If `handle` has already been taken that means `wait` has already been called. If
-            // this thread called `wait` and subsequently became unblocked, we know that the FUSE
-            // thread halted, which only happens when the file system is unmounted. Hence
-            // we don't have to unmount it, nor wait for the FUSE thread.
-            if self.handle.is_none() {
-                return;
+                handle.await.expect("join failed");
             }
-            let mnt_path = self.mnt_dir();
-            unmount(mnt_path);
-            self.wait();
         }
 
         fn initial_contents(&self) -> Vec<&OsStr> {
@@ -338,9 +333,14 @@ mod test {
         }
     }
 
-    impl<R> Drop for TestCase<R> {
+    impl<R: Receiver> Drop for TestCase<R> {
         fn drop(&mut self) {
-            self.unmount_and_wait()
+            self.stop_tx.take().map(|stop_tx| {
+                if let Err(_) = stop_tx.send(()) {
+                    log::error!("failed to send the TestCase stop signal");
+                }
+            });
+            unmount(self.mnt_dir());
         }
     }
 
@@ -350,124 +350,159 @@ mod test {
     #[allow(dead_code)]
     async fn manual_test() {
         let mut case = new_local().await;
-        case.wait()
+        case.finished().await
     }
 
-    #[tokio::test]
-    async fn write_read() -> Result<()> {
+    async fn write_read(case: TestCase<impl Receiver>) -> Result<()> {
         const EXPECTED: &[u8] =
-            b"The paths to failure are uncountable, yet to success there is but one.";
-        let case = new_local().await;
+            b"The paths to failure are uncountable, yet to success there are few.";
         let file_path = case.mnt_dir().join("file");
 
-        write(&file_path, EXPECTED)?;
+        write(&file_path, EXPECTED).await?;
 
-        let actual = read(&file_path)?;
+        let actual = read(&file_path).await?;
         assert_eq!(EXPECTED, actual);
         Ok(())
     }
 
     #[tokio::test]
-    async fn write_read_remote() -> Result<()> {
-        const EXPECTED: &[u8] =
-            b"The paths to failure are uncountable, yet to success there is but one.";
-        let case = new_remote().await;
-        let file_path = case.mnt_dir().join("file");
-
-        write(&file_path, EXPECTED)?;
+    async fn write_read_local() -> Result<()> {
+        write_read(new_local().await).await
+    }
 
-        let actual = read(&file_path)?;
-        assert_eq!(EXPECTED, actual);
-        Ok(())
+    // When the current thread runtime is used the test executable does not exit after the test
+    // method returns because one of the FuseDaemon blocking threads is blocked calling
+    // `FuseChannel::get_request`.
+    #[tokio::test(flavor = "multi_thread")]
+    async fn write_read_remote() -> Result<()> {
+        write_read(new_remote().await).await
     }
 
-    #[tokio::test]
-    async fn create_file_then_readdir() {
+    async fn create_file_then_readdir(case: TestCase<impl Receiver>) {
         const DATA: &[u8] = b"Au revoir Shoshanna!";
         let file_name = OsStr::new("landa_dialog.txt");
-        let case = new_local().await;
         let mut expected = case.initial_contents();
         expected.push(file_name);
         let mnt_path = case.mnt_dir();
         let file_path = mnt_path.join(file_name);
 
-        write(&file_path, DATA).expect("write failed");
+        write(&file_path, DATA).await.expect("write failed");
 
-        let first = file_names(read_dir(&mnt_path).expect("read_dir failed"));
-        assert!(first.eq(expected.iter().map(|e| *e)));
-        let second = file_names(read_dir(&mnt_path).expect("read_dir failed"));
-        assert!(second.eq(expected));
+        let first = file_names(read_dir(&mnt_path).await.expect("read_dir failed")).await;
+        assert_eq!(expected, first);
+        let second = file_names(read_dir(&mnt_path).await.expect("read_dir failed")).await;
+        assert_eq!(expected, second);
     }
 
     #[tokio::test]
-    async fn create_then_delete_file() {
+    async fn create_file_then_readdir_local() {
+        create_file_then_readdir(new_local().await).await
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn create_file_then_readdir_remote() {
+        create_file_then_readdir(new_remote().await).await
+    }
+
+    async fn create_then_delete_file(case: TestCase<impl Receiver>) {
         const DATA: &[u8] = b"The universe is hostile, so impersonal. Devour to survive";
         let file_name = OsStr::new("tool_lyrics.txt");
-        let case = new_local().await;
         let mnt_path = case.mnt_dir();
         let file_path = mnt_path.join(file_name);
-        write(&file_path, DATA).expect("write failed");
+        write(&file_path, DATA).await.expect("write failed");
 
-        remove_file(&file_path).expect("remove_file failed");
+        remove_file(&file_path).await.expect("remove_file failed");
 
         let expected = case.initial_contents();
-        let actual = file_names(read_dir(&mnt_path).expect("read_dir failed"));
-        assert!(actual.eq(expected));
+        let actual = file_names(read_dir(&mnt_path).await.expect("read_dir failed")).await;
+        assert_eq!(expected, actual);
     }
 
     #[tokio::test]
-    async fn hard_link_then_remove() {
+    async fn create_then_delete_file_local() {
+        create_then_delete_file(new_local().await).await
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn create_then_delete_file_remote() {
+        create_then_delete_file(new_remote().await).await
+    }
+
+    async fn hard_link_then_remove(case: TestCase<impl Receiver>) {
         const EXPECTED: &[u8] = b"And the lives we've reclaimed";
         let name1 = OsStr::new("refugee_lyrics.txt");
         let name2 = OsStr::new("rise_against_lyrics.txt");
-        let case = new_local().await;
         let mnt_path = case.mnt_dir();
         let path1 = mnt_path.join(name1);
         let path2 = mnt_path.join(name2);
-        write(&path1, EXPECTED).expect("write failed");
+        write(&path1, EXPECTED).await.expect("write failed");
 
-        hard_link(&path1, &path2).expect("hard_link failed");
-        remove_file(&path1).expect("remove_file failed");
+        hard_link(&path1, &path2).await.expect("hard_link failed");
+        remove_file(&path1).await.expect("remove_file failed");
 
-        let actual = read(&path2).expect("read failed");
+        let actual = read(&path2).await.expect("read failed");
         assert_eq!(EXPECTED, actual);
     }
 
     #[tokio::test]
-    async fn hard_link_then_remove_both() {
+    async fn hard_link_then_remove_local() {
+        hard_link_then_remove(new_local().await).await
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn hard_link_then_remove_remote() {
+        hard_link_then_remove(new_remote().await).await
+    }
+
+    async fn hard_link_then_remove_both(case: TestCase<impl Receiver>) {
         const EXPECTED: &[u8] = b"And the lives we've reclaimed";
         let name1 = OsStr::new("refugee_lyrics.txt");
         let name2 = OsStr::new("rise_against_lyrics.txt");
-        let case = new_local().await;
         let mnt_path = case.mnt_dir();
         let path1 = mnt_path.join(name1);
         let path2 = mnt_path.join(name2);
-        write(&path1, EXPECTED).expect("write failed");
+        write(&path1, EXPECTED).await.expect("write failed");
 
-        hard_link(&path1, &path2).expect("hard_link failed");
-        remove_file(&path1).expect("remove_file on path1 failed");
-        remove_file(&path2).expect("remove_file on path2 failed");
+        hard_link(&path1, &path2).await.expect("hard_link failed");
+        remove_file(&path1)
+            .await
+            .expect("remove_file on path1 failed");
+        remove_file(&path2)
+            .await
+            .expect("remove_file on path2 failed");
 
         let expected = case.initial_contents();
-        assert!(file_names(read_dir(&mnt_path).expect("read_dir failed")).eq(expected));
+        let actual = file_names(read_dir(&mnt_path).await.expect("read_dir failed")).await;
+        assert_eq!(expected, actual);
     }
 
     #[tokio::test]
-    async fn set_mode_bits() {
+    async fn hard_link_then_remove_both_local() {
+        hard_link_then_remove_both(new_local().await).await
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn hard_link_then_remove_both_remote() {
+        hard_link_then_remove_both(new_remote().await).await
+    }
+
+    async fn set_mode_bits(case: TestCase<impl Receiver>) {
         const EXPECTED: u32 = libc::S_IFREG | 0o777;
-        let case = new_local().await;
         let file_path = case.mnt_dir().join("bagobits");
-        write(&file_path, []).expect("write failed");
+        write(&file_path, []).await.expect("write failed");
         let original = metadata(&file_path)
+            .await
             .expect("metadata failed")
             .permissions()
             .mode();
         assert_ne!(EXPECTED, original);
 
         set_permissions(&file_path, Permissions::from_mode(EXPECTED))
+            .await
             .expect("set_permissions failed");
 
         let actual = metadata(&file_path)
+            .await
             .expect("metadata failed")
             .permissions()
             .mode();
@@ -475,60 +510,93 @@ mod test {
     }
 
     #[tokio::test]
-    async fn create_directory() {
+    async fn set_mode_bits_local() {
+        set_mode_bits(new_local().await).await
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn set_mode_bits_remote() {
+        set_mode_bits(new_remote().await).await
+    }
+
+    async fn create_directory(case: TestCase<impl Receiver>) {
         const EXPECTED: &str = "etc";
-        let case = new_local().await;
         let mnt_path = case.mnt_dir();
         let dir_path = mnt_path.join(EXPECTED);
         let mut expected = case.initial_contents();
         expected.push(OsStr::new(EXPECTED));
 
-        create_dir(&dir_path).expect("create_dir failed");
+        create_dir(&dir_path).await.expect("create_dir failed");
 
-        let actual = file_names(read_dir(mnt_path).expect("read_dir failed"));
-        assert!(actual.eq(expected));
+        let actual = file_names(read_dir(mnt_path).await.expect("read_dir failed")).await;
+        assert_eq!(expected, actual);
     }
 
     #[tokio::test]
-    async fn create_file_under_new_directory() {
+    async fn create_directory_local() {
+        create_directory(new_local().await).await
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn create_directory_remote() {
+        create_directory(new_remote().await).await
+    }
+
+    async fn create_file_under_new_directory(case: TestCase<impl Receiver>) {
         const DIR_NAME: &str = "etc";
         const FILE_NAME: &str = "file";
-        let case = new_local().await;
         let mnt_path = case.mnt_dir();
         let dir_path = mnt_path.join(DIR_NAME);
         let file_path = dir_path.join(FILE_NAME);
 
-        create_dir(&dir_path).expect("create_dir failed");
-        write(&file_path, []).expect("write failed");
+        create_dir(&dir_path).await.expect("create_dir failed");
+        write(&file_path, []).await.expect("write failed");
 
-        let actual = file_names(read_dir(dir_path).expect("read_dir failed"));
-        assert!(actual.eq([FILE_NAME]));
+        let actual = file_names(read_dir(dir_path).await.expect("read_dir failed")).await;
+        assert_eq!([FILE_NAME].as_slice(), actual.as_slice());
     }
 
     #[tokio::test]
-    async fn create_then_remove_directory() {
+    async fn create_file_under_new_directory_local() {
+        create_file_under_new_directory(new_local().await).await
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn create_file_under_new_directory_remote() {
+        create_file_under_new_directory(new_remote().await).await
+    }
+
+    async fn create_then_remove_directory(case: TestCase<impl Receiver>) {
         const DIR_NAME: &str = "etc";
-        let case = new_local().await;
         let mnt_path = case.mnt_dir();
         let dir_path = mnt_path.join(DIR_NAME);
 
-        create_dir(&dir_path).expect("create_dir failed");
-        remove_dir(&dir_path).expect("remove_dir failed");
+        create_dir(&dir_path).await.expect("create_dir failed");
+        remove_dir(&dir_path).await.expect("remove_dir failed");
 
-        let actual = file_names(read_dir(&mnt_path).expect("read_dir failed"));
-        assert!(actual.eq(case.initial_contents()));
+        let actual = file_names(read_dir(&mnt_path).await.expect("read_dir failed")).await;
+        assert_eq!(case.initial_contents(), actual);
     }
 
     #[tokio::test]
-    async fn read_only_dir_cant_create_subdir() {
+    async fn create_then_remote_directory_local() {
+        create_then_remove_directory(new_local().await).await
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn create_then_remote_directory_remote() {
+        create_then_remove_directory(new_remote().await).await
+    }
+
+    async fn read_only_dir_cant_create_subdir(case: TestCase<impl Receiver>) {
         const DIR_NAME: &str = "etc";
-        let case = new_local().await;
         let dir_path = case.mnt_dir().join(DIR_NAME);
-        create_dir(&dir_path).expect("create_dir failed");
+        create_dir(&dir_path).await.expect("create_dir failed");
         set_permissions(&dir_path, Permissions::from_mode(libc::S_IFDIR | 0o444))
+            .await
             .expect("set_permissions failed");
 
-        let result = create_dir(dir_path.join("sub"));
+        let result = create_dir(dir_path.join("sub")).await;
 
         let err = result.err().expect("create_dir returned `Ok`");
         let os_err = err.raw_os_error().expect("raw_os_error was empty");
@@ -536,17 +604,26 @@ mod test {
     }
 
     #[tokio::test]
-    async fn read_only_dir_cant_remove_subdir() {
+    async fn read_only_dir_cant_create_subdir_local() {
+        read_only_dir_cant_create_subdir(new_local().await).await
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn read_only_dir_cant_create_subdir_remote() {
+        read_only_dir_cant_create_subdir(new_remote().await).await
+    }
+
+    async fn read_only_dir_cant_remove_subdir(case: TestCase<impl Receiver>) {
         const DIR_NAME: &str = "etc";
-        let case = new_local().await;
         let dir_path = case.mnt_dir().join(DIR_NAME);
         let sub_path = dir_path.join("sub");
-        create_dir(&dir_path).expect("create_dir failed");
-        create_dir(&sub_path).expect("create_dir failed");
+        create_dir(&dir_path).await.expect("create_dir failed");
+        create_dir(&sub_path).await.expect("create_dir failed");
         set_permissions(&dir_path, Permissions::from_mode(libc::S_IFDIR | 0o444))
+            .await
             .expect("set_permissions failed");
 
-        let result = remove_dir(&sub_path);
+        let result = remove_dir(&sub_path).await;
 
         let err = result.err().expect("remove_dir returned `Ok`");
         let os_err = err.raw_os_error().expect("raw_os_error was empty");
@@ -554,22 +631,39 @@ mod test {
     }
 
     #[tokio::test]
-    async fn rename_file() {
+    async fn read_only_dir_cant_remove_subdir_local() {
+        read_only_dir_cant_remove_subdir(new_local().await).await
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn read_only_dir_cant_remove_subdir_remote() {
+        read_only_dir_cant_remove_subdir(new_remote().await).await
+    }
+
+    async fn rename_file(case: TestCase<impl Receiver>) {
         const FILE_NAME: &str = "parabola.txt";
         const EXPECTED: &[u8] = b"We are eternal all this pain is an illusion";
-        let case = new_local().await;
         let src_path = case.mnt_dir().join(FILE_NAME);
         let dst_path = case.mnt_dir().join("parabola_lyrics.txt");
 
-        write(&src_path, EXPECTED).unwrap();
-        rename(&src_path, &dst_path).unwrap();
+        write(&src_path, EXPECTED).await.unwrap();
+        rename(&src_path, &dst_path).await.unwrap();
 
-        let actual = read(&dst_path).unwrap();
+        let actual = read(&dst_path).await.unwrap();
         assert_eq!(EXPECTED, actual)
     }
 
     #[tokio::test]
-    async fn write_read_with_file_struct() {
+    async fn rename_file_local() {
+        rename_file(new_local().await).await
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn rename_file_remote() {
+        rename_file(new_remote().await).await
+    }
+
+    async fn write_read_with_file_struct(case: TestCase<impl Receiver>) {
         const FILE_NAME: &str = "big.dat";
         const LEN: usize = btlib::SECTOR_SZ_DEFAULT + 1;
         fn fill(buf: &mut Vec<u8>, value: u8) {
@@ -577,7 +671,6 @@ mod test {
             buf.extend(std::iter::repeat(value).take(buf.capacity()));
         }
 
-        let case = new_local().await;
         let file_path = case.mnt_dir().join(FILE_NAME);
         let mut buf = vec![1u8; LEN];
         let mut file = OpenOptions::new()
@@ -585,51 +678,73 @@ mod test {
             .read(true)
             .write(true)
             .open(&file_path)
+            .await
             .unwrap();
 
-        file.write_all(&buf).unwrap();
+        file.write_all(&buf).await.unwrap();
         fill(&mut buf, 2);
-        file.write_all(&buf).unwrap();
-        file.rewind().unwrap();
+        file.write_all(&buf).await.unwrap();
+        file.rewind().await.unwrap();
         let mut actual = vec![0u8; LEN];
-        file.read_exact(&mut actual).unwrap();
+        file.read_exact(&mut actual).await.unwrap();
 
         fill(&mut buf, 1);
         assert_eq!(buf, actual);
     }
 
-    //#[tokio::test]
-    #[allow(dead_code)]
+    #[tokio::test]
+    async fn write_read_with_file_struct_local() {
+        write_read_with_file_struct(new_local().await).await
+    }
+
+    //#[tokio::test(flavor = "multi_thread")]
+    async fn write_read_with_file_struct_remote() {
+        write_read_with_file_struct(new_remote().await).await
+    }
+
     /// KMC: This test is currently not working, and I've not been able to figure out why, nor
     /// reproduce it at a lower layer of the stack.
-    async fn read_more_than_whats_buffered() {
+    async fn read_more_than_whats_buffered(case: TestCase<impl Receiver>) {
         const FILE_NAME: &str = "big.dat";
         const SECT_SZ: usize = btlib::SECTOR_SZ_DEFAULT;
         const DIVISOR: usize = 8;
         const READ_SZ: usize = SECT_SZ / DIVISOR;
 
-        let case = new_local().await;
         let file_path = case.mnt_dir().join(FILE_NAME);
         let mut file = OpenOptions::new()
             .create(true)
             .read(true)
             .write(true)
             .open(&file_path)
+            .await
             .unwrap();
 
         let mut buf = vec![1u8; 2 * SECT_SZ];
-        file.write_all(&buf).unwrap();
-        file.flush().unwrap();
+        file.write_all(&buf).await.unwrap();
+        file.flush().await.unwrap();
         let mut file = OpenOptions::new()
             .read(true)
             .write(true)
             .open(&file_path)
+            .await
             .unwrap();
-        file.seek(SeekFrom::Start(SECT_SZ as u64)).unwrap();
+        file.seek(SeekFrom::Start(SECT_SZ as u64)).await.unwrap();
         let mut actual = vec![0u8; READ_SZ];
-        file.read_exact(&mut actual).unwrap();
+        file.read_exact(&mut actual).await.unwrap();
 
         buf.truncate(READ_SZ);
         assert!(buf == actual);
     }
+
+    //#[tokio::test]
+    #[allow(dead_code)]
+    async fn read_more_than_whats_buffered_local() {
+        read_more_than_whats_buffered(new_local().await).await
+    }
+
+    //#[tokio::test(flavor = "multi_thread")]
+    #[allow(dead_code)]
+    async fn read_more_than_whats_buffered_remote() {
+        read_more_than_whats_buffered(new_remote().await).await
+    }
 }

+ 12 - 2
crates/btlib/src/crypto.rs

@@ -10,8 +10,8 @@ pub use secret_stream::SecretStream;
 //pub use sign_stream::SignStream;
 
 use crate::{
-    btensure, bterr, fmt, io, BigArray, BlockMeta, BlockPath, Deserialize, Epoch, Formatter,
-    Hashable, Principal, Principaled, Result, Serialize, Writecap, WritecapBody,
+    btensure, bterr, fmt, io, BigArray, BlockError, BlockMeta, BlockPath, Deserialize, Epoch,
+    Formatter, Hashable, Principal, Principaled, Result, Serialize, Writecap, WritecapBody,
 };
 
 use btserde::{self, from_vec, to_vec, write_to};
@@ -2159,6 +2159,16 @@ pub trait CredsPriv: Decrypter + Signer {
     fn sign_kind(&self) -> Sign {
         Signer::kind(self)
     }
+
+    /// Returns the path these credentials are authorized to bind to according,
+    /// as specified by their [Writecap]. If these creds haven't been issued a [Writecap], then
+    /// an `Err` variant containing [BlockError::MissingWritecap] is returned.
+    fn bind_path(&self) -> Result<BlockPath> {
+        Ok(self
+            .writecap()
+            .ok_or(BlockError::MissingWritecap)?
+            .bind_path())
+    }
 }
 
 impl<T: CredsPriv> CredsPriv for &T {

+ 64 - 2
crates/btmsg/src/callback_framed.rs

@@ -67,6 +67,7 @@ macro_rules! attempt {
 
 impl<S: AsyncRead + Unpin> CallbackFramed<S> {
     pub async fn next<F: DeserCallback>(&mut self, mut callback: F) -> Option<Result<F::Return>> {
+        let mut total_read = 0;
         loop {
             if self.buffer.capacity() - self.buffer.len() == 0 {
                 // If there is no space left in the buffer we reserve additional bytes to ensure
@@ -77,7 +78,8 @@ impl<S: AsyncRead + Unpin> CallbackFramed<S> {
             if 0 == read_ct {
                 return None;
             }
-            match attempt!(Self::decode(&self.buffer[..read_ct]).await) {
+            total_read += read_ct;
+            match attempt!(Self::decode(&self.buffer[..total_read]).await) {
                 DecodeStatus::None => continue,
                 DecodeStatus::Reserve(count) => {
                     self.buffer.reserve(count);
@@ -128,7 +130,11 @@ mod tests {
 
     use futures::{future::Ready, SinkExt};
     use serde::Serialize;
-    use std::io::{Cursor, Seek};
+    use std::{
+        future::ready,
+        io::{Cursor, Seek},
+        task::Poll,
+    };
     use tokio_util::codec::FramedWrite;
 
     #[derive(Serialize, Deserialize)]
@@ -165,4 +171,60 @@ mod tests {
 
         assert!(matched)
     }
+
+    struct WindowedCursor {
+        window_sz: usize,
+        pos: usize,
+        buf: Vec<u8>,
+    }
+
+    impl WindowedCursor {
+        fn new(data: Vec<u8>, window_sz: usize) -> Self {
+            WindowedCursor {
+                window_sz,
+                pos: 0,
+                buf: data,
+            }
+        }
+    }
+
+    impl AsyncRead for WindowedCursor {
+        fn poll_read(
+            mut self: std::pin::Pin<&mut Self>,
+            _cx: &mut std::task::Context<'_>,
+            buf: &mut tokio::io::ReadBuf<'_>,
+        ) -> std::task::Poll<std::io::Result<()>> {
+            let end = self.buf.len().min(self.pos + self.window_sz);
+            let window = &self.buf[self.pos..end];
+            buf.put_slice(window);
+            self.as_mut().pos += window.len();
+            Poll::Ready(Ok(()))
+        }
+    }
+
+    struct CopyCallback;
+
+    impl DeserCallback for CopyCallback {
+        type Arg<'de> = Msg<'de>;
+        type Return = Vec<u8>;
+        type CallFut<'de> = std::future::Ready<Self::Return>;
+        fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
+            ready(arg.0.to_owned())
+        }
+    }
+
+    #[tokio::test]
+    async fn read_in_multiple_parts() {
+        const EXPECTED: &[u8] = b"We live in the most interesting of times.";
+        let mut write = FramedWrite::new(Cursor::new(Vec::<u8>::new()), MsgEncoder);
+        write.send(Msg(EXPECTED)).await.unwrap();
+        let data = write.into_inner().into_inner();
+        // This will force the CallbackFramed to read the message in multiple iterations.
+        let io = WindowedCursor::new(data, EXPECTED.len() / 2);
+        let mut read = CallbackFramed::new(io);
+
+        let actual = read.next(CopyCallback).await.unwrap().unwrap();
+
+        assert_eq!(EXPECTED, &actual);
+    }
 }

+ 37 - 12
crates/btmsg/src/lib.rs

@@ -23,6 +23,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
 use std::{
     any::Any,
     hash::Hash,
+    io,
     marker::PhantomData,
     net::{IpAddr, Ipv6Addr, SocketAddr},
     result::Result as StdResult,
@@ -153,7 +154,16 @@ impl<T> Envelope<T> {
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
 enum ReplyEnvelope<T> {
     Ok(T),
-    Err(String),
+    Err {
+        message: String,
+        os_code: Option<i32>,
+    },
+}
+
+impl<T> ReplyEnvelope<T> {
+    fn err(message: String, os_code: Option<i32>) -> Self {
+        Self::Err { message, os_code }
+    }
 }
 
 /// A message tagged with the block path that it was sent from.
@@ -206,7 +216,7 @@ pub trait Receiver {
     /// Creates a [Transmitter] which is connected to the given address.
     fn transmitter(&self, addr: Arc<BlockAddr>) -> Self::TransmitterFut<'_>;
 
-    type CompleteErr: std::error::Error;
+    type CompleteErr: std::error::Error + Send;
     type CompleteFut<'a>: 'a + Future<Output = StdResult<(), Self::CompleteErr>> + Send
     where
         Self: 'a;
@@ -350,9 +360,9 @@ impl Replier {
         Ok(())
     }
 
-    pub async fn reply_err(&mut self, err: String) -> Result<()> {
+    pub async fn reply_err(&mut self, err: String, os_code: Option<i32>) -> Result<()> {
         let mut guard = self.stream.lock().await;
-        guard.send(ReplyEnvelope::<()>::Err(err)).await?;
+        guard.send(ReplyEnvelope::<()>::err(err, os_code)).await?;
         Ok(())
     }
 }
@@ -364,10 +374,10 @@ struct MsgRecvdCallback<F> {
 }
 
 impl<F: MsgCallback> MsgRecvdCallback<F> {
-    fn new(path: Arc<BlockPath>, framed_msg: FramedMsg, inner: F) -> Self {
+    fn new(path: Arc<BlockPath>, framed_msg: Arc<Mutex<FramedMsg>>, inner: F) -> Self {
         Self {
             path,
-            replier: Replier::new(Arc::new(Mutex::new(framed_msg))),
+            replier: Replier::new(framed_msg),
             inner,
         }
     }
@@ -387,10 +397,17 @@ impl<F: 'static + MsgCallback> DeserCallback for MsgRecvdCallback<F> {
                 .inner
                 .call(MsgReceived::new(self.path.clone(), arg, replier))
                 .await;
-            if let Err(ref err) = result {
-                self.replier.reply_err(err.to_string()).await?;
+            match result {
+                Ok(value) => Ok(value),
+                Err(err) => match err.downcast::<io::Error>() {
+                    Ok(err) => {
+                        self.replier
+                            .reply_err(err.to_string(), err.raw_os_error())
+                            .await
+                    }
+                    Err(err) => self.replier.reply_err(err.to_string(), None).await,
+                },
             }
-            result
         }
     }
 }
@@ -515,8 +532,9 @@ impl QuicReceiver {
         recv_stream: RecvStream,
         callback: F,
     ) {
-        let framed_msg = FramedWrite::new(send_stream, MsgEncoder::new());
-        let callback = MsgRecvdCallback::new(client_path.clone(), framed_msg, callback.clone());
+        let framed_msg = Arc::new(Mutex::new(FramedWrite::new(send_stream, MsgEncoder::new())));
+        let callback =
+            MsgRecvdCallback::new(client_path.clone(), framed_msg.clone(), callback.clone());
         let mut msg_stream = CallbackFramed::new(recv_stream);
         let result = msg_stream
             .next(callback)
@@ -712,7 +730,14 @@ impl<F: 'static + Send + DeserCallback> DeserCallback for ReplyCallback<F> {
         async move {
             match arg {
                 ReplyEnvelope::Ok(msg) => Ok(self.inner.call(msg).await),
-                ReplyEnvelope::Err(err) => Err(bterr!(err)),
+                ReplyEnvelope::Err { message, os_code } => {
+                    if let Some(os_code) = os_code {
+                        let err = bterr!(io::Error::from_raw_os_error(os_code)).context(message);
+                        Err(err)
+                    } else {
+                        Err(bterr!(message))
+                    }
+                }
             }
         }
     }