Browse Source

Finished testing btfsd.

Matthew Carr 2 years ago
parent
commit
681c7160ca

+ 2 - 0
Cargo.lock

@@ -250,8 +250,10 @@ dependencies = [
  "btlib",
  "btlib-tests",
  "btmsg",
+ "btserde",
  "ctor",
  "env_logger",
+ "libc",
  "log",
  "swtpm-harness",
  "tempdir",

+ 19 - 0
README.md

@@ -50,6 +50,25 @@ The you can run flamegraph as your user account with:
 ```
 (source: https://crates.io/crates/flamegraph)
 
+## Test coverage with tarpaulin
+The [tarpaulin](https://github.com/xd009642/tarpaulin) crate can be used to generate code coverage
+reports. You can install it with cargo with `cargo install tarpaulin`. To generate an HTML report
+for the entire repository, execute `cargo tarpaulin --out Html` from the root of
+this repository. The generated report will be saved to the root of the repository. 
+You can generate coverage reports for specific crates by first navigating
+to the crates directory then executing the above command.
+Note that even if you run the tool in a subdirectory, the report will still be saved in the root
+of the repository.
+Please do not commit the coverage report.
+
+## Test Flakiness in btfuse
+The tests in btfuse are currently suffering from an issue where they randomly fail with the error
+`fusermount exited with a non-zero status: 1`. This doesn't happen every time, but it does
+happen annoyingly often. The issue is that the `fusermount` program is being used to umount
+the test file system, and it fails if the kernel is still accessing this file system when it's
+called. In the future I'd like to solve this by adopting (or writing) a different FUSE library,
+perhaps one which just wraps `libfuse`.
+
 ## License
 Copyright 2023 Delease, LLC. The software contained in this repository is licensed under the
 GNU Affero General Public License Version 3 or later. A copy of this license is provided in the

+ 1 - 1
crates/btfproto-tests/src/lib.rs

@@ -1,5 +1,5 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
-pub mod local_fs;
+pub mod local_fs_tests;
 pub mod mode_authorizer;
 
 use btlib::{

+ 12 - 7
crates/btfproto-tests/src/local_fs.rs → crates/btfproto-tests/src/local_fs_tests.rs

@@ -5,6 +5,7 @@ use super::node_creds;
 
 use btfproto::{
     local_fs::{LocalFs, ModeAuthorizer},
+    msg::{GrantAccess, SpecInodes},
     server::FsProvider,
 };
 use btlib::{
@@ -95,7 +96,11 @@ impl LocalFsTest {
                 supp_gids: Vec::new(),
             },
         };
-        fs.grant_access(&root_bind_path, proc_rec).await.unwrap();
+        let msg = GrantAccess {
+            inode: SpecInodes::RootDir.into(),
+            record: proc_rec,
+        };
+        fs.grant_access(&root_bind_path, msg).await.unwrap();
     }
 
     pub fn into_parts(self) -> (TempDir, ConcreteFs, Arc<BlockPath>) {
@@ -696,7 +701,7 @@ mod tests {
         let alloc_msg = Allocate {
             inode,
             handle,
-            offset: 0,
+            offset: None,
             size: actual.len() as u64,
         };
         bt.allocate(from, alloc_msg).await.unwrap();
@@ -738,7 +743,7 @@ mod tests {
         let alloc_msg = Allocate {
             inode,
             handle,
-            offset: 0,
+            offset: None,
             size,
         };
         bt.allocate(from, alloc_msg).await.unwrap();
@@ -763,7 +768,7 @@ mod tests {
         };
         let ReadMetaReply { attrs, .. } = bt.read_meta(from, read_meta_msg).await.unwrap();
 
-        assert_eq!(vec![0u8; LEN], actual.into_inner());
+        assert!(vec![0u8; LEN].eq(&actual.into_inner()));
         assert_eq!(LEN as u64, attrs.size);
     }
 
@@ -787,7 +792,7 @@ mod tests {
         let alloc_msg = Allocate {
             inode,
             handle,
-            offset: 0,
+            offset: None,
             size,
         };
         bt.allocate(from, alloc_msg).await.unwrap();
@@ -832,11 +837,11 @@ mod tests {
             .write(from, inode, handle, 0, LEN as u64, [1u8; LEN].as_slice())
             .await
             .unwrap();
-        assert_eq!(8, written);
+        assert_eq!(LEN as u64, written);
         let alloc_msg = Allocate {
             inode,
             handle,
-            offset: 0,
+            offset: None,
             size: (LEN / 2) as u64,
         };
         bt.allocate(from, alloc_msg).await.unwrap();

+ 44 - 9
crates/btfproto/src/client.rs

@@ -1,7 +1,11 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
 use crate::{msg::*, Handle, Inode};
 
-use btlib::{bterr, Result};
+use btlib::{
+    bterr,
+    crypto::{AsymKeyPub, Encrypt},
+    IssuedProcRec, Principal, Result,
+};
 use btmsg::{DeserCallback, Transmitter};
 
 use core::future::{ready, Future, Ready};
@@ -199,6 +203,22 @@ impl<T: Transmitter> FsClient<T> {
         self.tx.call(msg, AckCallback).await?
     }
 
+    pub async fn read_dir(
+        &self,
+        inode: Inode,
+        handle: Handle,
+        limit: u32,
+        state: u64,
+    ) -> Result<ReadDirReply> {
+        let msg = FsMsg::ReadDir(ReadDir {
+            inode,
+            handle,
+            limit,
+            state,
+        });
+        self.tx.call(msg, ExtractReadDir).await?
+    }
+
     pub async fn link(&self, inode: Inode, new_parent: Inode, name: &str) -> Result<LinkReply> {
         let msg = FsMsg::Link(Link {
             inode,
@@ -234,6 +254,16 @@ impl<T: Transmitter> FsClient<T> {
         self.tx.call(msg, ExtractWriteMeta).await?
     }
 
+    pub async fn allocate(&self, inode: Inode, handle: Handle, size: u64) -> Result<()> {
+        let msg = FsMsg::Allocate(Allocate {
+            inode,
+            handle,
+            offset: None,
+            size,
+        });
+        self.tx.call(msg, AckCallback).await?
+    }
+
     pub async fn close(&self, inode: Inode, handle: Handle) -> Result<()> {
         let msg = FsMsg::Close(Close { inode, handle });
         self.tx.call(msg, AckCallback).await?
@@ -258,20 +288,25 @@ impl<T: Transmitter> FsClient<T> {
         self.tx.call(msg, AckCallback).await?
     }
 
-    pub async fn read_dir(
+    pub async fn add_readcap(
         &self,
         inode: Inode,
         handle: Handle,
-        limit: u32,
-        state: u64,
-    ) -> Result<ReadDirReply> {
-        let msg = FsMsg::ReadDir(ReadDir {
+        principal: Principal,
+        enc_key: AsymKeyPub<Encrypt>,
+    ) -> Result<()> {
+        let msg = FsMsg::AddReadcap(AddReadcap {
             inode,
             handle,
-            limit,
-            state,
+            principal,
+            enc_key,
         });
-        self.tx.call(msg, ExtractReadDir).await?
+        self.tx.call(msg, AckCallback).await?
+    }
+
+    pub async fn grant_access(&self, inode: Inode, record: IssuedProcRec) -> Result<()> {
+        let msg = FsMsg::GrantAccess(GrantAccess { inode, record });
+        self.tx.call(msg, AckCallback).await?
     }
 }
 

+ 62 - 29
crates/btfproto/src/local_fs.rs

@@ -7,9 +7,9 @@ use btlib::{
     crypto::{Creds, Decrypter, Signer},
     error::{BtErr, DisplayErr},
     AuthzAttrs, BlockAccessor, BlockError, BlockMeta, BlockMetaSecrets, BlockOpenOptions,
-    BlockPath, BlockReader, DirEntry, Directory, Epoch, FileBlock, FlushMeta,
-    IssuedProcRec, MetaAccess, MetaReader, Positioned, Principal, Principaled, ProcRec, Result,
-    Split, TrySeek, WriteDual, ZeroExtendable,
+    BlockPath, BlockReader, DirEntry, Directory, Epoch, FileBlock, FlushMeta, IssuedProcRec,
+    MetaAccess, MetaReader, Positioned, Principal, Principaled, ProcRec, Result, Split, TrySeek,
+    WriteDual, ZeroExtendable,
 };
 use btserde::{read_from, write_to};
 use core::future::{ready, Ready};
@@ -256,7 +256,7 @@ mod private {
             let accessor = accessor
                 .into_inner()
                 .display_err()?
-                .ok_or_else(|| bterr!("a thread paniced while it help the accessor lock"))?;
+                .ok_or_else(|| bterr!("a thread paniced while it held the accessor lock"))?;
             let mut accessor = Accessor::combine(accessor, block);
             let dir = accessor.read_dir()?;
             let (accessor, ..) = accessor.split();
@@ -568,7 +568,11 @@ mod private {
                     },
                 };
                 root_block_path.push_component(root_principal.to_string());
-                fs.grant_access_to(&Arc::new(root_block_path), proc_rec)?;
+                fs.grant_access_to(
+                    &Arc::new(root_block_path),
+                    SpecInodes::RootDir.into(),
+                    proc_rec,
+                )?;
             }
             Ok(fs)
         }
@@ -876,17 +880,37 @@ mod private {
         /// # Warning
         /// This method calls `self.authz_attrs`, so the same consideration for avoiding deadlock
         /// apply to this method as well. See the documentation of `self.authz_attrs` for details.
-        fn grant_access_to(&self, from: &Arc<BlockPath>, proc_rec: IssuedProcRec) -> Result<()> {
+        fn grant_access_to(
+            &self,
+            from: &Arc<BlockPath>,
+            inode: Inode,
+            proc_rec: IssuedProcRec,
+        ) -> Result<()> {
             let authz_attrs = self.authz_attrs(from)?;
             let principal = proc_rec.pub_creds.principal();
-            let bind_path = proc_rec.writecap.bind_path();
-            let proc_rec_name = bind_path
-                .components()
-                .last()
-                .ok_or_else(|| bterr!("invalid bind path"))?
-                .to_owned();
-            let inode = self.next_inode()?;
-            self.borrow_block_mut(SpecInodes::RootDir.into(), |block| {
+            let next_inode = if inode == SpecInodes::RootDir.value() {
+                // If the inode is for the root directory we need to add a readcap for the
+                // superblock when we access it.
+                self.borrow_block_mut(SpecInodes::Sb.into(), |mut block| {
+                    let next_inode = self.next_inode.fetch_add(1, Ordering::Relaxed);
+                    let sb = Superblock {
+                        generation: self.generation,
+                        next_inode: next_inode + 1,
+                    };
+                    block.rewind()?;
+                    write_to(&sb, &mut block)?;
+                    block
+                        .mut_meta_body()
+                        .add_readcap_for(principal.clone(), &proc_rec.pub_creds.enc)?;
+                    block.flush()?;
+
+                    Ok(next_inode)
+                })
+            } else {
+                self.next_inode()
+            }?;
+
+            self.borrow_block_mut(inode, |block| {
                 self.authorizer
                     .can_write(&AuthzContext::new(from, &authz_attrs, block.meta()))?;
                 block
@@ -894,21 +918,18 @@ mod private {
                     .add_readcap_for(principal.clone(), &proc_rec.pub_creds.enc)?;
 
                 let mut dir = block.read_dir()?;
-                dir.add_file(proc_rec_name, inode)?;
+                let proc_rec_name = principal.to_string();
+                dir.add_file(proc_rec_name, next_inode)?;
                 // Note that write_dir calls flush, which also ensures metadata is written to disk.
                 block.write_dir(&dir)?;
 
                 Ok(())
             })?;
-            self.borrow_block_mut(SpecInodes::Sb.into(), |block| {
-                block
-                    .mut_meta_body()
-                    .add_readcap_for(principal.clone(), &proc_rec.pub_creds.enc)?;
-                block.flush_meta()
-            })?;
+
             let self_writecap = self.creds.writecap().ok_or(BlockError::MissingWritecap)?;
             let self_bind_path = Arc::new(self_writecap.bind_path());
-            self.open_value(self_bind_path.clone(), inode, bind_path, |value| {
+            let bind_path = proc_rec.writecap.bind_path();
+            self.open_value(self_bind_path.clone(), next_inode, bind_path, |value| {
                 value.borrow_block_mut(|block| {
                     // TODO: This should not be needed because the principal has been given access
                     // to the root directory. Once proper inherited readcaps are implemented we will
@@ -922,7 +943,7 @@ mod private {
             // We must ensure the reference count for the inode is decremented, otherwise the table
             // entry will never be freed.
             let mut inodes = self.inodes.write().display_err()?;
-            self.inode_forget(&mut inodes, &self_bind_path, inode, 1)?;
+            self.inode_forget(&mut inodes, &self_bind_path, next_inode, 1)?;
             Ok(())
         }
 
@@ -972,7 +993,7 @@ mod private {
             let inode = self.lookup_inode(relative.components())?;
             let proc_rec =
                 self.open_value(from.clone(), inode, from.as_ref().to_owned(), |value| {
-                    value.borrow_block_mut(|block| block.read_proc())
+                    value.borrow_block_mut(|block| block.read_proc_rec())
                 })?;
             let proc_rec = proc_rec.validate()?;
             Ok(proc_rec.authz_attrs)
@@ -1394,7 +1415,9 @@ mod private {
                 let ReadMeta { inode, handle } = msg;
                 debug!("read_meta: inode {inode}, handle {:?}", handle);
                 let attrs = if let Some(handle) = handle {
-                    self.access_block(from, inode, handle, |block, _| Ok(block.meta_body().secrets()?.to_owned()))?
+                    self.access_block(from, inode, handle, |block, _| {
+                        Ok(block.meta_body().secrets()?.to_owned())
+                    })?
                 } else {
                     self.borrow_block(inode, |block| Ok(block.meta_body().secrets()?.to_owned()))?
                 };
@@ -1484,10 +1507,18 @@ mod private {
                 offset,
                 size,
             } = msg;
-            debug!("allocate: inode {inode}, handle {handle}, offset {offset}, size {size}");
+            debug!(
+                "allocate: inode {inode}, handle {handle}, offset {:?}, size {size}",
+                offset
+            );
             let result = self.access_block_mut(from, inode, handle, |block, _| {
                 let curr_size = block.meta_body().secrets()?.size;
-                let new_size = curr_size.max(offset + size);
+                if let Some(offset) = offset {
+                    if curr_size != offset {
+                        return Err(bterr!("only allocations at the end of files are supported"));
+                    }
+                }
+                let new_size = curr_size.max(size);
                 if new_size > curr_size {
                     block.zero_extend(new_size - curr_size)?;
                 }
@@ -1561,9 +1592,11 @@ mod private {
         fn grant_access<'c>(
             &'c self,
             from: &'c Arc<BlockPath>,
-            msg: IssuedProcRec,
+            msg: GrantAccess,
         ) -> Self::GrantAccessFut<'c> {
-            ready(self.grant_access_to(from, msg))
+            let GrantAccess { inode, record } = msg;
+            debug!("grant_access: inode {inode}, record {:?}", record);
+            ready(self.grant_access_to(from, inode, record))
         }
     }
 }

+ 18 - 5
crates/btfproto/src/msg.rs

@@ -2,8 +2,9 @@
 use super::{Handle, Inode};
 
 use btlib::{
+    bterr,
     crypto::{AsymKeyPub, Encrypt},
-    BlockMetaSecrets, DirEntry, Epoch, IssuedProcRec, Principal, DirEntryKind, bterr,
+    BlockMetaSecrets, DirEntry, DirEntryKind, Epoch, IssuedProcRec, Principal,
 };
 use btmsg::CallMsg;
 use core::time::Duration;
@@ -39,7 +40,7 @@ pub enum FsMsg<'a> {
     Lock(Lock),
     Unlock(Unlock),
     AddReadcap(AddReadcap),
-    GrantAccess(IssuedProcRec),
+    GrantAccess(GrantAccess),
 }
 
 #[derive(Serialize, Deserialize)]
@@ -84,9 +85,15 @@ pub enum SpecInodes {
     FirstFree = 11,
 }
 
+impl SpecInodes {
+    pub fn value(self) -> Inode {
+        self as Inode
+    }
+}
+
 impl From<SpecInodes> for Inode {
     fn from(special: SpecInodes) -> Self {
-        special as Inode
+        special.value()
     }
 }
 
@@ -348,7 +355,7 @@ impl AttrsSet {
     field!(5, CTIME);
 
     pub const ALL: Self = Self::new(
-        Self::MODE.0 | Self::UID.0 | Self::GID.0 | Self::ATIME.0 | Self::MTIME.0 | Self::CTIME.0
+        Self::MODE.0 | Self::UID.0 | Self::GID.0 | Self::ATIME.0 | Self::MTIME.0 | Self::CTIME.0,
     );
 
     pub const fn new(value: u16) -> Self {
@@ -539,7 +546,7 @@ pub struct WriteMetaReply {
 pub struct Allocate {
     pub inode: Inode,
     pub handle: Handle,
-    pub offset: u64,
+    pub offset: Option<u64>,
     pub size: u64,
 }
 
@@ -582,3 +589,9 @@ pub struct AddReadcap {
     pub principal: Principal,
     pub enc_key: AsymKeyPub<Encrypt>,
 }
+
+#[derive(Serialize, Deserialize)]
+pub struct GrantAccess {
+    pub inode: Inode,
+    pub record: IssuedProcRec,
+}

+ 5 - 5
crates/btfproto/src/server.rs

@@ -4,7 +4,7 @@ use crate::{
     Handle, Inode,
 };
 
-use btlib::{crypto::Creds, BlockPath, IssuedProcRec, Result};
+use btlib::{crypto::Creds, BlockPath, Result};
 use btmsg::{receiver, MsgCallback, MsgReceived, Receiver};
 use core::future::Future;
 use std::{io::Read, net::IpAddr, sync::Arc};
@@ -115,7 +115,7 @@ pub trait FsProvider: Send + Sync {
     fn grant_access<'c>(
         &'c self,
         from: &'c Arc<BlockPath>,
-        msg: IssuedProcRec,
+        msg: GrantAccess,
     ) -> Self::GrantAccessFut<'c>;
 }
 
@@ -230,7 +230,7 @@ impl<P: FsProvider> FsProvider for &P {
     fn grant_access<'c>(
         &'c self,
         from: &'c Arc<BlockPath>,
-        msg: IssuedProcRec,
+        msg: GrantAccess,
     ) -> Self::GrantAccessFut<'c> {
         (*self).grant_access(from, msg)
     }
@@ -311,8 +311,8 @@ impl<P: 'static + Send + Sync + FsProvider> MsgCallback for ServerCallback<P> {
                 FsMsg::AddReadcap(add_readcap) => {
                     FsReply::Ack(provider.add_readcap(&from, add_readcap).await?)
                 }
-                FsMsg::GrantAccess(proc_rec) => {
-                    FsReply::Ack(provider.grant_access(&from, proc_rec).await?)
+                FsMsg::GrantAccess(grant_access) => {
+                    FsReply::Ack(provider.grant_access(&from, grant_access).await?)
                 }
             };
             replier.unwrap().reply(reply).await

+ 3 - 1
crates/btfsd/Cargo.toml

@@ -14,7 +14,9 @@ log = "0.4.17"
 env_logger = "0.9.0"
 
 [dev-dependencies]
+btserde = { path = "../btserde" }
 swtpm-harness = { path = "../swtpm-harness" }
 btlib-tests = { path = "../btlib-tests" }
 tempdir = "0.3.7"
-ctor = { version = "0.1.22" }
+ctor = { version = "0.1.22" }
+libc = { version = "0.2.137" }

+ 345 - 13
crates/btfsd/src/main.rs

@@ -73,10 +73,20 @@ mod tests {
     use super::*;
 
     use btfproto::{client::FsClient, msg::*};
-    use btlib::{log::BuilderExt, BlockMetaSecrets, Epoch};
+    use btlib::{
+        crypto::{ConcreteCreds, CredsPriv, CredsPub},
+        log::BuilderExt,
+        AuthzAttrs, BlockMetaSecrets, Epoch, IssuedProcRec, Principaled, ProcRec,
+    };
     use btlib_tests::TpmCredStoreHarness;
-    use btmsg::Transmitter;
-    use std::{future::ready, net::Ipv4Addr};
+    use btmsg::{BlockAddr, Transmitter};
+    use btserde::from_slice;
+    use std::{
+        future::ready,
+        net::{Ipv4Addr, SocketAddr},
+        time::Duration,
+    };
+    use swtpm_harness::SwtpmHarness;
     use tempdir::TempDir;
 
     const LOG_LEVEL: &str = "warn";
@@ -90,12 +100,13 @@ mod tests {
     struct TestCase<R, T> {
         client: FsClient<T>,
         rx: R,
-        _harness: TpmCredStoreHarness,
+        harness: TpmCredStoreHarness,
         _dir: TempDir,
     }
 
     const ROOT_PASSWD: &str = "existential_threat";
     const LOCALHOST: IpAddr = IpAddr::V6(Ipv6Addr::LOCALHOST);
+    const BT_DIR: &str = "bt";
 
     async fn test_case(
         dir: TempDir,
@@ -106,14 +117,14 @@ mod tests {
             ip_addr,
             tabrmd: harness.swtpm().tabrmd_config().to_owned(),
             tpm_state_path: harness.swtpm().state_path().to_owned(),
-            block_dir: dir.path().join("bt"),
+            block_dir: dir.path().join(BT_DIR),
         };
         let rx = receiver(config);
         let tx = rx.transmitter(rx.addr().clone()).await.unwrap();
         let client = FsClient::new(tx);
         TestCase {
             _dir: dir,
-            _harness: harness,
+            harness,
             rx,
             client,
         }
@@ -130,7 +141,11 @@ mod tests {
     ) -> TestCase<impl Receiver, impl Transmitter> {
         case.rx.stop().await.unwrap();
         case.rx.complete().unwrap().await.unwrap();
-        let TestCase { _dir, _harness, .. } = case;
+        let TestCase {
+            _dir,
+            harness: _harness,
+            ..
+        } = case;
         test_case(_dir, _harness, IpAddr::V4(Ipv4Addr::LOCALHOST)).await
     }
 
@@ -456,13 +471,12 @@ mod tests {
             .unwrap();
         let before_read = Epoch::now();
         let ReadMetaReply { attrs, .. } = client.read_meta(inode, Some(handle)).await.unwrap();
-        let after = Epoch::now();
 
         let actual = attrs.mode;
         assert_eq!(FileType::Reg | EXPECTED, actual);
         assert!(before_create <= attrs.ctime && attrs.ctime <= before_read);
-        assert!(before_read <= attrs.mtime && attrs.mtime <= after);
-        assert!(before_read <= attrs.atime && attrs.atime <= after);
+        assert!(before_create <= attrs.mtime && attrs.mtime <= before_read);
+        assert!(before_create <= attrs.atime && attrs.atime <= before_read);
         assert_eq!(attrs.block_id.inode, inode);
         assert_eq!(attrs.size, 0);
         assert!(attrs.tags.is_empty());
@@ -470,7 +484,7 @@ mod tests {
 
     #[tokio::test]
     async fn write_meta() {
-        fn check(expected: &Attrs, actual: &BlockMetaSecrets) {
+        fn assert_eq(expected: &Attrs, actual: &BlockMetaSecrets) {
             assert_eq!(expected.mode, actual.mode);
             assert_eq!(expected.uid, actual.uid);
             assert_eq!(expected.gid, actual.gid);
@@ -511,9 +525,327 @@ mod tests {
             .write_meta(inode, Some(handle), expected.clone(), AttrsSet::ALL)
             .await
             .unwrap();
-        check(&expected, &attrs);
+        assert_eq(&expected, &attrs);
         let ReadMetaReply { attrs, .. } = client.read_meta(inode, Some(handle)).await.unwrap();
 
-        check(&expected, &attrs);
+        assert_eq(&expected, &attrs);
+    }
+
+    #[tokio::test]
+    async fn allocate_when_empty() {
+        const FILENAME: &str = "output.dat";
+        const EXPECTED: &[u8] = &[0u8; 8];
+        let case = new_case().await;
+        let client = &case.client;
+
+        let CreateReply { inode, handle, .. } = client
+            .create(
+                SpecInodes::RootDir.into(),
+                FILENAME,
+                FlagValue::ReadWrite.into(),
+                0o644,
+                0,
+            )
+            .await
+            .unwrap();
+        client
+            .allocate(inode, handle, EXPECTED.len() as u64)
+            .await
+            .unwrap();
+        let msg = Read {
+            inode,
+            handle,
+            offset: 0,
+            size: EXPECTED.len() as u64,
+        };
+        let actual = client
+            .read(msg, |reply| ready(Vec::from(reply.data)))
+            .await
+            .unwrap();
+
+        assert_eq!(EXPECTED, actual);
+    }
+
+    #[tokio::test]
+    async fn allocate_with_data_present() {
+        const FILENAME: &str = "output.dat";
+        const EXPECTED: &[u8] = &[1, 1, 1, 1, 0, 0, 0, 0];
+        let case = new_case().await;
+        let client = &case.client;
+
+        let CreateReply { inode, handle, .. } = client
+            .create(
+                SpecInodes::RootDir.into(),
+                FILENAME,
+                FlagValue::ReadWrite.into(),
+                0o644,
+                0,
+            )
+            .await
+            .unwrap();
+        client.write(inode, handle, 0, &[1, 1, 1, 1]).await.unwrap();
+        client
+            .allocate(inode, handle, EXPECTED.len() as u64)
+            .await
+            .unwrap();
+        let msg = Read {
+            inode,
+            handle,
+            offset: 0,
+            size: EXPECTED.len() as u64,
+        };
+        let actual = client
+            .read(msg, |reply| ready(Vec::from(reply.data)))
+            .await
+            .unwrap();
+
+        assert_eq!(EXPECTED, actual);
+    }
+
+    #[tokio::test]
+    async fn forget() {
+        const FILENAME: &str = "seed.dat";
+        let case = new_case().await;
+        let client = &case.client;
+
+        let CreateReply { inode, handle, .. } = client
+            .create(
+                SpecInodes::RootDir.into(),
+                FILENAME,
+                FlagValue::ReadWrite.into(),
+                0o644,
+                0,
+            )
+            .await
+            .unwrap();
+        client.close(inode, handle).await.unwrap();
+        let result = client.forget(inode, 1).await;
+
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn add_readcap() {
+        const FILENAME: &str = "net";
+        let case = new_case().await;
+        let client = &case.client;
+        let creds = ConcreteCreds::generate().unwrap();
+
+        let CreateReply { inode, handle, .. } = client
+            .create(
+                SpecInodes::RootDir.into(),
+                FILENAME,
+                FlagValue::ReadWrite | FlagValue::Directory,
+                0o755,
+                0,
+            )
+            .await
+            .unwrap();
+        client
+            .add_readcap(inode, handle, creds.principal(), creds.concrete_pub().enc)
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    async fn grant_access_to_root() {
+        let case = new_case().await;
+        let client = &case.client;
+        let mut creds = ConcreteCreds::generate().unwrap();
+        let root_creds = case.harness.root_creds().unwrap();
+        let writecap = root_creds
+            .issue_writecap(
+                creds.principal(),
+                vec![],
+                Epoch::now() + Duration::from_secs(3600),
+            )
+            .unwrap();
+        creds.set_writecap(writecap);
+        let expected = IssuedProcRec {
+            addr: SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 52982),
+            pub_creds: creds.concrete_pub(),
+            writecap: creds.writecap().unwrap().to_owned(),
+            authz_attrs: AuthzAttrs {
+                uid: 9001,
+                gid: 9001,
+                supp_gids: vec![12, 41, 19],
+            },
+        };
+
+        client
+            .grant_access(SpecInodes::RootDir.into(), expected.clone())
+            .await
+            .unwrap();
+        let LookupReply { inode, entry, .. } = client
+            .lookup(SpecInodes::RootDir.into(), &creds.principal().to_string())
+            .await
+            .unwrap();
+        let OpenReply { handle, .. } = client
+            .open(inode, FlagValue::ReadOnly.into())
+            .await
+            .unwrap();
+        let read = Read {
+            inode,
+            handle,
+            offset: 0,
+            size: entry.attr.size,
+        };
+        let record = client
+            .read(read, |reply| ready(from_slice::<ProcRec>(reply.data)))
+            .await
+            .unwrap()
+            .unwrap();
+        let actual = record.validate().unwrap();
+
+        assert_eq!(expected, actual);
+    }
+
+    #[tokio::test]
+    async fn grant_access_to_non_root_dir() {
+        const DIRNAME: &str = "var";
+        let case = new_case().await;
+        let client = &case.client;
+        let mut creds = ConcreteCreds::generate().unwrap();
+        let root_creds = case.harness.root_creds().unwrap();
+        let writecap = root_creds
+            .issue_writecap(
+                creds.principal(),
+                vec![DIRNAME.to_owned()],
+                Epoch::now() + Duration::from_secs(3600),
+            )
+            .unwrap();
+        creds.set_writecap(writecap);
+        let expected = IssuedProcRec {
+            addr: SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 52982),
+            pub_creds: creds.concrete_pub(),
+            writecap: creds.writecap().unwrap().to_owned(),
+            authz_attrs: AuthzAttrs {
+                uid: 9001,
+                gid: 9001,
+                supp_gids: vec![12, 41, 19],
+            },
+        };
+
+        let CreateReply { inode, handle, .. } = client
+            .create(
+                SpecInodes::RootDir.into(),
+                DIRNAME,
+                FlagValue::Directory | FlagValue::ReadWrite,
+                0o755,
+                0,
+            )
+            .await
+            .unwrap();
+        client.close(inode, handle).await.unwrap();
+        client.grant_access(inode, expected.clone()).await.unwrap();
+        let LookupReply {
+            inode: record_inode,
+            entry,
+            ..
+        } = client
+            .lookup(inode, &creds.principal().to_string())
+            .await
+            .unwrap();
+        let OpenReply {
+            handle: record_handle,
+            ..
+        } = client
+            .open(record_inode, FlagValue::ReadOnly.into())
+            .await
+            .unwrap();
+        let read = Read {
+            inode: record_inode,
+            handle: record_handle,
+            offset: 0,
+            size: entry.attr.size,
+        };
+        let record = client
+            .read(read, |reply| ready(from_slice::<ProcRec>(reply.data)))
+            .await
+            .unwrap()
+            .unwrap();
+        let actual = record.validate().unwrap();
+
+        assert_eq!(expected, actual);
+    }
+
+    #[tokio::test]
+    async fn grant_access_non_root_user() {
+        const ROOT_FILE: &str = "root.txt";
+        const USER_FILE: &str = "user.txt";
+        let user_tpm = SwtpmHarness::new().unwrap();
+        let case = new_case().await;
+        let client = &case.client;
+        let root_creds = case.harness.root_creds().unwrap();
+        let user_creds = {
+            let cred_store = TpmCredStore::from_context(
+                user_tpm.context().unwrap(),
+                user_tpm.state_path().to_owned(),
+            )
+            .unwrap();
+            let mut creds = cred_store.node_creds().unwrap();
+            let writecap = root_creds
+                .issue_writecap(
+                    creds.principal(),
+                    vec![],
+                    Epoch::now() + Duration::from_secs(3600),
+                )
+                .unwrap();
+            cred_store
+                .assign_node_writecap(&mut creds, writecap)
+                .unwrap();
+            creds
+        };
+        let expected = IssuedProcRec {
+            addr: SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 52982),
+            pub_creds: user_creds.concrete_pub(),
+            writecap: user_creds.writecap().unwrap().to_owned(),
+            authz_attrs: AuthzAttrs {
+                uid: 9001,
+                gid: 9001,
+                supp_gids: vec![12, 41, 19],
+            },
+        };
+        let root_inode = SpecInodes::RootDir.into();
+
+        client
+            .grant_access(root_inode, expected.clone())
+            .await
+            .unwrap();
+        // Note that non-root users do not have permission to write to ROOT_FILE, but
+        // UID 9001 has permission to write to USER_FILE.
+        client
+            .create(root_inode, ROOT_FILE, FlagValue::ReadWrite.into(), 0o644, 0)
+            .await
+            .unwrap();
+        let CreateReply { inode, handle, .. } = client
+            .create(root_inode, USER_FILE, FlagValue::ReadWrite.into(), 0o644, 0)
+            .await
+            .unwrap();
+        let mut attrs = Attrs::default();
+        attrs.uid = expected.authz_attrs.uid;
+        attrs.gid = expected.authz_attrs.gid;
+        client
+            .write_meta(inode, Some(handle), attrs, AttrsSet::UID | AttrsSet::GID)
+            .await
+            .unwrap();
+
+        let node_creds = case.harness.cred_store().node_creds().unwrap();
+        let bind_path = node_creds.writecap().unwrap().bind_path();
+        let block_addr = Arc::new(BlockAddr::new(LOCALHOST, Arc::new(bind_path)));
+        let tx = btmsg::transmitter(block_addr, Arc::new(user_creds))
+            .await
+            .unwrap();
+        let client = FsClient::new(tx);
+        let root_dir = SpecInodes::RootDir.value();
+
+        let LookupReply { inode, .. } = client.lookup(root_dir, USER_FILE).await.unwrap();
+        let result = client.open(inode, FlagValue::ReadWrite.into()).await;
+        assert!(result.is_ok());
+        let LookupReply { inode, .. } = client.lookup(root_dir, ROOT_FILE).await.unwrap();
+        let result = client.open(inode, FlagValue::ReadWrite.into()).await;
+        let err = result.err().unwrap().to_string();
+
+        assert_eq!("write access denied", err);
     }
 }

+ 2 - 2
crates/btfuse/src/fuse_fs.rs

@@ -537,7 +537,7 @@ mod private {
                 let msg = Allocate {
                     inode,
                     handle,
-                    offset,
+                    offset: Some(offset),
                     size: length,
                 };
                 self.provider.allocate(path, msg).await?;
@@ -552,7 +552,7 @@ mod tests {
     use super::*;
 
     use btfproto::Inode;
-    use btfproto_tests::local_fs::{ConcreteFs, LocalFsTest};
+    use btfproto_tests::local_fs_tests::{ConcreteFs, LocalFsTest};
     use fuse_backend_rs::api::filesystem::Context;
     use std::ffi::CString;
     use tempdir::TempDir;

+ 0 - 6
crates/btfuse/src/main.rs

@@ -274,12 +274,6 @@ mod test {
         case.wait()
     }
 
-    /// Tests if the file system can be mount then unmounted successfully.
-    #[test]
-    fn mount_then_unmount() {
-        let _ = TestCase::new();
-    }
-
     #[test]
     fn write_read() -> Result<()> {
         const EXPECTED: &[u8] =

+ 8 - 1
crates/btlib-tests/src/tpm_cred_store_harness.rs

@@ -2,7 +2,10 @@
 //! Module containing [TpmCredStoreHarness].
 
 use btlib::{
-    crypto::{tpm::TpmCredStore, CredStore, Creds},
+    crypto::{
+        tpm::{TpmCredStore, TpmCreds},
+        CredStore, Creds,
+    },
     error::AnyhowErrorExt,
     Epoch, Principaled, Result,
 };
@@ -50,4 +53,8 @@ impl TpmCredStoreHarness {
     pub fn cred_store(&self) -> &TpmCredStore {
         &self.cred_store
     }
+
+    pub fn root_creds(&self) -> Result<TpmCreds> {
+        self.cred_store.root_creds(&self.root_passwd)
+    }
 }

+ 114 - 46
crates/btlib/src/crypto/tpm.rs

@@ -887,7 +887,7 @@ impl TpmCredStore {
         Ok(())
     }
 
-    fn persist<F: FnOnce(&mut Storage, StoredCredData)>(
+    fn persist<F: FnOnce(&mut State, StoredCredData)>(
         &self,
         creds: &TpmCreds,
         update_storage: F,
@@ -908,7 +908,7 @@ impl TpmCredStore {
             enc: creds.enc.to_stored(enc_handle),
             writecap: creds.writecap.clone(),
         };
-        update_storage(&mut guard.storage, handles);
+        update_storage(&mut guard, handles);
         match self.save_storage(&mut guard) {
             Ok(_) => Ok(()),
             Err(error) => {
@@ -993,7 +993,10 @@ impl TpmCredStore {
             writecap: None,
         };
         let creds = TpmCreds::new(cred_data, &self.state);
-        self.persist(&creds, |storage, handles| storage.node = Some(handles))?;
+        self.persist(&creds, |state, handles| {
+            state.storage.node = Some(handles);
+            state.node_creds = Some(creds.clone());
+        })?;
         Ok(creds)
     }
 
@@ -1022,46 +1025,6 @@ impl TpmCredStore {
     }
 }
 
-#[derive(Serialize, Deserialize)]
-struct DerivationParams {
-    iter: usize,
-    hash: HashKind,
-    kind: AeadKeyKind,
-    salt: Vec<u8>,
-    iv: Vec<u8>,
-}
-
-impl DerivationParams {
-    const PBKDF2_ITER: usize = 1000000;
-    const PBKDF2_HASH: HashKind = HashKind::Sha2_256;
-    const EXPORT_KEY_KIND: AeadKeyKind = AeadKeyKind::AesGcm256;
-
-    fn new() -> Result<DerivationParams> {
-        const_assert!(
-            DerivationParams::PBKDF2_HASH.len() == DerivationParams::EXPORT_KEY_KIND.key_len()
-        );
-        Ok(DerivationParams {
-            iter: Self::PBKDF2_ITER,
-            hash: Self::PBKDF2_HASH,
-            kind: Self::EXPORT_KEY_KIND,
-            salt: rand_vec(Self::PBKDF2_HASH.len())?,
-            iv: rand_vec(Self::EXPORT_KEY_KIND.iv_len())?,
-        })
-    }
-
-    fn derive_key(&self, password: &str) -> Result<AeadKey> {
-        let mut key = Zeroizing::new([0u8; Self::EXPORT_KEY_KIND.key_len()]);
-        pbkdf2_hmac(
-            password.as_bytes(),
-            self.salt.as_slice(),
-            self.iter,
-            self.hash.into(),
-            key.as_mut_slice(),
-        )?;
-        AeadKey::copy_components(self.kind, key.as_slice(), &self.iv)
-    }
-}
-
 impl CredStore for TpmCredStore {
     type CredHandle = TpmCreds;
     type ExportedCreds = ExportedCreds;
@@ -1116,7 +1079,7 @@ impl CredStore for TpmCredStore {
         }
         let mut creds = TpmCreds::new(cred_data, &self.state);
         creds.init_root_writecap(Epoch::now() + Self::DEFAULT_WRITECAP_EXP)?;
-        self.persist(&creds, |storage, handles| storage.root = Some(handles))?;
+        self.persist(&creds, |state, handles| state.storage.root = Some(handles))?;
         Ok(creds)
     }
 
@@ -1189,7 +1152,7 @@ impl CredStore for TpmCredStore {
             };
             TpmCreds::new(cred_data, &self.state)
         };
-        self.persist(&creds, |storage, handles| storage.root = Some(handles))?;
+        self.persist(&creds, |state, handles| state.storage.root = Some(handles))?;
         Ok(creds)
     }
 
@@ -1210,6 +1173,46 @@ impl CredStore for TpmCredStore {
     }
 }
 
+#[derive(Serialize, Deserialize)]
+struct DerivationParams {
+    iter: usize,
+    hash: HashKind,
+    kind: AeadKeyKind,
+    salt: Vec<u8>,
+    iv: Vec<u8>,
+}
+
+impl DerivationParams {
+    const PBKDF2_ITER: usize = 1000000;
+    const PBKDF2_HASH: HashKind = HashKind::Sha2_256;
+    const EXPORT_KEY_KIND: AeadKeyKind = AeadKeyKind::AesGcm256;
+
+    fn new() -> Result<DerivationParams> {
+        const_assert!(
+            DerivationParams::PBKDF2_HASH.len() == DerivationParams::EXPORT_KEY_KIND.key_len()
+        );
+        Ok(DerivationParams {
+            iter: Self::PBKDF2_ITER,
+            hash: Self::PBKDF2_HASH,
+            kind: Self::EXPORT_KEY_KIND,
+            salt: rand_vec(Self::PBKDF2_HASH.len())?,
+            iv: rand_vec(Self::EXPORT_KEY_KIND.iv_len())?,
+        })
+    }
+
+    fn derive_key(&self, password: &str) -> Result<AeadKey> {
+        let mut key = Zeroizing::new([0u8; Self::EXPORT_KEY_KIND.key_len()]);
+        pbkdf2_hmac(
+            password.as_bytes(),
+            self.salt.as_slice(),
+            self.iter,
+            self.hash.into(),
+            key.as_mut_slice(),
+        )?;
+        AeadKey::copy_components(self.kind, key.as_slice(), &self.iv)
+    }
+}
+
 impl<S: Scheme> AsymKeyPub<S> {
     fn try_from(public: Public, scheme: S) -> Result<AsymKeyPub<S>> {
         match public {
@@ -1558,7 +1561,8 @@ mod test {
     }
 
     /// Displays the message associated with a TSS2 return code.
-    #[test]
+    //#[test]
+    #[allow(dead_code)]
     fn print_error_message() {
         const RC: TSS2_RC = 2461;
         let msg = tss2_rc_decode(RC);
@@ -2044,4 +2048,68 @@ mod test {
             .unwrap();
         Ok(())
     }
+
+    /// Ensures that the [Writecap] assigned using one instance of [TpmCredStore] is available from
+    /// another.
+    #[test]
+    fn writecap_persisted_between_cred_stores() {
+        let (harness, store) = test_store().unwrap();
+        let expected = {
+            let root_creds = store.gen_root_creds("TURTLES").unwrap();
+            let mut node_creds = store.node_creds().unwrap();
+            let expires = Epoch::now() + Duration::from_secs(3600);
+            let writecap = root_creds
+                .issue_writecap(node_creds.principal(), vec![], expires)
+                .unwrap();
+            store
+                .assign_node_writecap(&mut node_creds, writecap.clone())
+                .unwrap();
+            writecap
+        };
+        drop(store);
+
+        let store =
+            TpmCredStore::from_tabrmd(harness.tabrmd_config(), harness.state_path().to_owned())
+                .unwrap();
+        let node_creds = store.node_creds().unwrap();
+        let actual = node_creds.writecap().unwrap();
+
+        assert_eq!(&expected, actual);
+    }
+
+    /// Checks that when a writecap is assigned to the node creds it is present in a subsequently
+    /// returned instance of the node creds.
+    #[test]
+    fn writecap_present_in_subsequent_node_creds() {
+        let (_harness, store) = test_store().unwrap();
+        let expected = {
+            let root_creds = store.gen_root_creds("POWER").unwrap();
+            let mut node_creds = store.node_creds().unwrap();
+            let expires = Epoch::now() + Duration::from_secs(3600);
+            let writecap = root_creds
+                .issue_writecap(node_creds.principal(), vec![], expires)
+                .unwrap();
+            store
+                .assign_node_writecap(&mut node_creds, writecap.clone())
+                .unwrap();
+            writecap
+        };
+
+        let node_creds = store.node_creds().unwrap();
+        let actual = node_creds.writecap().unwrap();
+
+        assert_eq!(&expected, actual);
+    }
+
+    /// Tests that multiple instance of the node creds can be requested from the [TpmCredStore] with
+    /// overlapping lifetimes.
+    #[test]
+    fn multiple_node_creds_can_coexist() {
+        let (_harness, store) = test_store().unwrap();
+
+        let first = store.node_creds().unwrap();
+        let second = store.node_creds().unwrap();
+
+        assert_eq!(first.concrete_pub(), second.concrete_pub());
+    }
 }

+ 19 - 1
crates/btlib/src/lib.rs

@@ -153,7 +153,7 @@ pub trait BlockReader: Read + Seek + AsRef<BlockMeta> + Size + Sectored {
         read_from_block::<Directory, _>(self)
     }
 
-    fn read_proc(&mut self) -> Result<ProcRec> {
+    fn read_proc_rec(&mut self) -> Result<ProcRec> {
         read_from_block::<ProcRec, _>(self)
     }
 }
@@ -1130,6 +1130,10 @@ impl Writecap {
         &writecap.body.signing_key
     }
 
+    pub fn issued_to(&self) -> &Principal {
+        &self.body.issued_to
+    }
+
     /// Returns the principal of the root key which was used to sign this writecap.
     pub fn root_principal(&self) -> Principal {
         self.root_signing_key().principal()
@@ -1859,4 +1863,18 @@ mod tests {
             assert_eq!(EXPECTED, actual);
         }
     }
+
+    /// Tests that the last component of a [Writecap]'s bind path is the string representation of
+    /// the [Principal] to which the [Writecap] was issued.
+    #[test]
+    fn writecap_bind_path_last_component_is_principal() {
+        let creds = node_creds();
+        let writecap = creds.writecap().unwrap();
+        let expected = writecap.issued_to().to_string();
+
+        let bind_path = writecap.bind_path();
+        let actual = bind_path.components().last().unwrap();
+
+        assert_eq!(expected, actual);
+    }
 }

+ 9 - 0
crates/btmsg/src/callback_framed.rs

@@ -111,6 +111,15 @@ pub trait DeserCallback {
     fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de>;
 }
 
+impl<'a, T: DeserCallback> DeserCallback for &'a mut T {
+    type Arg<'de> = T::Arg<'de> where T: 'de, 'a: 'de;
+    type Return = T::Return;
+    type CallFut<'de> = T::CallFut<'de> where T: 'de, 'a: 'de;
+    fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
+        (*self).call(arg)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

+ 53 - 9
crates/btmsg/src/lib.rs

@@ -150,6 +150,12 @@ impl<T> Envelope<T> {
     }
 }
 
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
+enum ReplyEnvelope<T> {
+    Ok(T),
+    Err(String),
+}
+
 /// A message tagged with the block path that it was sent from.
 pub struct MsgReceived<T> {
     from: Arc<BlockPath>,
@@ -241,7 +247,7 @@ pub trait Transmitter {
     /// `F::Arg` is a Generic Associated Type I have been unable to express this constraint in the
     /// where clause of this method. I'm not sure if the errors I've encountered are due to a lack
     /// of understanding on my part or due to the current limitations of the borrow checker in
-    /// its handling GATs.
+    /// its handling of GATs.
     fn call<'call, T, F>(&'call self, msg: T, callback: F) -> Self::CallFut<'call, T, F>
     where
         T: 'call + CallMsg<'call>,
@@ -340,7 +346,13 @@ impl Replier {
 
     pub async fn reply<T: Serialize + Send>(&mut self, reply: T) -> Result<()> {
         let mut guard = self.stream.lock().await;
-        guard.send(reply).await?;
+        guard.send(ReplyEnvelope::Ok(reply)).await?;
+        Ok(())
+    }
+
+    pub async fn reply_err(&mut self, err: String) -> Result<()> {
+        let mut guard = self.stream.lock().await;
+        guard.send(ReplyEnvelope::<()>::Err(err)).await?;
         Ok(())
     }
 }
@@ -361,17 +373,25 @@ impl<F: MsgCallback> MsgRecvdCallback<F> {
     }
 }
 
-impl<F: MsgCallback> DeserCallback for MsgRecvdCallback<F> {
+impl<F: 'static + MsgCallback> DeserCallback for MsgRecvdCallback<F> {
     type Arg<'de> = Envelope<F::Arg<'de>> where Self: 'de;
     type Return = Result<()>;
-    type CallFut<'de> = F::CallFut<'de> where F: 'de, Self: 'de;
+    type CallFut<'de> = impl 'de + Future<Output = Self::Return> + Send where F: 'de, Self: 'de;
     fn call<'de>(&'de mut self, arg: Envelope<F::Arg<'de>>) -> Self::CallFut<'de> {
         let replier = match arg.kind {
             MsgKind::Call => Some(self.replier.clone()),
             MsgKind::Send => None,
         };
-        self.inner
-            .call(MsgReceived::new(self.path.clone(), arg, replier))
+        async move {
+            let result = self
+                .inner
+                .call(MsgReceived::new(self.path.clone(), arg, replier))
+                .await;
+            if let Err(ref err) = result {
+                self.replier.reply_err(err.to_string()).await?;
+            }
+            result
+        }
     }
 }
 
@@ -489,7 +509,7 @@ impl QuicReceiver {
         }
     }
 
-    async fn handle_message<F: MsgCallback>(
+    async fn handle_message<F: 'static + MsgCallback>(
         client_path: Arc<BlockPath>,
         send_stream: SendStream,
         recv_stream: RecvStream,
@@ -636,13 +656,13 @@ impl QuicTransmitter {
         let buffer = guard.take().unwrap();
         let mut callback_framed = CallbackFramed::from_parts(recv_stream, buffer);
         let result = callback_framed
-            .next(callback)
+            .next(ReplyCallback::new(callback))
             .await
             .ok_or_else(|| bterr!("server hung up before sending reply"));
         let (_, buffer) = callback_framed.into_parts();
         let output = cleanup_on_err!(result, guard, buffer);
         *guard = Some(buffer);
-        output
+        output?
     }
 }
 
@@ -673,3 +693,27 @@ impl Transmitter for QuicTransmitter {
         self.call(msg, callback)
     }
 }
+
+struct ReplyCallback<F> {
+    inner: F,
+}
+
+impl<F> ReplyCallback<F> {
+    fn new(inner: F) -> Self {
+        Self { inner }
+    }
+}
+
+impl<F: 'static + Send + DeserCallback> DeserCallback for ReplyCallback<F> {
+    type Arg<'de> = ReplyEnvelope<F::Arg<'de>>;
+    type Return = Result<F::Return>;
+    type CallFut<'de> = impl 'de + Future<Output = Self::Return> + Send;
+    fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
+        async move {
+            match arg {
+                ReplyEnvelope::Ok(msg) => Ok(self.inner.call(msg).await),
+                ReplyEnvelope::Err(err) => Err(bterr!(err)),
+            }
+        }
+    }
+}