Browse Source

Managed to get zero-copy reads working in ServerCallback.

Matthew Carr 2 years ago
parent
commit
ee87c26b3f
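
This commit replaces the callback-based FsProvider::read with a method that returns an owned guard dereferencing to &[u8], so ServerCallback can reply straight out of the block's buffer without an intermediate copy. A minimal caller-side sketch (hypothetical helper, not part of the commit; it assumes only the FsProvider trait and the Read message as changed below):

    use std::{ops::Deref, sync::Arc};

    use btfproto::{msg::Read, server::FsProvider};
    use btlib::{BlockPath, Result};

    // Read one range into an owned Vec. The guard borrows the block's internal
    // buffer, so the only copy here is the explicit to_vec(); a caller that just
    // forwards the bytes (as ServerCallback now does) can use &*guard directly.
    async fn read_to_vec<P: FsProvider>(
        fs: &P,
        from: &Arc<BlockPath>,
        msg: Read,
    ) -> Result<Vec<u8>> {
        let guard = fs.read(from, msg).await?;
        let data = guard.deref().to_vec();
        // Drop the guard before making any further calls on `fs` (see the
        // WARNING added to FsProvider::read below).
        drop(guard);
        Ok(data)
    }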

+ 0 - 2
Cargo.lock

@@ -199,7 +199,6 @@ dependencies = [
  "btfproto",
  "btlib",
  "btserde",
- "bytes",
  "lazy_static",
  "libc",
  "tempdir",
@@ -233,7 +232,6 @@ dependencies = [
  "btfproto-tests",
  "btlib",
  "btserde",
- "bytes",
  "ctor",
  "env_logger",
  "fuse-backend-rs",

+ 0 - 1
crates/btfproto-tests/Cargo.toml

@@ -11,6 +11,5 @@ btfproto = { path = "../btfproto" }
 btserde = { path = "../btserde" }
 tempdir = { version = "0.3.7" }
 lazy_static = { version = "1.4.0" }
-bytes = { version = "1.3.0" }
 libc = { version = "0.2.137" }
 tokio = { version = "1.24.2" }

+ 161 - 102
crates/btfproto-tests/src/local_fs_tests.rs

@@ -80,7 +80,9 @@ impl LocalFsTest {
             .writecap()
             .ok_or(BlockError::MissingWritecap)
             .unwrap();
-        let fs = LocalFs::new_empty(path, 0, root_creds, ModeAuthorizer).unwrap();
+        let fs = LocalFs::new_empty(path, 0, root_creds, ModeAuthorizer)
+            .await
+            .unwrap();
 
         let proc_rec = IssuedProcRec {
             addr: IpAddr::V6(Ipv6Addr::LOCALHOST),
@@ -122,9 +124,9 @@ mod tests {
 
     use btfproto::{local_fs::Error, msg::*};
     use btlib::{Inode, Result, SECTOR_SZ_DEFAULT};
-    use bytes::BytesMut;
     use std::{
         io::{self, Cursor, Write as IoWrite},
+        ops::Deref,
         sync::Arc,
     };
 
@@ -148,30 +150,24 @@ mod tests {
 
         const LEN: usize = 32;
         let expected = [1u8; LEN];
-        let WriteReply { written, .. } = bt
-            .write(
-                from,
-                inode,
-                handle,
-                0,
-                expected.len() as u64,
-                expected.as_slice(),
-            )
-            .await
-            .unwrap();
+        let write_msg = Write {
+            inode,
+            handle,
+            offset: 0,
+            data: expected.as_slice(),
+        };
+        let WriteReply { written, .. } = bt.write(from, write_msg).await.unwrap();
         assert_eq!(LEN as u64, written);
 
-        let mut actual = [0u8; LEN];
         let read_msg = Read {
             inode,
             handle,
             offset: 0,
             size: LEN as u64,
         };
-        bt.read(from, read_msg, |data| actual.copy_from_slice(data))
-            .unwrap();
+        let guard = bt.read(from, read_msg).await.unwrap();
 
-        assert_eq!(expected, actual)
+        assert_eq!(expected, guard.deref())
     }
 
     #[tokio::test]
@@ -220,10 +216,13 @@ mod tests {
             };
             let CreateReply { handle, inode, .. } = bt.create(from, create_msg).await.unwrap();
 
-            let WriteReply { written, .. } = bt
-                .write(from, inode, handle, 0, EXPECTED.len() as u64, EXPECTED)
-                .await
-                .unwrap();
+            let write_msg = Write {
+                inode,
+                handle,
+                offset: 0,
+                data: EXPECTED,
+            };
+            let WriteReply { written, .. } = bt.write(from, write_msg).await.unwrap();
             assert_eq!(EXPECTED.len() as u64, written);
 
             let close_msg = Close { inode, handle };
@@ -246,17 +245,15 @@ mod tests {
         };
         let OpenReply { handle, .. } = bt.open(from, open_msg).await.unwrap();
 
-        let mut actual = BytesMut::new();
         let read_msg = Read {
             inode,
             handle,
             offset: 0,
             size: EXPECTED.len() as u64,
         };
-        bt.read(from, read_msg, |data| actual.extend_from_slice(data))
-            .unwrap();
+        let guard = bt.read(from, read_msg).await.unwrap();
 
-        assert_eq!(EXPECTED, &actual)
+        assert_eq!(EXPECTED, guard.deref())
     }
 
     /// Tests that an error is returned by the `Blocktree::write` method if the file was opened
@@ -287,9 +284,13 @@ mod tests {
         let OpenReply { handle, .. } = bt.open(from, open_msg).await.unwrap();
 
         let data = [1u8; 32];
-        let result = bt
-            .write(from, inode, handle, 0, data.len() as u64, data.as_slice())
-            .await;
+        let write_msg = Write {
+            inode,
+            handle,
+            offset: 0,
+            data: data.as_slice(),
+        };
+        let result = bt.write(from, write_msg).await;
 
         let err = result.err().unwrap();
         let err = err.downcast::<Error>().unwrap();
@@ -380,23 +381,24 @@ mod tests {
             umask: 0,
         };
         let CreateReply { inode, handle, .. } = bt.create(from, create_msg).await.unwrap();
-        let WriteReply { written, .. } = bt
-            .write(from, inode, handle, 0, DATA.len() as u64, DATA.as_slice())
-            .await
-            .unwrap();
+        let write_msg = Write {
+            inode,
+            handle,
+            offset: 0,
+            data: DATA.as_slice(),
+        };
+        let WriteReply { written, .. } = bt.write(from, write_msg).await.unwrap();
         assert_eq!(DATA.len() as u64, written);
         const SIZE: usize = DATA.len() / 2;
-        let mut actual = Vec::with_capacity(SIZE);
         let read_msg = Read {
             inode,
             handle,
             offset: 0,
             size: SIZE as u64,
         };
-        bt.read(from, read_msg, |data| actual.extend_from_slice(data))
-            .unwrap();
+        let guard = bt.read(from, read_msg).await.unwrap();
 
-        assert_eq!(&[0, 1, 2, 3], actual.as_slice());
+        assert_eq!(&[0, 1, 2, 3], guard.deref());
     }
 
     /// Returns an integer array starting at the given value and increasing by one for each
@@ -434,36 +436,35 @@ mod tests {
             umask: 0,
         };
         let CreateReply { inode, handle, .. } = bt.create(from, create_msg).await.unwrap();
-        let WriteReply { written, .. } = bt
-            .write(from, inode, handle, 0, DATA.len() as u64, DATA.as_slice())
-            .await
-            .unwrap();
+        let write_msg = Write {
+            inode,
+            handle,
+            offset: 0,
+            data: DATA.as_slice(),
+        };
+        let WriteReply { written, .. } = bt.write(from, write_msg).await.unwrap();
         assert_eq!(DATA.len() as u64, written);
-        let case = Box::new(case);
-        let cb = Arc::new(Box::new(move |offset: usize| {
-            // Notice that we have concurrent reads to different offsets using the same handle.
-            // Without proper synchronization, this shouldn't work.
-            let mut actual = Vec::with_capacity(SIZE);
-            let read_msg = Read {
-                inode,
-                handle,
-                offset: offset as u64,
-                size: SIZE as u64,
-            };
-            case.fs
-                .read(case.from(), read_msg, |data| actual.extend_from_slice(data))
-                .unwrap();
-            let expected = integer_array::<SIZE>(offset as u8);
-            assert_eq!(&expected, actual.as_slice());
-        }));
+        let case = Arc::new(case);
 
         let mut handles = Vec::with_capacity(NREADS);
         for offset in (0..NREADS).map(|e| e * SIZE) {
-            let thread_cb = cb.clone();
-            handles.push(std::thread::spawn(move || thread_cb(offset)));
+            let case = case.clone();
+            handles.push(tokio::spawn(async move {
+                // Notice that we have concurrent reads to different offsets using the same handle.
+                // Without proper synchronization, this shouldn't work.
+                let read_msg = Read {
+                    inode,
+                    handle,
+                    offset: offset as u64,
+                    size: SIZE as u64,
+                };
+                let guard = case.fs.read(case.from(), read_msg).await.unwrap();
+                let expected = integer_array::<SIZE>(offset as u8);
+                assert_eq!(&expected, guard.deref());
+            }));
         }
         for handle in handles {
-            handle.join().unwrap();
+            handle.await.unwrap();
         }
     }
 
@@ -665,9 +666,13 @@ mod tests {
             umask: 0,
         };
         let CreateReply { inode, handle, .. } = bt.create(owner, create_msg).await.unwrap();
-        let result = bt
-            .write(&other, inode, handle, 0, 3, [1, 2, 3].as_slice())
-            .await;
+        let write_msg = Write {
+            inode,
+            handle,
+            offset: 0,
+            data: [1, 2, 3].as_slice(),
+        };
+        let result = bt.write(&other, write_msg).await;
 
         let err = result.err().unwrap().downcast::<Error>().unwrap();
         let matched = if let Error::WrongOwner = err {
@@ -693,30 +698,29 @@ mod tests {
             umask: 0,
         };
         let CreateReply { inode, handle, .. } = bt.create(from, create_msg).await.unwrap();
-        let mut actual = [0u8; 8];
+        const LEN: u64 = 8;
         let alloc_msg = Allocate {
             inode,
             handle,
             offset: None,
-            size: actual.len() as u64,
+            size: LEN,
         };
         bt.allocate(from, alloc_msg).await.unwrap();
-        let read_msg = Read {
-            inode,
-            handle,
-            offset: 0,
-            size: actual.len() as u64,
-        };
-        bt.read(from, read_msg, |data| actual.copy_from_slice(data))
-            .unwrap();
         let read_meta_msg = ReadMeta {
             inode,
             handle: Some(handle),
         };
         let ReadMetaReply { attrs, .. } = bt.read_meta(from, read_meta_msg).await.unwrap();
+        let read_msg = Read {
+            inode,
+            handle,
+            offset: 0,
+            size: LEN,
+        };
+        let guard = bt.read(from, read_msg).await.unwrap();
 
-        assert_eq!([0u8; 8], actual);
-        assert_eq!(actual.len() as u64, attrs.size);
+        assert_eq!([0u8; 8], guard.deref());
+        assert_eq!(guard.len() as u64, attrs.size);
     }
 
     #[tokio::test]
@@ -751,12 +755,10 @@ mod tests {
                 offset: 0,
                 size,
             };
-            let cb = |data: &[u8]| {
-                actual.write(data)?;
-                size -= data.len() as u64;
-                Ok::<_, io::Error>(())
-            };
-            bt.read(from, read_msg, cb).unwrap().unwrap();
+            let guard = bt.read(from, read_msg).await.unwrap();
+            let data = guard.deref();
+            actual.write(data).unwrap();
+            size -= data.len() as u64;
         }
         let read_meta_msg = ReadMeta {
             inode,
@@ -792,22 +794,20 @@ mod tests {
             size,
         };
         bt.allocate(from, alloc_msg).await.unwrap();
-        let mut actual = vec![0u8; LEN];
+        let read_meta_msg = ReadMeta {
+            inode,
+            handle: Some(handle),
+        };
+        let ReadMetaReply { attrs, .. } = bt.read_meta(from, read_meta_msg).await.unwrap();
         let read_msg = Read {
             inode,
             handle,
             offset: 0,
             size,
         };
-        bt.read(from, read_msg, |data| actual.copy_from_slice(data))
-            .unwrap();
-        let read_meta_msg = ReadMeta {
-            inode,
-            handle: Some(handle),
-        };
-        let ReadMetaReply { attrs, .. } = bt.read_meta(from, read_meta_msg).await.unwrap();
+        let guard = bt.read(from, read_msg).await.unwrap();
 
-        assert_eq!(vec![0u8; LEN], actual);
+        assert_eq!(vec![0u8; LEN], guard.deref());
         assert_eq!(LEN as u64, attrs.size);
     }
 
@@ -829,10 +829,13 @@ mod tests {
         };
         let CreateReply { inode, handle, .. } = bt.create(from, create_msg).await.unwrap();
         const LEN: usize = 8;
-        let WriteReply { written, .. } = bt
-            .write(from, inode, handle, 0, LEN as u64, [1u8; LEN].as_slice())
-            .await
-            .unwrap();
+        let write_msg = Write {
+            inode,
+            handle,
+            offset: 0,
+            data: [1u8; LEN].as_slice(),
+        };
+        let WriteReply { written, .. } = bt.write(from, write_msg).await.unwrap();
         assert_eq!(LEN as u64, written);
         let alloc_msg = Allocate {
             inode,
@@ -841,22 +844,78 @@ mod tests {
             size: (LEN / 2) as u64,
         };
         bt.allocate(from, alloc_msg).await.unwrap();
-        let mut actual = [0u8; LEN];
+        let read_meta_msg = ReadMeta {
+            inode,
+            handle: Some(handle),
+        };
+        let ReadMetaReply { attrs, .. } = bt.read_meta(from, read_meta_msg).await.unwrap();
         let read_msg = Read {
             inode,
             handle,
             offset: 0,
             size: LEN as u64,
         };
-        bt.read(from, read_msg, |data| actual.copy_from_slice(data))
-            .unwrap();
-        let read_meta_msg = ReadMeta {
+        let actual = bt.read(from, read_msg).await.unwrap();
+
+        assert_eq!([1u8; LEN], actual.deref());
+        assert_eq!(LEN as u64, attrs.size);
+    }
+
+    #[tokio::test]
+    async fn read_at_non_current_position() {
+        const FILENAME: &str = "MANIFESTO.rtf";
+        let case = LocalFsTest::new_empty().await;
+        let bt = &case.fs;
+        let from = case.from();
+
+        let msg = Create {
+            parent: SpecInodes::RootDir.into(),
+            name: FILENAME,
+            flags: FlagValue::ReadWrite.into(),
+            mode: 0o644,
+            umask: 0,
+        };
+        let CreateReply {
             inode,
-            handle: Some(handle),
+            handle,
+            entry,
+            ..
+        } = bt.create(from, msg).await.unwrap();
+        let sect_sz64 = entry.attr.sect_sz;
+        let sect_sz: usize = sect_sz64.try_into().unwrap();
+        let mut data = vec![1u8; sect_sz];
+        let msg = Write {
+            inode,
+            handle,
+            offset: 0,
+            data: data.as_slice(),
         };
-        let ReadMetaReply { attrs, .. } = bt.read_meta(from, read_meta_msg).await.unwrap();
+        let WriteReply { written, .. } = bt.write(from, msg).await.unwrap();
+        assert_eq!(sect_sz64, written);
+        data.truncate(0);
+        data.extend(std::iter::repeat(2).take(sect_sz));
+        let msg = Write {
+            inode,
+            handle,
+            offset: sect_sz64,
+            data: data.as_slice(),
+        };
+        let WriteReply { written, .. } = bt.write(from, msg).await.unwrap();
+        assert_eq!(sect_sz64, written);
+        // The Accessor for this block should now have the second sector loaded, so it will have to
+        // seek back to the first in order to respond to this read request.
+        let msg = Read {
+            inode,
+            handle,
+            offset: 0,
+            size: sect_sz64,
+        };
+        let guard = bt.read(from, msg).await.unwrap();
 
-        assert_eq!([1u8; LEN], actual);
-        assert_eq!(LEN as u64, attrs.size);
+        assert!(guard
+            .deref()
+            .iter()
+            .map(|e| *e)
+            .eq(std::iter::repeat(1u8).take(sect_sz)));
     }
 }

+ 204 - 222
crates/btfproto/src/local_fs.rs

@@ -5,23 +5,23 @@ use btlib::{
     accessor::Accessor,
     bterr,
     crypto::{Creds, Decrypter, Signer},
-    error::{BtErr, DisplayErr},
+    error::BtErr,
     AuthzAttrs, BlockAccessor, BlockError, BlockMeta, BlockMetaSecrets, BlockOpenOptions,
     BlockPath, BlockReader, DirEntry, Directory, Epoch, FileBlock, FlushMeta, IssuedProcRec,
     MetaAccess, MetaReader, Positioned, Principal, Principaled, ProcRec, Result, Split, TrySeek,
-    WriteDual, ZeroExtendable,
+    ZeroExtendable,
 };
 use btserde::{read_from, write_to};
-use core::future::{ready, Ready};
+use core::future::Ready;
 use log::{debug, error, warn};
-use positioned_io::Size;
+use positioned_io::{ReadAt, Size};
 use serde::{Deserialize, Serialize};
 use std::{
     collections::hash_map::{self, HashMap},
     fmt::{Display, Formatter},
     fs::File,
     future::Future,
-    io::{self, Read as IoRead, Seek, SeekFrom, Write as IoWrite},
+    io::{self, Seek, SeekFrom, Write as IoWrite},
     net::{IpAddr, Ipv6Addr},
     ops::{Deref, DerefMut},
     path::{Path, PathBuf},
@@ -31,17 +31,11 @@ use std::{
     },
     time::Duration,
 };
-use tokio::sync::{
-    Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard,
-};
+use tokio::sync::{Mutex, OwnedMutexGuard, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
 
 pub use private::{Authorizer, AuthzContext, Error, LocalFs, ModeAuthorizer};
 
 mod private {
-    use btlib::DropTrigger;
-
-    use crate::server::{ReadGuard, ReadingGuard, WriteGuard};
-
     use super::*;
 
     type Inode = u64;
@@ -173,21 +167,21 @@ mod private {
     }
 
     type HandleValueParts<'a> = (
-        &'a Mutex<Option<Accessor<&'static [u8]>>>,
+        &'a Arc<Mutex<Option<Accessor<&'static [u8]>>>>,
         &'a Arc<BlockPath>,
         Flags,
     );
 
-    struct HandleGuard<'a, B: Size> {
-        guard: MutexGuard<'a, Option<Accessor<&'static [u8]>>>,
+    struct HandleGuard<B: Size> {
+        guard: OwnedMutexGuard<Option<Accessor<&'static [u8]>>>,
         accessor: Option<Accessor<B>>,
         flags: Flags,
     }
 
-    impl<'a, B: Size> HandleGuard<'a, B> {
+    impl<B: Size> HandleGuard<B> {
         fn new(
             flags: Flags,
-            mut guard: MutexGuard<'a, Option<Accessor<&'static [u8]>>>,
+            mut guard: OwnedMutexGuard<Option<Accessor<&'static [u8]>>>,
             block: B,
         ) -> Self {
             let accessor = guard
@@ -201,7 +195,7 @@ mod private {
         }
     }
 
-    impl<'a, B: Size> Drop for HandleGuard<'a, B> {
+    impl<B: Size> Drop for HandleGuard<B> {
         fn drop(&mut self) {
             *self.guard = self.accessor.take().map(|accessor| {
                 let (accessor, _) = accessor.split();
@@ -210,14 +204,14 @@ mod private {
         }
     }
 
-    impl<'a, B: Size> Deref for HandleGuard<'a, B> {
+    impl<B: Size> Deref for HandleGuard<B> {
         type Target = Accessor<B>;
         fn deref(&self) -> &Self::Target {
             self.accessor.as_ref().unwrap()
         }
     }
 
-    impl<'a, B: Size> DerefMut for HandleGuard<'a, B> {
+    impl<B: Size> DerefMut for HandleGuard<B> {
         fn deref_mut(&mut self) -> &mut Self::Target {
             self.accessor.as_mut().unwrap()
         }
@@ -225,12 +219,12 @@ mod private {
 
     enum HandleValue {
         File {
-            accessor: Mutex<Option<Accessor<&'static [u8]>>>,
+            accessor: Arc<Mutex<Option<Accessor<&'static [u8]>>>>,
             owner: Arc<BlockPath>,
             flags: Flags,
         },
         Directory {
-            accessor: Mutex<Option<Accessor<&'static [u8]>>>,
+            accessor: Arc<Mutex<Option<Accessor<&'static [u8]>>>>,
             owner: Arc<BlockPath>,
             flags: Flags,
             dir: Directory,
@@ -241,7 +235,7 @@ mod private {
         fn new<T: Size>(accessor: Accessor<T>, owner: Arc<BlockPath>, flags: Flags) -> HandleValue {
             let (accessor, ..) = accessor.split();
             HandleValue::File {
-                accessor: Mutex::new(Some(accessor)),
+                accessor: Arc::new(Mutex::new(Some(accessor))),
                 owner,
                 flags,
             }
@@ -273,7 +267,7 @@ mod private {
         fn convert_to_dir<C: Signer + Principaled + Decrypter>(
             self,
             block: &mut FileBlock<C>,
-        ) -> io::Result<HandleValue> {
+        ) -> Result<HandleValue> {
             let (accessor, owner, flags) = match self {
                 Self::File {
                     accessor,
@@ -287,15 +281,18 @@ mod private {
                     ..
                 } => (accessor, owner, flags),
             };
+            let accessor = Arc::try_unwrap(accessor).map_err(|_| {
+                bterr!("LOGIC ERROR: accessor was still in use even though convert_to_dir owns it")
+            })?;
             let accessor = accessor
                 .into_inner()
-                .ok_or_else(|| bterr!("a thread paniced while it held the accessor lock"))?;
+                .ok_or_else(|| bterr!("LOGIC ERROR: accessor was not returned to mutex"))?;
             let mut accessor = Accessor::combine(accessor, block);
             let dir = accessor.read_dir()?;
             let (accessor, ..) = accessor.split();
             Ok(HandleValue::Directory {
                 dir,
-                accessor: Mutex::new(Some(accessor)),
+                accessor: Arc::new(Mutex::new(Some(accessor))),
                 owner,
                 flags,
             })
@@ -311,10 +308,21 @@ mod private {
             }
         }
 
-        async fn guard<'a, B: Size>(&'a self, from: &BlockPath, block: B) -> Result<HandleGuard<'a, B>> {
+        async fn lock(
+            &self,
+            from: &BlockPath,
+        ) -> Result<(Flags, OwnedMutexGuard<Option<Accessor<&'static [u8]>>>)> {
             let (mutex, owner, flags) = self.parts();
             owner.assert_eq(from)?;
-            let guard = mutex.lock().await;
+            Ok((flags, mutex.clone().lock_owned().await))
+        }
+
+        async fn guard<'a, B: Size>(
+            &'a self,
+            from: &BlockPath,
+            block: B,
+        ) -> Result<HandleGuard<B>> {
+            let (flags, guard) = self.lock(from).await?;
             Ok(HandleGuard::new(flags, guard, block))
         }
 
@@ -326,6 +334,47 @@ mod private {
         }
     }
 
+    struct BlockGuard<B> {
+        inner: B,
+    }
+
+    impl<B> BlockGuard<B> {
+        fn new(inner: B) -> Self {
+            Self { inner }
+        }
+    }
+
+    impl<C, B: Deref<Target = InodeTableValue<C>>> Deref for BlockGuard<B> {
+        type Target = FileBlock<C>;
+        fn deref(&self) -> &Self::Target {
+            self.inner.block.get_ref()
+        }
+    }
+
+    impl<C, B: DerefMut<Target = InodeTableValue<C>>> DerefMut for BlockGuard<B> {
+        fn deref_mut(&mut self) -> &mut Self::Target {
+            self.inner.block.get_mut()
+        }
+    }
+
+    impl<C, B: Deref<Target = InodeTableValue<C>>> Size for BlockGuard<B> {
+        fn size(&self) -> io::Result<Option<u64>> {
+            self.inner.block.size()
+        }
+    }
+
+    impl<C, B: Deref<Target = InodeTableValue<C>>> ReadAt for BlockGuard<B> {
+        fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result<usize> {
+            self.inner.block.get_ref().read_at(pos, buf)
+        }
+    }
+
+    impl<C: 'static, B: Deref<Target = InodeTableValue<C>>> AsRef<BlockMeta> for BlockGuard<B> {
+        fn as_ref(&self) -> &BlockMeta {
+            self.inner.block.as_ref()
+        }
+    }
+
     pub struct InodeTableValue<C> {
         block: Accessor<FileBlock<C>>,
         handle_values: HashMap<Handle, HandleValue>,
@@ -380,17 +429,28 @@ mod private {
             &'a self,
             from: &BlockPath,
             handle: Handle,
-        ) -> Result<HandleGuard<'a, &FileBlock<C>>> {
+        ) -> Result<HandleGuard<&FileBlock<C>>> {
             let value = self.value(handle)?;
             let block = self.block();
             value.guard(from, block).await
         }
 
+        async fn handle_guard_owned(
+            guard: OwnedRwLockReadGuard<Self>,
+            from: &BlockPath,
+            handle: Handle,
+        ) -> Result<HandleGuard<BlockGuard<OwnedRwLockReadGuard<Self>>>> {
+            let value = guard.value(handle)?;
+            let (flags, mutex_guard) = value.lock(from).await?;
+            let guard = BlockGuard::new(guard);
+            Ok(HandleGuard::new(flags, mutex_guard, guard))
+        }
+
         async fn handle_guard_mut<'a>(
             &'a mut self,
             from: &BlockPath,
             handle: Handle,
-        ) -> Result<HandleGuard<'a, &mut FileBlock<C>>> {
+        ) -> Result<HandleGuard<&mut FileBlock<C>>> {
             let value = self
                 .handle_values
                 .get(&handle)
@@ -469,32 +529,32 @@ mod private {
         }
     }
 
-    type InodeTable<C> = HashMap<Inode, RwLock<InodeTableValue<C>>>;
+    type InodeTable<C> = HashMap<Inode, Arc<RwLock<InodeTableValue<C>>>>;
 
-    struct TableGuard<'a, C> {
-        table_guard: RwLockReadGuard<'a, InodeTable<C>>,
+    struct TableGuard<C> {
+        table_guard: OwnedRwLockReadGuard<InodeTable<C>>,
     }
 
-    impl<'a, C> TableGuard<'a, C> {
-        async fn new(table: &'a RwLock<InodeTable<C>>) -> TableGuard<'a, C> {
-            let table_guard = table.read().await;
+    impl<C> TableGuard<C> {
+        async fn new(table: Arc<RwLock<InodeTable<C>>>) -> TableGuard<C> {
+            let table_guard = table.read_owned().await;
             TableGuard { table_guard }
         }
 
-        async fn read(&'a self, inode: Inode) -> Result<RwLockReadGuard<'a, InodeTableValue<C>>> {
+        async fn read(&self, inode: Inode) -> Result<OwnedRwLockReadGuard<InodeTableValue<C>>> {
             let value = self
                 .table_guard
                 .get(&inode)
                 .ok_or_else(|| bterr!(Error::NotOpen(inode)))?;
-            Ok(value.read().await)
+            Ok(value.clone().read_owned().await)
         }
 
-        async fn write(&'a self, inode: Inode) -> Result<RwLockWriteGuard<'a, InodeTableValue<C>>> {
+        async fn write(&self, inode: Inode) -> Result<OwnedRwLockWriteGuard<InodeTableValue<C>>> {
             let value = self
                 .table_guard
                 .get(&inode)
                 .ok_or_else(|| bterr!(Error::NotOpen(inode)))?;
-            Ok(value.write().await)
+            Ok(value.clone().write_owned().await)
         }
     }
 
@@ -512,7 +572,7 @@ mod private {
         /// The path to the directory in the local filesystem where this blocktree is located.
         path: PathBuf,
         /// A map from inode numbers to their reference counts.
-        inodes: RwLock<InodeTable<C>>,
+        inodes: Arc<RwLock<InodeTable<C>>>,
         /// The next inode that will be assigned to a new block.
         next_inode: AtomicU64,
         /// The generation number of this filesystem. This is the same for every other server in
@@ -531,7 +591,7 @@ mod private {
 
     impl<C: Creds + 'static, A: Authorizer> LocalFs<C, A> {
         /// Creates a new empty blocktree at the given path.
-        pub fn new_empty(
+        pub async fn new_empty(
             btdir: PathBuf,
             generation: u64,
             creds: C,
@@ -599,12 +659,12 @@ mod private {
                     },
                 };
                 root_block_path.push_component(root_principal.to_string());
-                todo!()
-                //fs.grant_access_to(
-                //    &Arc::new(root_block_path),
-                //    SpecInodes::RootDir.into(),
-                //    proc_rec,
-                //).await?;
+                fs.grant_access_to(
+                    &Arc::new(root_block_path),
+                    SpecInodes::RootDir.into(),
+                    proc_rec,
+                )
+                .await?;
             }
             Ok(fs)
         }
@@ -643,15 +703,18 @@ mod private {
             let empty_path = Arc::new(BlockPath::default());
             inodes.insert(
                 SpecInodes::Sb.into(),
-                RwLock::new(InodeTableValue::new(sb_block, empty_path.clone())),
+                Arc::new(RwLock::new(InodeTableValue::new(
+                    sb_block,
+                    empty_path.clone(),
+                ))),
             );
             inodes.insert(
                 SpecInodes::RootDir.into(),
-                RwLock::new(InodeTableValue::new(root_block, empty_path)),
+                Arc::new(RwLock::new(InodeTableValue::new(root_block, empty_path))),
             );
             Ok(LocalFs {
                 path: btdir,
-                inodes: RwLock::new(inodes),
+                inodes: Arc::new(RwLock::new(inodes)),
                 next_inode: AtomicU64::new(sb.next_inode),
                 generation: sb.generation,
                 creds,
@@ -705,8 +768,8 @@ mod private {
             Ok(block)
         }
 
-        async fn table_guard(&self) -> TableGuard<'_, C> {
-            TableGuard::new(&self.inodes).await
+        async fn table_guard(&self) -> TableGuard<C> {
+            TableGuard::new(self.inodes.clone()).await
         }
 
         async fn open_value(
@@ -716,9 +779,9 @@ mod private {
             block_path: BlockPath,
         ) -> Result<()> {
             let block = Self::open_block(&self.path, inode, self.creds.clone(), block_path)?;
-            let value = RwLock::new(InodeTableValue::new(block, from.clone()));
+            let value = Arc::new(RwLock::new(InodeTableValue::new(block, from.clone())));
             let mut inodes = self.inodes.write().await;
-            if let Some(_) = inodes.insert(inode, value) {
+            if inodes.insert(inode, value).is_some() {
                 error!(
                     "LOGIC ERROR: open_value was called with inode {inode}, which is already open"
                 );
@@ -731,9 +794,9 @@ mod private {
             from: &Arc<BlockPath>,
             inode: Inode,
             block_path: BlockPath,
-        ) -> Result<TableGuard<'a, C>> {
+        ) -> Result<TableGuard<C>> {
             {
-                let inodes = self.inodes.read().await;
+                let inodes = self.inodes.clone().read_owned().await;
                 if inodes.contains_key(&inode) {
                     return Ok(TableGuard {
                         table_guard: inodes,
@@ -741,7 +804,7 @@ mod private {
                 }
             }
             self.open_value(from, inode, block_path).await?;
-            Ok(TableGuard::new(&self.inodes).await)
+            Ok(TableGuard::new(self.inodes.clone()).await)
         }
 
         async fn inode_forget<'a>(
@@ -764,11 +827,9 @@ mod private {
                 value.total_lookup_count()
             };
             if 0 == lookup_count {
-                let delete = inodes
-                    .remove(&inode)
-                    .unwrap()
-                    .into_inner()
-                    .delete;
+                let entry = Arc::try_unwrap(inodes.remove(&inode).unwrap())
+                    .map_err(|_| bterr!("LOGIC ERROR: entry for inode {inode} was still in use while it was being forgotten"))?;
+                let delete = entry.into_inner().delete;
                 if delete {
                     let path = Self::block_path(&self.path, inode);
                     std::fs::remove_file(path)?;
@@ -881,7 +942,8 @@ mod private {
             let self_writecap = self.creds.writecap().ok_or(BlockError::MissingWritecap)?;
             let self_bind_path = Arc::new(self_writecap.bind_path());
             let bind_path = proc_rec.writecap.bind_path();
-            self.open_value(&self_bind_path, next_inode, bind_path).await?;
+            self.open_value(&self_bind_path, next_inode, bind_path)
+                .await?;
             {
                 let table_guard = self.table_guard().await;
                 let mut value_guard = table_guard.write(next_inode).await?;
@@ -910,7 +972,10 @@ mod private {
                 .map(|e| e.inode())
         }
 
-        async fn lookup_inode<'a, I: Iterator<Item = &'a str>>(&self, components: I) -> Result<Inode> {
+        async fn lookup_inode<'a, I: Iterator<Item = &'a str>>(
+            &self,
+            components: I,
+        ) -> Result<Inode> {
             const ROOT: Inode = SpecInodes::RootDir as Inode;
             let mut inode = ROOT;
             for component in components {
@@ -948,7 +1013,9 @@ mod private {
             let relative = from.relative_to(local_root)?;
             let inode = self.lookup_inode(relative.components()).await?;
             let proc_rec = {
-                let table_guard = self.ensure_open(from, inode, from.as_ref().to_owned()).await?;
+                let table_guard = self
+                    .ensure_open(from, inode, from.as_ref().to_owned())
+                    .await?;
                 let mut value_guard = table_guard.write(inode).await?;
                 value_guard.block.read_proc_rec()?
             };
@@ -959,99 +1026,73 @@ mod private {
 
     unsafe impl<C: Sync, A: Sync> Sync for LocalFs<C, A> {}
 
-    pub struct TopGuard<'a, C> {
-        guard: TableGuard<'a, C>,
-        inode: Inode,
-        handle: Handle,
-        from: &'a Arc<BlockPath>,
-        offset: u64,
-        size: u64,
-    }
-
-    impl<'a, C: 'a + Send + Sync + Principaled + Decrypter + Signer> ReadingGuard<'a> for TopGuard<'a, C> {
-        type ReadGuard<'b> = InodeGuard<'b, RwLockReadGuard<'b, InodeTableValue<C>>> where C: 'b, Self: 'b;
-        type ReadFut<'b> = impl 'b + Send + Future<Output = Result<Self::ReadGuard<'b>>> where Self: 'b;
-        fn read(&self) -> Self::ReadFut<'_> {
-            async move {
-                let guard = self.guard.read(self.inode).await?;
-                Ok(InodeGuard {
-                    guard,
-                    handle: self.handle,
-                    from: self.from,
-                    offset: self.offset,
-                    size: self.size,
-                })
-            }
-        }
-
-        type WriteGuard = InodeGuard<'a, RwLockWriteGuard<'a, InodeTableValue<C>>>;
-        type WriteFut = impl 'a + Send + Future<Output = Result<Self::WriteGuard>>;
-        fn write(&'a self) -> Self::WriteFut {
-            async move {
-                let guard = self.guard.write(self.inode).await?;
-                Ok(InodeGuard {
-                    guard,
-                    handle: self.handle,
-                    from: self.from,
-                    offset: self.offset,
-                    size: self.size,
-                })
-            }
-        }
-    }
-
-    pub struct InodeGuard<'a, T> {
-        guard: T,
-        handle: Handle,
-        from: &'a Arc<BlockPath>,
+    /// An owned guard which allows read access to file data.
+    pub struct BufGuard<C> {
         offset: u64,
         size: u64,
+        // Note that handle must come before _table to ensure the guards are dropped in the correct
+        // order.
+        handle: HandleGuard<BlockGuard<OwnedRwLockReadGuard<InodeTableValue<C>>>>,
+        _table: OwnedRwLockReadGuard<InodeTable<C>>,
     }
 
-    impl<'a, C: 'a + Sync + Principaled + Decrypter + Signer, T: 'a + Send + Sync + Deref<Target = InodeTableValue<C>>> ReadGuard<'a> for InodeGuard<'a, T> {
-        type BufGuard<'b> = BufferGuard<'b, C> where Self: 'b;
-        type BufFut<'b> = impl 'b + Send + Future<Output = Result<Option<Self::BufGuard<'b>>>> where Self: 'b;
-        fn get_buf(&self) -> Self::BufFut<'_>
-        {
-            async move {
-                let mut block = self.guard.handle_guard(&self.from, self.handle).await?;
-                block.flags.assert_readable()?;
-                let pos = block.pos() as u64;
-                if self.offset != pos {
-                    if let Err(err) = block.try_seek(SeekFrom::Start(self.offset)) {
+    impl<C: 'static + Principaled + Signer + Decrypter> BufGuard<C> {
+        async fn new(
+            table: Arc<RwLock<InodeTable<C>>>,
+            from: &BlockPath,
+            inode: Inode,
+            handle: Handle,
+            offset: u64,
+            size: u64,
+        ) -> Result<BufGuard<C>> {
+            let table = table.clone().read_owned().await;
+            let entry = table.get(&inode).ok_or(Error::NotOpen(inode))?;
+            let inode_guard = {
+                let inode_guard = entry.clone().read_owned().await;
+                let mut handle_guard = inode_guard.handle_guard(from, handle).await?;
+                handle_guard.flags.assert_readable()?;
+                let pos = handle_guard.pos() as u64;
+                if offset != pos {
+                    if let Err(err) = handle_guard.try_seek(SeekFrom::Start(offset)) {
                         //  An error with `ErrorKind::Unsupported` means that the `SectoredBuf`
                         // has unflushed data and it needs exclusive access to the block to
                         // perform this seek because this data needs to be written.
                         if let io::ErrorKind::Unsupported = err.kind() {
-                            return Ok::<_, btlib::Error>(None);
+                            None
                         } else {
                             return Err(err.into());
                         }
+                    } else {
+                        drop(handle_guard);
+                        Some(inode_guard)
                     }
+                } else {
+                    drop(handle_guard);
+                    Some(inode_guard)
                 }
-                Ok(Some(BufferGuard { handle: block, offset: self.offset, size: self.size }))
-            }
-        }
-    }
-
-    impl<'a, C: Send + Principaled + Decrypter + Signer, T: 'a + Send + DerefMut<Target = InodeTableValue<C>>> WriteGuard<'a> for InodeGuard<'a, T> {
-        type SeekFut = impl 'a + Send + Future<Output = Result<()>>;
-        fn seek(&'a mut self) -> Self::SeekFut {
-            async move {
-                let mut block = self.guard.handle_guard_mut(self.from, self.handle).await?;
-                block.seek(SeekFrom::Start(self.offset))?;
-                Ok(())
-            }
+            };
+            let inode_guard = match inode_guard {
+                Some(inode_guard) => inode_guard,
+                None => {
+                    {
+                        let mut inode_guard = entry.write().await;
+                        let mut handle_guard = inode_guard.handle_guard_mut(from, handle).await?;
+                        handle_guard.seek(SeekFrom::Start(offset))?;
+                    }
+                    entry.clone().read_owned().await
+                }
+            };
+            let handle = InodeTableValue::handle_guard_owned(inode_guard, from, handle).await?;
+            Ok(BufGuard {
+                handle,
+                _table: table,
+                offset,
+                size,
+            })
         }
     }
 
-    pub struct BufferGuard<'a, C> {
-        handle: HandleGuard<'a, &'a FileBlock<C>>,
-        offset: u64,
-        size: u64,
-    }
-
-    impl<'a, C> Deref for BufferGuard<'a, C> {
+    impl<C: 'static + Principaled + Decrypter + Signer> Deref for BufGuard<C> {
         type Target = [u8];
         fn deref(&self) -> &Self::Target {
             self.handle.get_buf(self.offset, self.size).unwrap()
@@ -1147,7 +1188,8 @@ mod private {
                 let (handle, stat) = {
                     let table_guard = self.ensure_open(from, inode, block_path).await?;
                     let mut value_guard = table_guard.write(inode).await?;
-                    let handle = value_guard.new_handle(from.clone(), FlagValue::ReadWrite.into())?;
+                    let handle =
+                        value_guard.new_handle(from.clone(), FlagValue::ReadWrite.into())?;
                     let stat = {
                         let mut block = value_guard.handle_guard_mut(from, handle).await?;
                         let stat = block.mut_meta_body().access_secrets(|secrets| {
@@ -1218,7 +1260,7 @@ mod private {
                                     drop(result);
                                     value.forget_handle(handle);
                                     return Err(bterr!(message));
-                                }, 
+                                }
                             }
                         };
                         let ctx = AuthzContext::new(from, &authz_attrs, block.meta());
@@ -1253,84 +1295,33 @@ mod private {
             }
         }
 
-        type ReadingGuard<'a> = TopGuard<'a, C> where Self: 'a;
-        type ReadFut<'c> = impl 'c + Send + Future<Output = Result<Self::ReadingGuard<'c>>>;
-        fn read2<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Read) -> Self::ReadFut<'c> {
+        type ReadGuard = BufGuard<C>;
+        type ReadFut<'c> = impl 'c + Send + Future<Output = Result<Self::ReadGuard>>;
+        fn read<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Read) -> Self::ReadFut<'c> {
             async move {
-                let Read { inode, handle, offset, size } = msg;
-                let guard = self.table_guard().await;
-                Ok(TopGuard {
-                    guard,
-                    from,
+                let Read {
                     inode,
                     handle,
                     offset,
                     size,
-                })
+                } = msg;
+                BufGuard::new(self.inodes.clone(), from, inode, handle, offset, size).await
             }
         }
 
-        fn read<'c, R, F>(&'c self, from: &'c Arc<BlockPath>, msg: Read, callback: F) -> Result<R>
-        where
-            F: 'c + FnOnce(&[u8]) -> R,
-        {
-            todo!()
-            //let Read {
-            //    inode,
-            //    handle,
-            //    offset,
-            //    size,
-            //} = msg;
-            //debug!("read: inode {inode}, handle {handle}, offset {offset}, size {size}");
-            //let mut callback = Some(callback);
-            //let table_guard = self.table_guard();
-            //let output = (|| {
-            //    let value_guard = table_guard.read(inode)?;
-            //    let mut block = value_guard.handle_guard(from, handle)?;
-            //    block.flags.assert_readable()?;
-            //    let pos = block.pos() as u64;
-            //    if offset != pos {
-            //        if let Err(err) = block.try_seek(SeekFrom::Start(offset)) {
-            //            //  An error with `ErrorKind::Unsupported` means that the `SectoredBuf`
-            //            // has unflushed data and it needs exclusive access to the block to
-            //            // perform this seek because this data needs to be written.
-            //            if let io::ErrorKind::Unsupported = err.kind() {
-            //                return Ok::<_, btlib::Error>(None);
-            //            } else {
-            //                return Err(err.into());
-            //            }
-            //        }
-            //    }
-            //    let buf = block.get_buf(offset, size)?;
-            //    let callback = callback.take().unwrap();
-            //    Ok(Some(callback(buf)))
-            //})()?;
-            //let output = match output {
-            //    Some(output) => output,
-            //    None => {
-            //        // The offset of this read requires us to flush data buffered from a previous
-            //        // write before seeking to a different sector, so we have to access the block
-            //        // mutably.
-            //        let mut value_guard = table_guard.write(inode)?;
-            //        let mut block = value_guard.handle_guard_mut(from, handle)?;
-            //        block.seek(SeekFrom::Start(offset))?;
-            //        let buf = block.get_buf(offset, size)?;
-            //        let callback = callback.take().unwrap();
-            //        callback(buf)
-            //    }
-            //};
-            //Ok(output)
-        }
-
         type WriteFut<'r> = impl 'r + Send + Future<Output = Result<WriteReply>>;
         fn write<'c>(
             &'c self,
             from: &'c Arc<BlockPath>,
             write: Write<&'c [u8]>,
-        ) -> Self::WriteFut<'c>
-        {
+        ) -> Self::WriteFut<'c> {
             async move {
-                let Write { inode, handle, offset, mut data } = write;
+                let Write {
+                    inode,
+                    handle,
+                    offset,
+                    mut data,
+                } = write;
                 debug!("write: inode {inode}, handle {handle}, offset {offset}");
                 let table_guard = self.table_guard().await;
                 let mut value_guard = table_guard.write(inode).await?;
@@ -1341,9 +1332,7 @@ mod private {
                     block.seek(SeekFrom::Start(offset))?;
                 }
                 let written = std::io::copy(&mut data, block.deref_mut())?;
-                Ok(WriteReply {
-                    written: written as u64,
-                })
+                Ok(WriteReply { written })
             }
         }
 
@@ -1355,15 +1344,8 @@ mod private {
                 let table_guard = self.table_guard().await;
                 let mut value_guard = table_guard.write(inode).await?;
                 let mut block = value_guard.handle_guard_mut(from, handle).await?;
-                block.flags.assert_writeable()?;
-                match block.flush().bterr() {
-                    Ok(value) => Ok(value),
-                    Err(err) => match err.downcast_ref::<Error>() {
-                        // If the handle is read-only we just return Ok.
-                        Some(Error::ReadOnlyHandle(..)) => Ok(()),
-                        _ => Err(err),
-                    },
-                }
+                block.flush()?;
+                Ok(())
             }
         }
 
@@ -1637,7 +1619,7 @@ mod private {
                     Ok(mut block) => {
                         block.flush()?;
                         block.flush_meta()?;
-                    },
+                    }
                     Err(err) => match err.downcast_ref::<Error>() {
                         // If the cause of the error is that the handle is read-only, then it is
                         // not actually an error.

+ 22 - 79
crates/btfproto/src/server.rs

@@ -1,34 +1,10 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
-use crate::{
-    msg::{Read as ReadMsg, *},
-    Handle, Inode,
-};
+use crate::msg::{Read as ReadMsg, *};
 
 use btlib::{crypto::Creds, BlockPath, Result};
 use btmsg::{receiver, MsgCallback, MsgReceived, Receiver};
 use core::future::Future;
-use std::{io::Read, net::IpAddr, sync::Arc, ops::Deref};
-
-pub trait ReadingGuard<'a>: Send + Sync {
-    type ReadGuard<'b>: 'b + ReadGuard<'b> where Self: 'b;
-    type ReadFut<'b>: 'b + Send + Future<Output = Result<Self::ReadGuard<'b>>> where Self: 'b;
-    fn read(&self) -> Self::ReadFut<'_>;
-
-    type WriteGuard: WriteGuard<'a>;
-    type WriteFut: Send + Future<Output = Result<Self::WriteGuard>>;
-    fn write(&'a self) -> Self::WriteFut;
-}
-
-pub trait ReadGuard<'a> {
-    type BufGuard<'b>: 'b + Send + Deref<Target = [u8]> where Self: 'b;
-    type BufFut<'b>: 'b + Send + Future<Output = Result<Option<Self::BufGuard<'b>>>> where Self: 'b;
-    fn get_buf(&self) -> Self::BufFut<'_>;
-}
-
-pub trait WriteGuard<'a> {
-    type SeekFut: Send + Future<Output = Result<()>>;
-    fn seek(&'a mut self) -> Self::SeekFut;
-}
+use std::{net::IpAddr, ops::Deref, sync::Arc};
 
 pub trait FsProvider: Send + Sync {
     type LookupFut<'c>: Send + Future<Output = Result<LookupReply>>
@@ -46,22 +22,20 @@ pub trait FsProvider: Send + Sync {
         Self: 'c;
     fn open<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Open) -> Self::OpenFut<'c>;
 
-    type ReadingGuard<'a>: 'a + ReadingGuard<'a> where Self: 'a;
-    type ReadFut<'c>: Send + Future<Output = Result<()>> where Self: 'c;
-    fn read2<'c>(&'c self, from: &'c Arc<BlockPath>, guard: &mut Self::ReadingGuard<'c>, msg: ReadMsg) -> Self::ReadFut<'c>;
-
-    fn read<'c, R, F>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMsg, callback: F) -> Result<R>
+    type ReadGuard: Send + Sync + Deref<Target = [u8]>;
+    type ReadFut<'c>: Send + Future<Output = Result<Self::ReadGuard>>
     where
-        F: 'c + FnOnce(&[u8]) -> R;
+        Self: 'c;
+    /// Reads from the file specified in the given message.
+    /// ### WARNING
+    /// The returned guard must be dropped before another method is called on this provider.
+    /// Otherwise deadlock _will_ occur.
+    fn read<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMsg) -> Self::ReadFut<'c>;
 
     type WriteFut<'r>: Send + Future<Output = Result<WriteReply>>
     where
         Self: 'r;
-    fn write<'c>(
-        &'c self,
-        from: &'c Arc<BlockPath>,
-        write: Write<&'c [u8]>
-    ) -> Self::WriteFut<'c>;
+    fn write<'c>(&'c self, from: &'c Arc<BlockPath>, write: Write<&'c [u8]>) -> Self::WriteFut<'c>;
 
     type FlushFut<'c>: Send + Future<Output = Result<()>>
     where
@@ -154,26 +128,14 @@ impl<P: FsProvider> FsProvider for &P {
         (*self).open(from, msg)
     }
 
-    type ReadingGuard<'c> = P::ReadingGuard<'c> where Self: 'c;
+    type ReadGuard = P::ReadGuard;
     type ReadFut<'c> = P::ReadFut<'c> where Self: 'c;
-    fn read2<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMsg) -> Self::ReadFut<'c> {
-        (*self).read2(from, msg)
-    }
-
-    fn read<'c, R, F>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMsg, callback: F) -> Result<R>
-    where
-        F: 'c + FnOnce(&[u8]) -> R,
-    {
-        (*self).read(from, msg, callback)
+    fn read<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMsg) -> Self::ReadFut<'c> {
+        (*self).read(from, msg)
     }
 
     type WriteFut<'r> = P::WriteFut<'r> where Self: 'r;
-    fn write<'c>(
-        &'c self,
-        from: &'c Arc<BlockPath>,
-        write: Write<&'c [u8]>
-    ) -> Self::WriteFut<'c>
-    {
+    fn write<'c>(&'c self, from: &'c Arc<BlockPath>, write: Write<&'c [u8]>) -> Self::WriteFut<'c> {
         (*self).write(from, write)
     }
 
@@ -285,33 +247,14 @@ impl<P: 'static + Send + Sync + FsProvider> MsgCallback for ServerCallback<P> {
                 FsMsg::Create(create) => FsReply::Create(provider.create(&from, create).await?),
                 FsMsg::Open(open) => FsReply::Open(provider.open(&from, open).await?),
                 FsMsg::Read(read) => {
-                    let top_guard = provider.read2(&from, read).await?;
-                    {
-                        //let read_guard = top_guard.read().await?;
-                        //let option = read_guard.get_buf().await?;
-                        //if let Some(buf) = option {
-                        //    todo!()
-                        //}
-                    }
-                    todo!()
-                    //let buf = provider.read(&from, read, move |data| {
-                    //    // TODO: Avoid allocating a buffer on every read. If possible, avoid coping
-                    //    // data altogether.
-                    //    let mut buf = Vec::with_capacity(data.len());
-                    //    buf.extend_from_slice(data);
-                    //    buf
-                    //})?;
-                    //let mut replier = replier.unwrap();
-                    //replier
-                    //    .reply(FsReply::Read(ReadReply { data: &buf }))
-                    //    .await?;
-                    //return Ok(());
+                    let guard = provider.read(&from, read).await?;
+                    let mut replier = replier.unwrap();
+                    replier
+                        .reply(FsReply::Read(ReadReply { data: &guard }))
+                        .await?;
+                    return Ok(());
                 }
-                FsMsg::Write(write) => FsReply::Write(
-                    provider
-                        .write(&from, write)
-                        .await?,
-                ),
+                FsMsg::Write(write) => FsReply::Write(provider.write(&from, write).await?),
                 FsMsg::Flush(flush) => FsReply::Ack(provider.flush(&from, flush).await?),
                 FsMsg::ReadDir(read_dir) => {
                     FsReply::ReadDir(provider.read_dir(&from, read_dir).await?)

+ 11 - 6
crates/btfsd/src/main.rs

@@ -32,19 +32,24 @@ const DEFAULT_CONFIG: ConfigRef<'static> = ConfigRef {
     block_dir: "./bt",
 };
 
-fn provider<C: 'static + Send + Sync + Creds>(block_dir: PathBuf, creds: C) -> impl FsProvider {
+async fn provider<C: 'static + Send + Sync + Creds>(
+    block_dir: PathBuf,
+    creds: C,
+) -> impl FsProvider {
     if block_dir.exists() {
         LocalFs::new_existing(block_dir, creds, ModeAuthorizer).unwrap()
     } else {
         std::fs::create_dir_all(&block_dir).unwrap();
-        LocalFs::new_empty(block_dir, 0, creds, ModeAuthorizer).unwrap()
+        LocalFs::new_empty(block_dir, 0, creds, ModeAuthorizer)
+            .await
+            .unwrap()
     }
 }
 
-fn receiver(config: Config) -> impl Receiver {
+async fn receiver(config: Config) -> impl Receiver {
     let cred_store = TpmCredStore::from_tabrmd(&config.tabrmd, config.tpm_state_path).unwrap();
     let node_creds = cred_store.node_creds().unwrap();
-    let provider = Arc::new(provider(config.block_dir, node_creds.clone()));
+    let provider = Arc::new(provider(config.block_dir, node_creds.clone()).await);
     new_fs_server(config.ip_addr, Arc::new(node_creds), provider).unwrap()
 }
 
@@ -64,7 +69,7 @@ async fn main() {
         .with_tpm_state_path(tpm_state_path)
         .with_block_dir(block_dir)
         .build();
-    let receiver = receiver(config);
+    let receiver = receiver(config).await;
     receiver.complete().unwrap().await.unwrap();
 }
 
@@ -115,7 +120,7 @@ mod tests {
             tpm_state_path: harness.swtpm().state_path().to_owned(),
             block_dir: dir.path().join(BT_DIR),
         };
-        let rx = receiver(config);
+        let rx = receiver(config).await;
         let tx = rx.transmitter(rx.addr().clone()).await.unwrap();
         let client = FsClient::new(tx);
         TestCase {

+ 0 - 1
crates/btfuse/Cargo.toml

@@ -16,7 +16,6 @@ log = "0.4.17"
 env_logger = "0.9.0"
 anyhow = { version = "1.0.66", features = ["std", "backtrace"] }
 libc = { version = "0.2.137" }
-bytes = { version = "1.3.0" }
 serde_json = "1.0.92"
 futures = "0.3.25"
 

+ 7 - 2
crates/btfuse/src/fuse_daemon.rs

@@ -2,7 +2,7 @@
 use crate::{fuse_fs::FuseFs, PathExt, DEFAULT_CONFIG};
 
 use btfproto::server::FsProvider;
-use btlib::{BlockPath, Result};
+use btlib::{bterr, BlockPath, Result};
 use fuse_backend_rs::{
     api::server::Server,
     transport::{self, FuseChannel, FuseSession},
@@ -17,7 +17,7 @@ use std::{
     path::Path,
     sync::Arc,
 };
-use tokio::task::JoinSet;
+use tokio::{sync::oneshot, task::JoinSet};
 
 pub use private::FuseDaemon;
 
@@ -51,10 +51,15 @@ mod private {
             mnt_path: PathBuf,
             num_threads: usize,
             fallback_path: Arc<BlockPath>,
+            mounted_signal: Option<oneshot::Sender<()>>,
             provider: P,
         ) -> Result<Self> {
             let server = Arc::new(Server::new(FuseFs::new(provider, fallback_path)));
             let session = Self::session(mnt_path)?;
+            if let Some(tx) = mounted_signal {
+                tx.send(())
+                    .map_err(|_| bterr!("failed to send mounted signal"))?;
+            }
             let mut set = JoinSet::new();
             for _ in 0..num_threads {
                 let server = server.clone();

+ 25 - 18
crates/btfuse/src/fuse_fs.rs

@@ -5,7 +5,6 @@ use btlib::{
     BlockPath, Epoch, Result,
 };
 use btserde::read_from;
-use bytes::{Buf, BytesMut};
 use core::{ffi::CStr, future::Future, time::Duration};
 use fuse_backend_rs::{
     abi::fuse_abi::{stat64, Attr, CreateIn},
@@ -135,16 +134,15 @@ mod private {
                 flags: FlagValue::ReadOnly.into(),
             };
             let OpenReply { handle, .. } = provider.open(from, msg).await?;
-            let mut buf = BytesMut::new();
             let msg = Read {
                 inode: parent,
                 handle,
                 offset: 0,
                 size: u64::MAX,
             };
-            provider.read(from, msg, |data| buf.extend_from_slice(data))?;
-            let mut reader = buf.reader();
-            read_from(&mut reader).map_err(|err| err.into())
+            let guard = provider.read(from, msg).await?;
+            let mut slice: &[u8] = &guard;
+            read_from(&mut slice).map_err(|err| err.into())
         }
     }
 
@@ -279,14 +277,17 @@ mod private {
             _lock_owner: Option<u64>,
             _flags: u32,
         ) -> IoResult<usize> {
-            let path = self.path_from_luid(ctx.uid);
-            let msg = Read {
-                inode,
-                handle,
-                offset,
-                size: size as u64,
-            };
-            self.provider.read(path, msg, |data| w.write(data))?
+            block_on(async move {
+                let path = self.path_from_luid(ctx.uid);
+                let msg = Read {
+                    inode,
+                    handle,
+                    offset,
+                    size: size as u64,
+                };
+                let guard = self.provider.read(path, msg).await?;
+                w.write(&guard)
+            })
         }
 
         fn write(
@@ -305,11 +306,17 @@ mod private {
             block_on(async move {
                 let path = self.path_from_luid(ctx.uid);
                 let size: usize = size.try_into().display_err()?;
-                let written: u64 = todo!();
-                //let WriteReply { written, .. } = self
-                //    .provider
-                //    .write(path, inode, handle, offset, size as u64, r)
-                //    .await?;
+                // TODO: Eliminate this copying, or at least use a pool of buffers to avoid
+                // allocating on every write. We could pass `r` to the provider if it were Send.
+                let mut buf = Vec::with_capacity(size);
+                r.read_to_end(&mut buf)?;
+                let msg = Write {
+                    inode,
+                    handle,
+                    offset,
+                    data: buf.as_slice(),
+                };
+                let WriteReply { written, .. } = self.provider.write(path, msg).await?;
                 Ok(written.try_into().display_err()?)
             })
         }

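The TODO in the write path above suggests reusing buffers instead of allocating a fresh `Vec` for every write. A minimal sketch of such a pool, assuming a plain `Mutex`-guarded free list; the `BufPool` name and its methods are illustrative and not part of this crate:

    use std::sync::Mutex;

    /// A hypothetical pool of reusable write buffers: `get` hands out a cleared
    /// buffer from the free list (or a new one), and `put` returns it for reuse.
    struct BufPool {
        free: Mutex<Vec<Vec<u8>>>,
    }

    impl BufPool {
        fn new() -> Self {
            Self { free: Mutex::new(Vec::new()) }
        }

        /// Takes a buffer with at least `capacity` bytes reserved.
        fn get(&self, capacity: usize) -> Vec<u8> {
            let mut buf = self.free.lock().unwrap().pop().unwrap_or_default();
            buf.clear();
            buf.reserve(capacity);
            buf
        }

        /// Returns a buffer so a later write can reuse its allocation.
        fn put(&self, buf: Vec<u8>) {
            self.free.lock().unwrap().push(buf);
        }
    }

In `FuseFs::write` the pool would be consulted before `read_to_end`, and the buffer returned once the provider call completes; whether that beats the allocator's own reuse of short-lived allocations would need measuring.
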
+ 42 - 18
crates/btfuse/src/main.rs

@@ -15,13 +15,13 @@ use btlib::{
     },
     Result,
 };
-use core::future::Future;
 use std::{
     fs::{self},
     io,
     path::{Path, PathBuf},
     sync::Arc,
 };
+use tokio::sync::oneshot;
 
 const ENVVARS: EnvVars = EnvVars {
     tabrmd: "BT_TABRMD",
@@ -58,20 +58,20 @@ fn node_creds(state_file: PathBuf, tabrmd_cfg: &str) -> Result<TpmCreds> {
     cred_store.node_creds()
 }
 
-fn provider<C: 'static + Creds + Send + Sync>(
+async fn provider<C: 'static + Creds + Send + Sync>(
     btdir: PathBuf,
     node_creds: C,
 ) -> Result<impl FsProvider> {
     btdir.try_create_dir()?;
     let empty = fs::read_dir(&btdir)?.next().is_none();
     if empty {
-        LocalFs::new_empty(btdir, 0, node_creds, btfproto::local_fs::ModeAuthorizer {})
+        LocalFs::new_empty(btdir, 0, node_creds, btfproto::local_fs::ModeAuthorizer {}).await
     } else {
         LocalFs::new_existing(btdir, node_creds, btfproto::local_fs::ModeAuthorizer {})
     }
 }
 
-fn run_daemon(config: Config) -> impl Send + Sync + Future<Output = ()> {
+async fn run_daemon(config: Config, mounted_signal: Option<oneshot::Sender<()>>) {
     let node_creds =
         node_creds(config.tpm_state_file, &config.tabrmd).expect("failed to get node creds");
     let fallback_path = {
@@ -81,11 +81,19 @@ fn run_daemon(config: Config) -> impl Send + Sync + Future<Output = ()> {
             .unwrap();
         Arc::new(writecap.bind_path())
     };
-    let provider = provider(config.block_dir, node_creds).expect("failed to create FS provider");
-
-    let mut daemon = FuseDaemon::new(config.mnt_dir, config.threads, fallback_path, provider)
-        .expect("failed to create FUSE daemon");
-    async move { daemon.finished().await }
+    let provider = provider(config.block_dir, node_creds)
+        .await
+        .expect("failed to create FS provider");
+
+    let mut daemon = FuseDaemon::new(
+        config.mnt_dir,
+        config.threads,
+        fallback_path,
+        mounted_signal,
+        provider,
+    )
+    .expect("failed to create FUSE daemon");
+    daemon.finished().await
 }
 
 #[tokio::main]
@@ -98,7 +106,7 @@ async fn main() {
         .with_tabrmd(from_envvar(ENVVARS.tabrmd).unwrap())
         .with_mnt_options(from_envvar(ENVVARS.mnt_options).unwrap());
     let config = builder.build();
-    run_daemon(config).await;
+    run_daemon(config, None).await;
 }
 
 #[cfg(test)]
@@ -114,12 +122,12 @@ mod test {
             set_permissions, write, Permissions, ReadDir,
         },
         os::unix::fs::PermissionsExt,
-        sync::mpsc,
         thread::JoinHandle,
         time::Duration,
     };
     use swtpm_harness::SwtpmHarness;
     use tempdir::TempDir;
+    use tokio::sync::oneshot::error::TryRecvError;
 
     /// An optional timeout to wait for the FUSE daemon to start in tests.
     const TIMEOUT: Option<Duration> = Some(Duration::from_millis(1000));
@@ -172,7 +180,7 @@ mod test {
     impl TestCase {
         fn new() -> TestCase {
             let tmp = TempDir::new("btfuse").unwrap();
-            let (mounted_tx, mounted_rx) = mpsc::channel();
+            let (mounted_tx, mut mounted_rx) = oneshot::channel();
             let (swtpm, cred_store) = Self::swtpm();
             let config = Config::builder()
                 .with_block_dir(Some(tmp.path().join("bt")))
@@ -183,8 +191,26 @@ mod test {
             let config_clone = config.clone();
             let handle = std::thread::spawn(move || Self::run(mounted_tx, config_clone));
             match TIMEOUT {
-                Some(duration) => mounted_rx.recv_timeout(duration).unwrap(),
-                None => mounted_rx.recv().unwrap(),
+                Some(duration) => {
+                    let deadline = std::time::Instant::now() + duration;
+                    loop {
+                        if std::time::Instant::now() > deadline {
+                            panic!("timed out waiting for the mounted signal");
+                        }
+                        match mounted_rx.try_recv() {
+                            Ok(_) => break,
+                            Err(err) => match err {
+                                TryRecvError::Empty => {
+                                    std::thread::sleep(Duration::from_millis(10));
+                                }
+                                TryRecvError::Closed => {
+                                    panic!("channel was closed before mounted signal was sent")
+                                }
+                            },
+                        }
+                    }
+                }
+                None => mounted_rx.blocking_recv().unwrap(),
             };
             let node_principal =
                 OsString::from(cred_store.node_creds().unwrap().principal().to_string());
@@ -198,17 +224,15 @@ mod test {
             }
         }
 
-        fn run(mounted_tx: mpsc::Sender<()>, config: Config) {
+        fn run(mounted_tx: oneshot::Sender<()>, config: Config) {
             let runtime = tokio::runtime::Builder::new_current_thread()
                 .build()
                 .unwrap();
             // run_daemon can only be called in the context of a runtime, hence the need to call
             // spawn_blocking.
-            let started = runtime.spawn_blocking(|| run_daemon(config));
+            let started = runtime.spawn_blocking(move || run_daemon(config, Some(mounted_tx)));
             let future = runtime.block_on(started).unwrap();
 
-            // The file system is mounted before run_daemon returns.
-            mounted_tx.send(()).unwrap();
             runtime.block_on(future);
         }
 

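Because `tokio::sync::oneshot::Receiver` has no synchronous `recv_timeout`, the test above waits for the mounted signal by polling `try_recv` in a sleep loop. One alternative, sketched here under the assumptions that tokio's time feature is enabled and that `TestCase::new` runs on an ordinary test thread rather than inside another runtime, is to drive the receiver as a future with a deadline:

    use std::time::Duration;
    use tokio::sync::oneshot;

    /// Waits for the mounted signal with a deadline by awaiting the oneshot
    /// receiver under `tokio::time::timeout` on a throwaway runtime.
    fn wait_mounted(rx: oneshot::Receiver<()>, timeout: Duration) {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .unwrap();
        rt.block_on(async {
            tokio::time::timeout(timeout, rx)
                .await
                .expect("timed out waiting for the mounted signal")
                .expect("sender dropped before the mounted signal was sent");
        });
    }

The polling loop keeps the test free of a second runtime; the sketch trades that for a simpler wait.
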
+ 4 - 7
crates/btlib/src/drop_trigger.rs

@@ -4,15 +4,13 @@ pub struct DropTrigger<F: FnOnce()> {
 
 impl<F: FnOnce()> DropTrigger<F> {
     pub fn new(trigger: F) -> Self {
-        Self { trigger: Some(trigger) }
+        Self {
+            trigger: Some(trigger),
+        }
     }
 
     pub fn disarm(&mut self) -> bool {
-        if self.trigger.take().is_some() {
-            true
-        } else {
-            false
-        }
+        self.trigger.take().is_some()
     }
 }
 
@@ -24,7 +22,6 @@ impl<F: FnOnce()> Drop for DropTrigger<F> {
     }
 }
 
-
 #[cfg(test)]
 mod tests {
     use super::*;

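`DropTrigger` stores a closure and, unless `disarm` is called first, runs it when the value is dropped, so it can guard cleanup across early returns. A small usage sketch, assuming the `Drop` impl invokes the stored closure while it is still armed; the directory names are illustrative only:

    use btlib::drop_trigger::DropTrigger;

    fn create_with_cleanup() -> std::io::Result<()> {
        std::fs::create_dir("scratch")?;
        // If anything below fails, the trigger removes the directory on drop.
        let mut cleanup = DropTrigger::new(|| {
            let _ = std::fs::remove_dir_all("scratch");
        });
        std::fs::write("scratch/data", b"payload")?;
        // Success: keep the directory by disarming the trigger.
        cleanup.disarm();
        Ok(())
    }
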
+ 1 - 1
crates/btlib/src/lib.rs

@@ -6,13 +6,13 @@ pub mod collections;
 pub mod config_helpers;
 /// Code which enables cryptographic operations.
 pub mod crypto;
+pub mod drop_trigger;
 pub mod error;
 pub mod log;
 pub mod sectored_buf;
 #[cfg(test)]
 mod test_helpers;
 mod trailered;
-pub mod drop_trigger;
 
 #[macro_use]
 extern crate static_assertions;