Pārlūkot izejas kodu

Converted the Blocktree struct to the LocalFs struct
and started implementing FsProvider for it.

Matthew Carr 2 gadi atpakaļ
vecāks
revīzija
942ce5c428

+ 7 - 0
Cargo.lock

@@ -206,9 +206,16 @@ dependencies = [
  "anyhow",
  "btlib",
  "btmsg",
+ "btserde",
+ "bytes",
+ "fuse-backend-rs",
+ "lazy_static",
+ "libc",
  "log",
  "paste",
+ "positioned-io",
  "serde",
+ "tempdir",
  "tokio",
 ]
 

+ 11 - 1
crates/btfproto/Cargo.toml

@@ -8,7 +8,8 @@ edition = "2021"
 [features]
 server = []
 client = []
-default = ["client", "server"]
+local-fs = ["dep:libc", "dep:positioned-io", "dep:fuse-backend-rs", "dep:btserde", "dep:bytes"]
+default = ["client", "server", "local-fs"]
 
 [dependencies]
 btlib = { path = "../btlib" }
@@ -18,3 +19,12 @@ paste = "1.0.11"
 log = "0.4.17"
 tokio = { version = "1.24.2", features = ["rt"] }
 anyhow = { version = "1.0.66", features = ["std", "backtrace"] }
+libc = { version = "0.2.137", optional = true }
+positioned-io = { version = "0.3.1", optional = true }
+fuse-backend-rs = { version = "0.9.6", optional = true }
+btserde = { path = "../btserde", optional = true }
+bytes = { version = "1.3.0", optional = true }
+
+[dev-dependencies]
+tempdir = { version = "0.3.7" }
+lazy_static = { version = "1.4.0" }

+ 10 - 5
crates/btfproto/src/client.rs

@@ -147,7 +147,7 @@ impl<T: Transmitter> FsClient<T> {
         &self,
         parent: Inode,
         name: &str,
-        flags: u32,
+        flags: i32,
         mode: u32,
         umask: u32,
     ) -> Result<CreateReply> {
@@ -161,7 +161,7 @@ impl<T: Transmitter> FsClient<T> {
         self.tx.call(msg, extractor!(Create)).await?
     }
 
-    pub async fn open(&self, inode: Inode, flags: u32) -> Result<OpenReply> {
+    pub async fn open(&self, inode: Inode, flags: i32) -> Result<OpenReply> {
         let msg = FsMsg::Open(Open { inode, flags });
         self.tx.call(msg, extractor!(Open)).await?
     }
@@ -210,12 +210,17 @@ impl<T: Transmitter> FsClient<T> {
         self.tx.call(msg, AckCallback).await?
     }
 
-    pub async fn read_meta(&self, inode: Inode) -> Result<BlockMeta> {
-        let msg = FsMsg::ReadMeta(ReadMeta { inode });
+    pub async fn read_meta(&self, inode: Inode, handle: Option<Handle>) -> Result<BlockMeta> {
+        let msg = FsMsg::ReadMeta(ReadMeta { inode, handle });
         self.tx.call(msg, ExtractReadMeta).await?
     }
 
-    pub async fn write_meta(&self, inode: Inode, handle: Handle, meta: BlockMeta) -> Result<()> {
+    pub async fn write_meta(
+        &self,
+        inode: Inode,
+        handle: Option<Handle>,
+        meta: BlockMeta,
+    ) -> Result<()> {
         let msg = FsMsg::WriteMeta(WriteMeta {
             inode,
             handle,

+ 3 - 0
crates/btfproto/src/lib.rs

@@ -1,7 +1,10 @@
 #![feature(type_alias_impl_trait)]
 
 mod msg;
+
 #[cfg(feature = "client")]
 pub mod client;
+#[cfg(feature = "local-fs")]
+pub mod local_fs;
 #[cfg(feature = "server")]
 pub mod server;

+ 2186 - 0
crates/btfproto/src/local_fs.rs

@@ -0,0 +1,2186 @@
+use crate::{msg::*, server::FsProvider};
+
+use btserde::{read_from, write_to};
+use core::{
+    future::{ready, Ready},
+    pin::Pin,
+};
+use log::{debug, error, warn};
+use positioned_io::Size;
+use serde::{Deserialize, Serialize};
+use std::{
+    collections::hash_map::{self, HashMap},
+    fmt::{Display, Formatter},
+    fs::File,
+    io::{self, Seek, SeekFrom, Write as IoWrite},
+    ops::{Deref, DerefMut},
+    path::{Path, PathBuf},
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Mutex, RwLock, RwLockWriteGuard,
+    },
+    sync::{MutexGuard, RwLockReadGuard},
+    time::Duration,
+};
+
+use btlib::{
+    accessor::Accessor,
+    bterr,
+    crypto::{Creds, Decrypter, Signer},
+    error::{BtErr, DisplayErr, IoErr},
+    BlockAccessor, BlockError, BlockMeta, BlockOpenOptions, BlockPath, BlockReader, BlockRecord,
+    DirEntry, DirEntryKind, Directory, Epoch, FileBlock, FlushMeta, MetaAccess, MetaReader,
+    Positioned, Principaled, Result, Split, TrySeek,
+};
+
+pub use private::{LocalFsProvider, ModeAuthorizer, SpecInodes};
+
+mod private {
+    use btlib::BlockMetaSecrets;
+
+    use super::*;
+
+    type Inode = u64;
+    type Handle = u64;
+
+    #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
+    pub enum Error {
+        NotOpen(Inode),
+        InvalidHandle { inode: Inode, handle: Handle },
+        NoHandlesAvailable(Inode),
+        InodeNotFound(Inode),
+        ReadOnlyHandle(Handle),
+    }
+
+    impl Display for Error {
+        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+            match self {
+                Error::NotOpen(inode) => write!(f, "inode {inode} is not open"),
+                Error::InvalidHandle { inode, handle } => {
+                    write!(f, "invalid handle {handle} for inode {inode}")
+                }
+                Error::NoHandlesAvailable(inode) => {
+                    write!(f, "no handles are available for inode {inode}")
+                }
+                Error::InodeNotFound(inode) => write!(f, "inode {inode} could not be found"),
+                Error::ReadOnlyHandle(handle) => {
+                    write!(f, "cannot mutably access read-only handle {handle}")
+                }
+            }
+        }
+    }
+
+    impl std::error::Error for Error {}
+
+    /// Returns true if and only if the given open flags allow the file to be mutated.
+    const fn mutable(flags: i32) -> bool {
+        const WRITE_MASK: i32 = libc::O_RDWR | libc::O_WRONLY;
+        flags & WRITE_MASK != 0
+    }
+
+    #[repr(u64)]
+    pub enum SpecInodes {
+        RootDir = 1,
+        Sb = 2,
+        FirstFree = 11,
+    }
+
+    impl From<SpecInodes> for Inode {
+        fn from(special: SpecInodes) -> Self {
+            special as Inode
+        }
+    }
+
+    #[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]
+    #[repr(u32)] // This type needs to match `libc::mode_t`.
+    /// The type of a file (regular, directory, etc).
+    enum FileType {
+        /// Directory.
+        Dir = libc::S_IFDIR,
+        /// Regular file.
+        Reg = libc::S_IFREG,
+    }
+
+    impl FileType {
+        /// Returns the underlying mode bits for this file type.
+        fn value(self) -> libc::mode_t {
+            self as libc::mode_t
+        }
+
+        /// Attempts to convert the given mode bits into a `FileType` enum value.
+        fn from_value(value: libc::mode_t) -> Result<Self> {
+            if (value & libc::S_IFDIR) != 0 {
+                return Ok(FileType::Dir);
+            }
+            if (value & libc::S_IFREG) != 0 {
+                return Ok(FileType::Reg);
+            }
+            Err(bterr!("unknown file type: 0o{value:0o}"))
+        }
+
+        fn dir_entry_kind(self) -> DirEntryKind {
+            match self {
+                Self::Dir => DirEntryKind::Directory,
+                Self::Reg => DirEntryKind::File,
+            }
+        }
+    }
+
+    impl From<FileType> for libc::mode_t {
+        fn from(file_type: FileType) -> Self {
+            file_type.value()
+        }
+    }
+
+    impl TryFrom<libc::mode_t> for FileType {
+        type Error = btlib::Error;
+        fn try_from(value: libc::mode_t) -> Result<Self> {
+            Self::from_value(value)
+        }
+    }
+
+    impl From<FileType> for DirEntryKind {
+        fn from(value: FileType) -> Self {
+            value.dir_entry_kind()
+        }
+    }
+
+    /// This type provides context for an authorization decision as to whether a given process will
+    /// be allowed to access a block.
+    pub struct AuthzContext<'a> {
+        /// The user ID of the process being authorized.
+        pub uid: u32,
+        /// The group ID of the process being authorized.
+        pub gid: u32,
+        /// The process ID of the process being authorized.
+        pub pid: libc::pid_t,
+        /// A reference to the metadata of a block, the access to which is being authorized.
+        pub meta: &'a BlockMeta,
+    }
+
+    /// A trait for types which can render authorization decisions.
+    pub trait Authorizer {
+        /// Returns [Ok] if read authorization is granted, and [Err] otherwise.
+        fn can_read(&self, ctx: &AuthzContext<'_>) -> io::Result<()>;
+        /// Returns [Ok] if write authorization is granted, and [Err] otherwise.
+        fn can_write(&self, ctx: &AuthzContext<'_>) -> io::Result<()>;
+        /// Returns [Ok] if execute authorization is granted, and [Err] otherwise.
+        fn can_exec(&self, ctx: &AuthzContext<'_>) -> io::Result<()>;
+    }
+
+    /// A particularly simple authorizer that just looks at the mode bits in the block metadata
+    /// to make authorization decisions.
+    pub struct ModeAuthorizer {}
+
+    impl ModeAuthorizer {
+        fn authorize(mode: u32, mask: u32, denied_msg: &str) -> io::Result<()> {
+            if (mode & mask) != 0 {
+                Ok(())
+            } else {
+                Err(io::Error::new(io::ErrorKind::PermissionDenied, denied_msg))
+            }
+        }
+
+        fn user_is_root(ctx: &AuthzContext<'_>) -> bool {
+            ctx.uid == 0
+        }
+    }
+
+    impl Authorizer for ModeAuthorizer {
+        fn can_read(&self, ctx: &AuthzContext<'_>) -> io::Result<()> {
+            if Self::user_is_root(ctx) {
+                return Ok(());
+            }
+            let secrets = ctx.meta.body().secrets()?;
+            let mask = (libc::S_IRUSR * (secrets.uid == ctx.uid) as u32)
+                | (libc::S_IRGRP * (secrets.gid == ctx.gid) as u32)
+                | libc::S_IROTH;
+            Self::authorize(secrets.mode, mask, "read access denied")
+        }
+
+        fn can_write(&self, ctx: &AuthzContext<'_>) -> io::Result<()> {
+            if Self::user_is_root(ctx) {
+                return Ok(());
+            }
+            let secrets = ctx.meta.body().secrets()?;
+            let mask = (libc::S_IWUSR * (secrets.uid == ctx.uid) as u32)
+                | (libc::S_IWGRP * (secrets.gid == ctx.gid) as u32)
+                | libc::S_IWOTH;
+            Self::authorize(secrets.mode, mask, "write access denied")
+        }
+
+        fn can_exec(&self, ctx: &AuthzContext<'_>) -> io::Result<()> {
+            if Self::user_is_root(ctx) {
+                return Ok(());
+            }
+            let secrets = ctx.meta.body().secrets()?;
+            let mask = (libc::S_IXUSR * (secrets.uid == ctx.uid) as u32)
+                | (libc::S_IXGRP * (secrets.gid == ctx.gid) as u32)
+                | libc::S_IXOTH;
+            Self::authorize(secrets.mode, mask, "exec access denied")
+        }
+    }
+
+    enum HandleValue {
+        File {
+            accessor: Mutex<Option<Accessor<&'static [u8]>>>,
+            mutable: bool,
+        },
+        Directory {
+            accessor: Mutex<Option<Accessor<&'static [u8]>>>,
+            mutable: bool,
+            dir: Directory,
+        },
+    }
+
+    impl HandleValue {
+        fn new<T: Size>(accessor: Accessor<T>, mutable: bool) -> HandleValue {
+            let (accessor, ..) = accessor.split();
+            HandleValue::File {
+                accessor: Mutex::new(Some(accessor)),
+                mutable,
+            }
+        }
+
+        fn get_mutex(&self) -> &Mutex<Option<Accessor<&'static [u8]>>> {
+            match self {
+                Self::File { accessor, .. } => accessor,
+                Self::Directory { accessor, .. } => accessor,
+            }
+        }
+
+        fn mutable(&self) -> bool {
+            match self {
+                Self::File { mutable, .. } => *mutable,
+                Self::Directory { mutable, .. } => *mutable,
+            }
+        }
+
+        fn set_mutable(&mut self, mutable: bool) {
+            match self {
+                Self::File {
+                    mutable: mutable_ref,
+                    ..
+                } => *mutable_ref = mutable,
+                Self::Directory {
+                    mutable: mutable_ref,
+                    ..
+                } => *mutable_ref = mutable,
+            }
+        }
+
+        fn take_accessor(&self) -> Result<Accessor<&'static [u8]>> {
+            let mut guard = self.get_mutex().lock().display_err()?;
+            guard
+                .take()
+                .ok_or_else(|| bterr!("reader has already been taken"))
+        }
+
+        fn use_accessor<T, F: FnOnce(Accessor<&'static [u8]>) -> (Accessor<&'static [u8]>, T)>(
+            &self,
+            cb: F,
+        ) -> Result<T> {
+            let mut guard = self.get_mutex().lock().display_err()?;
+            let accessor = guard
+                .take()
+                .ok_or_else(|| bterr!("accessor has already been taken"))?;
+            let (accessor, output) = cb(accessor);
+            *guard = Some(accessor);
+            Ok(output)
+        }
+
+        fn convert_to_dir<C: Signer + Principaled + Decrypter>(
+            self,
+            block: &mut FileBlock<C>,
+        ) -> io::Result<HandleValue> {
+            let writeable = self.mutable();
+            let accessor = self.take_accessor()?;
+            let mut accessor = Accessor::combine(accessor, block);
+            let dir = accessor.read_dir()?;
+            let (accessor, ..) = accessor.split();
+            Ok(HandleValue::Directory {
+                dir,
+                accessor: Mutex::new(Some(accessor)),
+                mutable: writeable,
+            })
+        }
+
+        fn directory(&self) -> io::Result<&Directory> {
+            match self {
+                Self::Directory { dir, .. } => Ok(dir),
+                _ => Err(io::Error::new(
+                    io::ErrorKind::Other,
+                    "handle is not for a directory",
+                )),
+            }
+        }
+
+        fn access_block<B: Size, T, F: FnOnce(&mut Accessor<B>) -> Result<T>>(
+            &self,
+            block: B,
+            cb: F,
+        ) -> Result<T> {
+            self.use_accessor(|accessor| {
+                let mut accessor = Accessor::combine(accessor, block);
+                let result = cb(&mut accessor);
+                let (accessor, ..) = accessor.split();
+                (accessor, result)
+            })?
+        }
+
+        #[allow(dead_code)]
+        fn handle_guard<'a, C: Signer + Principaled + Decrypter>(
+            &'a self,
+            block: &'a FileBlock<C>,
+        ) -> Result<HandleGuard<'a, C>> {
+            let mut guard = self.get_mutex().lock().display_err()?;
+            let accessor = guard.take().unwrap();
+            let accessor = Some(Accessor::combine(accessor, block));
+            Ok(HandleGuard { guard, accessor })
+        }
+    }
+
+    pub struct HandleGuard<'a, C> {
+        guard: MutexGuard<'a, Option<Accessor<&'static [u8]>>>,
+        accessor: Option<Accessor<&'a FileBlock<C>>>,
+    }
+
+    impl<'a, C> Deref for HandleGuard<'a, C> {
+        type Target = Accessor<&'a FileBlock<C>>;
+        fn deref(&self) -> &Self::Target {
+            self.accessor.as_ref().unwrap()
+        }
+    }
+
+    impl<'a, C> DerefMut for HandleGuard<'a, C> {
+        fn deref_mut(&mut self) -> &mut Self::Target {
+            self.accessor.as_mut().unwrap()
+        }
+    }
+
+    impl<'a, C> Drop for HandleGuard<'a, C> {
+        fn drop(&mut self) {
+            let (accessor, ..) = self.accessor.take().unwrap().split();
+            *self.guard = Some(accessor);
+        }
+    }
+
+    struct InodeTableValue<C> {
+        block: Accessor<FileBlock<C>>,
+        handle_values: HashMap<Handle, HandleValue>,
+        next_handle: Handle,
+        lookup_count: u64,
+        delete: bool,
+    }
+
+    impl<C: Signer + Principaled + Decrypter> InodeTableValue<C> {
+        fn new(block: Accessor<FileBlock<C>>) -> InodeTableValue<C> {
+            Self {
+                block,
+                handle_values: HashMap::new(),
+                next_handle: 1,
+                lookup_count: 1,
+                delete: false,
+            }
+        }
+
+        fn invalid_handle_err(handle: Handle) -> io::Error {
+            io::Error::new(io::ErrorKind::Other, format!("invalid handle {handle}"))
+        }
+
+        fn value(&self, handle: Handle) -> io::Result<&HandleValue> {
+            self.handle_values
+                .get(&handle)
+                .ok_or_else(|| Self::invalid_handle_err(handle))
+        }
+
+        fn block(&self) -> &FileBlock<C> {
+            self.block.get_ref()
+        }
+
+        fn block_mut(&mut self) -> &mut FileBlock<C> {
+            self.block.get_mut()
+        }
+
+        fn convert_to_dir(&mut self, handle: Handle) -> io::Result<()> {
+            let value = self
+                .handle_values
+                .remove(&handle)
+                .ok_or_else(|| Self::invalid_handle_err(handle))?;
+            let block = self.block_mut();
+            let value = value.convert_to_dir(block)?;
+            self.handle_values.insert(handle, value);
+            Ok(())
+        }
+
+        fn access_block<T, F: FnOnce(&mut Accessor<&FileBlock<C>>) -> Result<T>>(
+            &self,
+            handle: Handle,
+            cb: F,
+        ) -> Result<T> {
+            let value = self.value(handle)?;
+            let block = self.block();
+            value.access_block(block, cb)
+        }
+
+        fn access_block_mut<T, F: FnOnce(&mut Accessor<&mut FileBlock<C>>) -> Result<T>>(
+            &mut self,
+            handle: Handle,
+            cb: F,
+        ) -> Result<T> {
+            let value = self
+                .handle_values
+                .get(&handle)
+                .ok_or_else(|| Self::invalid_handle_err(handle))?;
+            if !value.mutable() {
+                return Err(Error::ReadOnlyHandle(handle).into());
+            }
+            let inner = self.block.get_mut();
+            value.access_block(inner, cb)
+        }
+
+        fn borrow_block<T, F: FnOnce(&Accessor<FileBlock<C>>) -> Result<T>>(
+            &self,
+            cb: F,
+        ) -> Result<T> {
+            cb(&self.block)
+        }
+
+        fn borrow_block_mut<T, F: FnOnce(&mut Accessor<FileBlock<C>>) -> Result<T>>(
+            &mut self,
+            cb: F,
+        ) -> Result<T> {
+            cb(&mut self.block)
+        }
+
+        fn new_handle(&mut self, mutable: bool) -> Result<Handle> {
+            if self.handle_values.len() as u64 >= u64::MAX {
+                return Err(bterr!("no handles are available"));
+            }
+            let mut handle_value = HandleValue::new(Accessor::new(self.block())?, mutable);
+            loop {
+                let handle = self.next_handle;
+                self.next_handle = self.next_handle.wrapping_add(1);
+                match self.handle_values.insert(handle, handle_value) {
+                    Some(prev) => {
+                        // We've wrapped around and this handle is already taken. Put the previous
+                        // value back and try again.
+                        handle_value = self.handle_values.insert(handle, prev).unwrap();
+                    }
+                    // We generated an unused handle. Return it.
+                    None => return Ok(handle),
+                }
+            }
+        }
+
+        fn set_mutable(&mut self, handle: Handle, mutable: bool) -> Result<()> {
+            let handle_value = self
+                .handle_values
+                .get_mut(&handle)
+                .ok_or_else(|| Self::invalid_handle_err(handle))?;
+            handle_value.set_mutable(mutable);
+            Ok(())
+        }
+
+        fn forget_handle(&mut self, handle: Handle) {
+            self.handle_values.remove(&handle);
+        }
+
+        /// Increments `lookup_count` by 1 and returns its current value.
+        fn incr_lookup_count(&mut self) -> u64 {
+            self.lookup_count += 1;
+            self.lookup_count
+        }
+
+        /// Decrements `lookup_count` by `count` and returns its current value.
+        fn decr_lookup_count(&mut self, count: u64) -> u64 {
+            self.lookup_count -= count;
+            self.lookup_count
+        }
+
+        #[allow(dead_code)]
+        fn handle_guard<'a>(&'a self, handle: Handle) -> Result<HandleGuard<'a, C>> {
+            let value = self.value(handle)?;
+            let block = self.block();
+            value.handle_guard(block)
+        }
+
+        //fn set_mutable(&self, )
+    }
+
+    pub struct ValueGuard<'a, C> {
+        // The order of these fields is significant!
+        handle_guard: HandleGuard<'a, C>,
+        _value_guard: Pin<Box<RwLockReadGuard<'a, InodeTableValue<C>>>>,
+        _value_lock: &'a RwLock<InodeTableValue<C>>,
+        _table_guard: Pin<Box<RwLockReadGuard<'a, InodeTable<C>>>>,
+    }
+
+    impl<'a, C: Signer + Principaled + Decrypter> ValueGuard<'a, C> {
+        #[allow(dead_code)]
+        fn new(
+            table_guard: RwLockReadGuard<'a, InodeTable<C>>,
+            inode: Inode,
+            handle: Handle,
+        ) -> Result<Self> {
+            // TODO: Is Pin<Box> really necessary for soundness?
+            let table_guard = Box::pin(table_guard);
+            // Safety: table_guard is alive for 'a and stored in a stable location on the heap,
+            // thus it can be referenced for 'a.
+            let table_guard_ref =
+                unsafe { &*(table_guard.deref() as *const RwLockReadGuard<'a, InodeTable<C>>) };
+            let value_lock = table_guard_ref
+                .get(&inode)
+                .ok_or_else(|| bterr!(Error::NotOpen(inode)))?;
+            let value_guard = Box::pin(value_lock.read().display_err()?);
+            // Safety: value_guard is also alive for 'a and stored in a stable location on the heap.
+            let value_guard_ref = unsafe {
+                &*(value_guard.deref() as *const RwLockReadGuard<'a, InodeTableValue<C>>)
+            };
+            let handle_guard = value_guard_ref.handle_guard(handle)?;
+            Ok(Self {
+                handle_guard,
+                _value_guard: value_guard,
+                _table_guard: table_guard,
+                _value_lock: value_lock,
+            })
+        }
+    }
+
+    impl<'a, C> Deref for ValueGuard<'a, C> {
+        type Target = <HandleGuard<'a, C> as Deref>::Target;
+        fn deref(&self) -> &Self::Target {
+            self.handle_guard.deref()
+        }
+    }
+
+    impl<'a, C> DerefMut for ValueGuard<'a, C> {
+        fn deref_mut(&mut self) -> &mut Self::Target {
+            self.handle_guard.deref_mut()
+        }
+    }
+
+    type InodeTable<C> = HashMap<Inode, RwLock<InodeTableValue<C>>>;
+    type InodeTableEntry<'a, C> = hash_map::Entry<'a, Inode, RwLock<InodeTableValue<C>>>;
+
+    /// Structure for metadata about a blocktree.
+    #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
+    struct Superblock {
+        /// The generation number of the cluster this part of the blocktree is stored on.
+        generation: u64,
+        /// The next free inode available to the cluster.
+        next_inode: u64,
+    }
+
+    /// Structure for managing the part of a blocktree which is stored in the local filesystem.
+    pub struct LocalFsProvider<C, A> {
+        /// The path to the directory in the local filesystem where this blocktree is located.
+        path: PathBuf,
+        /// A map from inode numbers to their reference counts.
+        inodes: RwLock<InodeTable<C>>,
+        /// The next inode that will be assigned to a new block.
+        next_inode: AtomicU64,
+        /// The generation number of this filesystem. This is the same for every other server in
+        /// the same cluster.
+        generation: u64,
+        /// The credentials this blocktree instance will use for all cryptographic operations.
+        creds: C,
+        authorizer: A,
+    }
+
+    impl<C, A> LocalFsProvider<C, A> {
+        /// The maximum number of directory entries that can be returned in any given call to
+        /// `read_dir`.
+        const DIR_ENTRY_LIMIT: usize = 1024;
+    }
+
+    impl<C: Creds + 'static, A> LocalFsProvider<C, A> {
+        /// Creates a new empty blocktree at the given path.
+        pub fn new_empty(
+            btdir: PathBuf,
+            generation: u64,
+            creds: C,
+            authorizer: A,
+        ) -> Result<LocalFsProvider<C, A>> {
+            let root_block_path = creds
+                .writecap()
+                .ok_or(BlockError::MissingWritecap)?
+                .root_block_path();
+
+            // Initialize the superblock.
+            let mut sb_block = Self::open_block(
+                &btdir,
+                SpecInodes::Sb.into(),
+                creds.clone(),
+                root_block_path.to_owned(),
+            )?;
+            let sb = Superblock {
+                generation,
+                next_inode: SpecInodes::FirstFree.into(),
+            };
+            write_to(&sb, &mut sb_block)?;
+            sb_block.mut_meta_body().access_secrets(|secrets| {
+                secrets.block_id.generation = generation;
+                secrets.block_id.inode = SpecInodes::Sb.into();
+                secrets.mode = FileType::Reg.value() | 0o666;
+                secrets.uid = 0;
+                secrets.gid = 0;
+                secrets.nlink = 1;
+                Ok(())
+            })?;
+            sb_block.flush()?;
+
+            // Initialize the root directory.
+            let mut root_block = Self::open_block(
+                &btdir,
+                SpecInodes::RootDir.into(),
+                creds.clone(),
+                root_block_path,
+            )?;
+            write_to(&Directory::new(), &mut root_block)?;
+            root_block.mut_meta_body().access_secrets(|secrets| {
+                secrets.block_id.generation = generation;
+                secrets.block_id.inode = SpecInodes::RootDir.into();
+                secrets.mode = FileType::Dir.value() | 0o777;
+                secrets.uid = 0;
+                secrets.gid = 0;
+                secrets.nlink = 1;
+                Ok(())
+            })?;
+            root_block.flush()?;
+
+            Self::new(btdir, sb, sb_block, root_block, creds, authorizer)
+        }
+
+        /// Opens an existing blocktree stored at the given path.
+        pub fn new_existing(
+            btdir: PathBuf,
+            creds: C,
+            authorizer: A,
+        ) -> Result<LocalFsProvider<C, A>> {
+            let root_block_path = creds
+                .writecap()
+                .ok_or(BlockError::MissingWritecap)?
+                .root_block_path();
+            let mut sb_block = Self::open_block(
+                &btdir,
+                SpecInodes::Sb.into(),
+                creds.clone(),
+                root_block_path.to_owned(),
+            )?;
+            let sb = read_from(&mut sb_block)?;
+            let root_block = Self::open_block(
+                &btdir,
+                SpecInodes::RootDir.into(),
+                creds.clone(),
+                root_block_path,
+            )?;
+            Self::new(btdir, sb, sb_block, root_block, creds, authorizer)
+        }
+
+        fn new(
+            btdir: PathBuf,
+            sb: Superblock,
+            sb_block: Accessor<FileBlock<C>>,
+            root_block: Accessor<FileBlock<C>>,
+            creds: C,
+            authorizer: A,
+        ) -> Result<LocalFsProvider<C, A>> {
+            let mut inodes = HashMap::with_capacity(1);
+            inodes.insert(
+                SpecInodes::Sb.into(),
+                RwLock::new(InodeTableValue::new(sb_block)),
+            );
+            inodes.insert(
+                SpecInodes::RootDir.into(),
+                RwLock::new(InodeTableValue::new(root_block)),
+            );
+            Ok(LocalFsProvider {
+                path: btdir,
+                inodes: RwLock::new(inodes),
+                next_inode: AtomicU64::new(sb.next_inode),
+                generation: sb.generation,
+                creds,
+                authorizer,
+            })
+        }
+
+        /// Returns the path to the file storing the given inode's data.
+        fn block_path<P: AsRef<Path>>(parent: P, inode: Inode) -> PathBuf {
+            let group = inode / 0xFF;
+            let mut path = PathBuf::new();
+            path.push(parent);
+            path.push(format!("{group:02x}"));
+            path.push(format!("{inode:x}.blk"));
+            path
+        }
+
+        fn open_block<P: AsRef<Path>>(
+            btdir: P,
+            inode: Inode,
+            creds: C,
+            block_path: BlockPath,
+        ) -> Result<Accessor<FileBlock<C>>> {
+            let path = Self::block_path(&btdir, inode);
+            let dir = path.ancestors().nth(1).unwrap();
+            if let Err(err) = std::fs::create_dir(dir) {
+                match err.kind() {
+                    io::ErrorKind::AlreadyExists => (),
+                    _ => return Err(err.into()),
+                }
+            }
+            let file = std::fs::OpenOptions::new()
+                .read(true)
+                .write(true)
+                .create(true)
+                .open(path)?;
+            Self::open_block_file(file, creds, block_path)
+        }
+
+        fn open_block_file(
+            file: File,
+            creds: C,
+            block_path: BlockPath,
+        ) -> Result<Accessor<FileBlock<C>>> {
+            let block = BlockOpenOptions::new()
+                .with_creds(creds)
+                .with_compress(false)
+                .with_encrypt(true)
+                .with_inner(file)
+                .with_block_path(block_path)
+                .open()?;
+            Ok(block)
+        }
+
+        fn access_entry<T, F: FnOnce(InodeTableEntry<C>) -> Result<T>>(
+            &self,
+            inode: Inode,
+            cb: F,
+        ) -> Result<T> {
+            let mut inodes = self.inodes.write().display_err()?;
+            let entry = inodes.entry(inode);
+            cb(entry)
+        }
+
+        fn access_value<T, F: FnOnce(&InodeTableValue<C>) -> Result<T>>(
+            &self,
+            inode: Inode,
+            cb: F,
+        ) -> Result<T> {
+            let inodes = self.inodes.read().display_err()?;
+            let guard = inodes
+                .get(&inode)
+                .ok_or_else(|| bterr!(Error::NotOpen(inode)))?
+                .read()
+                .display_err()?;
+            cb(&guard)
+        }
+
+        fn access_value_mut<T, F: FnOnce(&mut InodeTableValue<C>) -> Result<T>>(
+            &self,
+            inode: Inode,
+            cb: F,
+        ) -> Result<T> {
+            let inodes = self.inodes.read().display_err()?;
+            let mut guard = inodes
+                .get(&inode)
+                .ok_or_else(|| bterr!(Error::NotOpen(inode)))?
+                .write()
+                .display_err()?;
+            cb(&mut guard)
+        }
+
+        #[allow(dead_code)]
+        fn handle_guard<'a>(&'a self, inode: Inode, handle: Handle) -> Result<ValueGuard<'a, C>> {
+            let table_guard = self.inodes.read().display_err()?;
+            ValueGuard::new(table_guard, inode, handle)
+        }
+
+        fn access_block<T, F: FnOnce(&mut Accessor<&FileBlock<C>>) -> Result<T>>(
+            &self,
+            inode: Inode,
+            handle: Handle,
+            cb: F,
+        ) -> Result<T> {
+            self.access_value(inode, |value| value.access_block(handle, cb))
+        }
+
+        fn access_block_mut<T, F: FnOnce(&mut Accessor<&mut FileBlock<C>>) -> Result<T>>(
+            &self,
+            inode: Inode,
+            handle: Handle,
+            cb: F,
+        ) -> Result<T> {
+            self.access_value_mut(inode, |value| value.access_block_mut(handle, cb))
+        }
+
+        fn borrow_block<T, F: FnOnce(&Accessor<FileBlock<C>>) -> Result<T>>(
+            &self,
+            inode: Inode,
+            cb: F,
+        ) -> Result<T> {
+            self.access_value(inode, |value| value.borrow_block(cb))
+        }
+
+        fn borrow_block_mut<T, F: FnOnce(&mut Accessor<FileBlock<C>>) -> Result<T>>(
+            &self,
+            inode: Inode,
+            cb: F,
+        ) -> Result<T> {
+            self.access_value_mut(inode, |value| value.borrow_block_mut(cb))
+        }
+
+        fn open_value<T, F: FnOnce(&mut InodeTableValue<C>) -> Result<T>>(
+            &self,
+            inode: Inode,
+            block_path: BlockPath,
+            cb: F,
+        ) -> Result<T> {
+            self.access_entry(inode, |entry| match entry {
+                InodeTableEntry::Vacant(entry) => {
+                    let block =
+                        Self::open_block(&self.path, inode, self.creds.clone(), block_path)?;
+                    let mut value = InodeTableValue::new(block);
+                    let result = cb(&mut value);
+                    entry.insert(RwLock::new(value));
+                    result
+                }
+                InodeTableEntry::Occupied(mut entry) => {
+                    let value = entry.get_mut().get_mut().display_err()?;
+                    cb(value)
+                }
+            })
+        }
+
+        fn open_then_take_handle<T, F: FnOnce(Handle, &mut InodeTableValue<C>) -> Result<T>>(
+            &self,
+            inode: Inode,
+            block_path: BlockPath,
+            mutable: bool,
+            cb: F,
+        ) -> Result<T> {
+            self.open_value(inode, block_path, |value| {
+                let handle = value.new_handle(mutable)?;
+                cb(handle, value)
+            })
+        }
+
+        fn inode_forget(
+            &self,
+            inodes: &mut RwLockWriteGuard<InodeTable<C>>,
+            inode: Inode,
+            count: u64,
+        ) -> io::Result<()> {
+            let lookup_count = {
+                let inode_lock = match inodes.get_mut(&inode) {
+                    Some(inode_lock) => inode_lock,
+                    None => {
+                        warn!("an attempt was made to forget non-existent inode {inode}");
+                        return Ok(());
+                    }
+                };
+                let mut value = inode_lock.write().display_err()?;
+                value.decr_lookup_count(count)
+            };
+            if 0 == lookup_count {
+                let delete = inodes
+                    .remove(&inode)
+                    .unwrap()
+                    .into_inner()
+                    .display_err()?
+                    .delete;
+                if delete {
+                    let path = Self::block_path(&self.path, inode);
+                    std::fs::remove_file(path)?;
+                }
+            }
+            Ok(())
+        }
+
+        /// Returns the next available inode and updates the superblock in one atomic operation.
+        /// TODO: Obviously this strategy won't work when there are multiple servers in this
+        /// generation.
+        fn next_inode(&self) -> Result<Inode> {
+            self.borrow_block_mut(SpecInodes::Sb.into(), |mut block| {
+                // We don't need strict ordering because the lock on the inode table value is already
+                // serializing access.
+                let inode = self.next_inode.fetch_add(1, Ordering::Relaxed);
+                let sb = Superblock {
+                    generation: self.generation,
+                    next_inode: inode + 1,
+                };
+                block.rewind()?;
+                write_to(&sb, &mut block)?;
+                Ok(inode)
+            })
+        }
+
+        fn attr_timeout(&self) -> Duration {
+            Duration::from_secs(5)
+        }
+
+        fn entry_timeout(&self) -> Duration {
+            Duration::from_secs(5)
+        }
+
+        fn unsupported_flag_err(flag: &str) -> btlib::Error {
+            bterr!("unsupported flag: {flag}")
+        }
+
+        fn bt_entry(&self, attr: BlockMetaSecrets) -> crate::msg::Entry {
+            crate::msg::Entry {
+                attr,
+                attr_timeout: self.attr_timeout(),
+                entry_timeout: self.entry_timeout(),
+            }
+        }
+
+        fn authz_context<'a>(&'a self, from: &BlockPath, meta: &'a BlockMeta) -> AuthzContext {
+            // TODO: Use from to lookup the uid and gid in the blocktree.
+            AuthzContext {
+                uid: 0,
+                gid: 0,
+                pid: 0,
+                meta,
+            }
+        }
+    }
+
+    unsafe impl<C: Sync, A: Sync> Sync for LocalFsProvider<C, A> {}
+
+    impl<C: 'static + Creds + Clone + Sync, A: 'static + Authorizer + Sync> FsProvider
+        for LocalFsProvider<C, A>
+    {
+        type LookupFut<'c> = Ready<Result<LookupReply>>;
+        fn lookup<'c>(&'c self, from: &'c BlockPath, msg: Lookup<'c>) -> Self::LookupFut<'c> {
+            let result = (move || {
+                let Lookup { parent, name, .. } = msg;
+                debug!("LocalFsProvider::lookup, parent {parent}, {:?}", name);
+                let (dir, block_path) = match self.borrow_block_mut(parent, |block| {
+                    self.authorizer
+                        .can_exec(&self.authz_context(from, block.meta()))?;
+                    let dir = block.read_dir()?;
+                    let path = block.meta_body().path().to_owned();
+                    Ok((dir, path))
+                }) {
+                    Ok(pair) => pair,
+                    Err(err) => {
+                        error!("LocalFsProvider::lookup failed to borrow inode {parent}: {err}");
+                        return Err(err.into());
+                    }
+                };
+                let entry = dir
+                    .entry(name)
+                    .ok_or_else(|| io::Error::from_raw_os_error(libc::ENOENT))?;
+                let inode = entry.inode().ok_or_else(|| {
+                    io::Error::new(io::ErrorKind::Unsupported, "can't lookup server entry")
+                })?;
+                let stat = match self.open_value(inode, block_path, |value| {
+                    let stat = value.block.meta_body().secrets()?.to_owned();
+                    value.incr_lookup_count();
+                    Ok(stat)
+                }) {
+                    Ok(stat) => stat,
+                    Err(err) => {
+                        error!("LocalFsProvider::lookup failed to read stats for '{name}': {err}");
+                        return Err(err.into());
+                    }
+                };
+                let entry = self.bt_entry(stat);
+                let reply = LookupReply {
+                    inode,
+                    generation: self.generation,
+                    entry,
+                };
+                Ok(reply)
+            })();
+            ready(result)
+        }
+
+        type CreateFut<'c> = Ready<Result<CreateReply>>;
+        fn create<'c>(&'c self, from: &'c BlockPath, msg: Create<'c>) -> Self::CreateFut<'c> {
+            let result = (move || {
+                let Create {
+                    parent,
+                    name,
+                    flags,
+                    mode,
+                    umask,
+                } = msg;
+                debug!("Blocktree::create, parent {parent}, name {:?}", name);
+
+                let name = msg.name.to_owned();
+
+                // Add a directory entry to the parent for the new inode.
+                let (inode, mut block_path, uid, gid) = self.borrow_block_mut(parent, |block| {
+                    let ctx = self.authz_context(from, block.meta());
+                    let uid = ctx.uid;
+                    let gid = ctx.gid;
+                    self.authorizer.can_write(&ctx)?;
+
+                    let mut dir = block.read_dir()?;
+                    if dir.contains_entry(&name) {
+                        return Err(io::Error::from_raw_os_error(libc::EEXIST).into());
+                    }
+
+                    // Reserve a free inode.
+                    let inode = self.next_inode()?;
+                    dir.add_file(name.clone(), inode)?;
+                    block.write_dir(&dir)?;
+
+                    Ok((inode, block.meta_body().path().clone(), uid, gid))
+                })?;
+                block_path.push_component(name);
+
+                let mutable = mutable(flags);
+                let (handle, stat) =
+                    self.open_then_take_handle(inode, block_path, true, |handle, value| {
+                        let pair = value.access_block_mut(handle, |block| {
+                            let stat = block.mut_meta_body().access_secrets(|secrets| {
+                                secrets.block_id.generation = self.generation;
+                                secrets.block_id.inode = inode;
+                                secrets.mode = mode & !umask;
+                                secrets.uid = uid;
+                                secrets.gid = gid;
+                                let now = Epoch::now();
+                                secrets.atime = now;
+                                secrets.ctime = now;
+                                secrets.mtime = now;
+                                secrets.nlink = 1;
+                                Ok(secrets.to_owned())
+                            })?;
+                            if flags & libc::O_DIRECTORY != 0 {
+                                // Note that write_dir flushes data after writing, including
+                                // metadata.
+                                block.write_dir(&Directory::new())?;
+                            } else {
+                                block.flush_meta()?;
+                            }
+                            Ok((handle, stat))
+                        })?;
+                        if flags & libc::O_DIRECTORY != 0 {
+                            value.convert_to_dir(handle)?;
+                        }
+                        value.set_mutable(handle, mutable)?;
+                        Ok(pair)
+                    })?;
+
+                Ok(CreateReply {
+                    inode,
+                    handle,
+                    entry: self.bt_entry(stat),
+                })
+            })();
+            ready(result)
+        }
+
+        type OpenFut<'c> = Ready<Result<OpenReply>>;
+        fn open<'c>(&'c self, from: &'c BlockPath, msg: Open) -> Self::OpenFut<'c> {
+            let result = (move || {
+                let Open { inode, flags } = msg;
+                debug!("Blocktree::open, inode {inode}, flags {flags}");
+                if flags & libc::O_APPEND != 0 {
+                    return Err(Self::unsupported_flag_err("O_APPEND"));
+                }
+                if flags & libc::O_CLOEXEC != 0 {
+                    return Err(Self::unsupported_flag_err("O_CLOEXEC"));
+                }
+                let handle = self.access_value_mut(inode, |value| {
+                    let mutable = mutable(flags);
+                    let handle = value.new_handle(mutable)?;
+                    let result = value.access_block(handle, |block| {
+                        let ctx = self.authz_context(from, block.meta());
+                        // Be careful, because libc::O_RDONLY is 0 you can't use the '&' operator
+                        // to test for it, it has to be an equality check.
+                        if flags == libc::O_RDONLY || (flags & libc::O_RDWR) != 0 {
+                            self.authorizer.can_read(&ctx)?;
+                        }
+                        if mutable {
+                            self.authorizer.can_write(&ctx)?;
+                        }
+                        Ok(())
+                    });
+                    if result.is_err() {
+                        value.forget_handle(handle);
+                    }
+                    if flags & libc::O_DIRECTORY != 0 {
+                        value.convert_to_dir(handle)?;
+                    }
+                    Ok(handle)
+                })?;
+                Ok(OpenReply { handle })
+            })();
+            ready(result)
+        }
+
+        fn read<'c, R, F>(&'c self, from: &'c BlockPath, msg: Read, callback: F) -> Result<R>
+        where
+            F: 'c + Send + FnOnce(&[u8]) -> R,
+        {
+            let Read {
+                inode,
+                handle,
+                offset,
+                size,
+            } = msg;
+            let mut callback = Some(callback);
+            let output = self
+                .access_block(inode, handle, |block| {
+                    let pos = block.pos() as u64;
+                    if offset != pos {
+                        if let Err(err) = block.try_seek(SeekFrom::Start(offset)) {
+                            //  An error with `ErrorKind::Unsupported` means that the `SectoredBuf`
+                            // has unflushed data and it needs exclusive access to the block to
+                            // perform this seek because this data needs to be written.
+                            if let io::ErrorKind::Unsupported = err.kind() {
+                                return Ok(None);
+                            } else {
+                                return Err(err.into());
+                            }
+                        }
+                    }
+                    let buf = block.get_buf(offset, size)?;
+                    let callback = callback.take().unwrap();
+                    Ok(Some(callback(buf)))
+                })
+                .io_err()?;
+            let output = match output {
+                Some(output) => output,
+                None => {
+                    // The offset of this read requires us to flush data buffered from a previous
+                    // write before seeking to a different sector, so we have to access the block
+                    // mutably.
+                    self.access_block_mut(inode, handle, |block| {
+                        block.seek(SeekFrom::Start(offset))?;
+                        let buf = block.get_buf(offset, size)?;
+                        let callback = callback.take().unwrap();
+                        Ok(callback(buf))
+                    })?
+                }
+            };
+            Ok(output)
+        }
+
+        type WriteFut<'c> = Ready<Result<WriteReply>>;
+        fn write<'c>(&'c self, from: &'c BlockPath, msg: Write) -> Self::WriteFut<'c> {
+            let result = (move || {
+                let Write {
+                    inode,
+                    handle,
+                    offset,
+                    data,
+                } = msg;
+                debug!("Blocktree::write, inode {inode}, handle {handle}, offset {offset}");
+                let written = self.access_block_mut(inode, handle, |block| {
+                    let pos = block.pos() as u64;
+                    if offset != pos {
+                        block.seek(SeekFrom::Start(offset))?;
+                    }
+                    block.write(data).bterr()
+                })?;
+                Ok(WriteReply {
+                    written: written as u64,
+                })
+            })();
+            ready(result)
+        }
+
+        type FlushFut<'c> = Ready<Result<()>>;
+        fn flush<'c>(&'c self, from: &'c BlockPath, msg: Flush) -> Self::FlushFut<'c> {
+            let result = {
+                let Flush { inode, handle } = msg;
+                debug!("Blocktree::flush, inode {inode}, handle {handle}");
+                self.access_block_mut(inode, handle, |block| block.flush().bterr())
+            };
+            ready(result)
+        }
+
+        type ReadDirFut<'c> = Ready<Result<ReadDirReply>>;
+        fn read_dir<'c>(&'c self, from: &'c BlockPath, msg: ReadDir) -> Self::ReadDirFut<'c> {
+            let ReadDir {
+                inode,
+                handle,
+                limit,
+                state,
+            } = msg;
+            debug!("Blocktree::readdir, inode {inode}, handle {handle}, state {state}");
+            let result = self.access_value(inode, |value| {
+                let dir = value
+                    .value(handle)
+                    .map_err(|_| bterr!(Error::InvalidHandle { handle, inode }))?
+                    .directory()?;
+                let state: usize = state.try_into()?;
+                let server_limit = Self::DIR_ENTRY_LIMIT.min(dir.num_entries() - state);
+                let entries_len = if limit > 0 {
+                    server_limit.min(limit as usize)
+                } else {
+                    server_limit
+                };
+                let pairs = dir
+                    .entries()
+                    .skip(state)
+                    .take(entries_len)
+                    .map(|(name, entry)| (name.to_owned(), entry.to_owned()));
+                let mut entries = Vec::with_capacity(entries_len);
+                entries.extend(pairs);
+                let reply = ReadDirReply {
+                    entries,
+                    new_state: entries_len as u64,
+                };
+                Ok(reply)
+            });
+            ready(result)
+        }
+
+        type LinkFut<'c> = Ready<Result<()>>;
+        fn link<'c>(&'c self, from: &'c BlockPath, msg: Link) -> Self::LinkFut<'c> {
+            let Link {
+                inode,
+                new_parent,
+                name,
+            } = msg;
+            debug!("Blocktree::link, inode {inode}, new_parent {new_parent}, name {name}");
+            let result = self.borrow_block_mut(new_parent, |block| {
+                self.authorizer
+                    .can_write(&self.authz_context(from, block.meta()))?;
+
+                let mut dir = block.read_dir()?;
+                if dir.contains_entry(name) {
+                    return Err(io::Error::from_raw_os_error(libc::EEXIST).into());
+                }
+
+                let (file_type, stat) = self.access_value_mut(inode, |value| {
+                    let block = value.block_mut();
+                    let meta = block.mut_meta_body();
+                    let (mode, stat) = meta.access_secrets(|secrets| {
+                        secrets.nlink += 1;
+                        Ok((secrets.mode, secrets.stat()?))
+                    })?;
+                    let file_type = FileType::from_value(mode)?;
+                    block.flush_meta()?;
+                    value.incr_lookup_count();
+                    Ok((file_type, stat))
+                })?;
+                let entry = match file_type {
+                    FileType::Reg => DirEntry::File(BlockRecord::new(inode)),
+                    FileType::Dir => DirEntry::Directory(BlockRecord::new(inode)),
+                };
+                dir.insert_entry(name.to_owned(), entry);
+                block.write_dir(&dir)?;
+                Ok(())
+            });
+            ready(result)
+        }
+
+        type UnlinkFut<'c> = Ready<Result<()>>;
+        fn unlink<'c>(&'c self, from: &'c BlockPath, msg: Unlink) -> Self::UnlinkFut<'c> {
+            let result = (move || {
+                let Unlink { parent, name } = msg;
+                debug!("Blocktree::unlink, parent {parent}, name {:?}", name);
+                let (block_path, inode) = self.borrow_block_mut(parent, |block| {
+                    self.authorizer
+                        .can_write(&self.authz_context(from, block.meta()))?;
+
+                    let mut dir = block.read_dir()?;
+                    let entry = match dir.remove_entry(name) {
+                        None => return Err(io::Error::from_raw_os_error(libc::ENOENT).into()),
+                        Some(entry) => entry,
+                    };
+                    let inode = entry.inode().ok_or_else(|| {
+                        io::Error::new(
+                            io::ErrorKind::Other,
+                            "no inode associated with the given name",
+                        )
+                    })?;
+                    block.write_dir(&dir)?;
+
+                    let mut block_path = block.meta_body().path().clone();
+                    block_path.push_component(name.to_owned());
+                    Ok((block_path, inode))
+                })?;
+                self.open_value(inode, block_path, |value| {
+                    // We mark the block for deletion if `nlink` drops to zero.
+                    let block = value.block_mut();
+                    let nlink = block.mut_meta_body().access_secrets(|secrets| {
+                        secrets.nlink -= 1;
+                        Ok(secrets.nlink)
+                    })?;
+                    block.flush_meta()?;
+                    value.delete = 0 == nlink;
+                    Ok(())
+                })
+            })();
+            ready(result)
+        }
+
+        type ReadMetaFut<'c> = Ready<Result<BlockMeta>>;
+        fn read_meta<'c>(&'c self, from: &'c BlockPath, msg: ReadMeta) -> Self::ReadMetaFut<'c> {
+            let result = (move || {
+                let ReadMeta { inode, handle } = msg;
+                debug!("Blocktree::getattr, inode {inode}, handle {:?}", handle);
+                let meta = if let Some(handle) = handle {
+                    self.access_block(inode, handle, |block| Ok(block.meta().to_owned()))?
+                } else {
+                    self.borrow_block(inode, |block| Ok(block.meta().to_owned()))?
+                };
+                Ok(meta)
+            })();
+            ready(result)
+        }
+
+        type WriteMetaFut<'c> = Ready<Result<()>>;
+        fn write_meta<'c>(&'c self, from: &'c BlockPath, msg: WriteMeta) -> Self::WriteMetaFut<'c> {
+            let result = (move || {
+                let WriteMeta {
+                    inode,
+                    handle,
+                    meta,
+                } = msg;
+                debug!("Blocktree::setattr, inode {inode}, handle {:?}", handle);
+                let cb = |block: &mut FileBlock<C>| {
+                    self.authorizer
+                        .can_write(&self.authz_context(from, block.meta()))?;
+                    *block.mut_meta() = meta;
+                    block.flush_meta()?;
+                    Ok(())
+                };
+                if let Some(handle) = handle {
+                    self.access_block_mut(inode, handle, |block| cb(block.get_mut()))
+                } else {
+                    self.borrow_block_mut(inode, |block| cb(block.get_mut()))
+                }
+            })();
+            ready(result)
+        }
+
+        type CloseFut<'c> = Ready<Result<()>>;
+        fn close<'c>(&'c self, from: &'c BlockPath, msg: Close) -> Self::CloseFut<'c> {
+            let Close { inode, handle } = msg;
+            debug!("Blocktree::release, inode {inode}, handle {handle}");
+            let result = self.access_value_mut(inode, |value| {
+                if let Err(err) = value.access_block_mut(handle, |block| block.flush().bterr()) {
+                    if let Error::ReadOnlyHandle(_) = err.downcast_ref::<Error>().unwrap() {
+                        ()
+                    } else {
+                        return Err(err);
+                    }
+                }
+                value.forget_handle(handle);
+                Ok(())
+            });
+            ready(result)
+        }
+
+        type ForgetFut<'c> = Ready<Result<()>>;
+        fn forget<'c>(&'c self, from: &'c BlockPath, msg: Forget) -> Self::ForgetFut<'c> {
+            let result = (move || {
+                let Forget { inode, count } = msg;
+                debug!("Blocktree::forget, inode {inode}, count {count}");
+                let mut inodes = self.inodes.write().display_err()?;
+                self.inode_forget(&mut inodes, inode, count).bterr()
+            })();
+            ready(result)
+        }
+
+        type LockFut<'c> = Ready<Result<()>>;
+        fn lock<'c>(&'c self, _from: &'c BlockPath, _msg: Lock) -> Self::LockFut<'c> {
+            let result = (move || {
+                todo!();
+            })();
+            ready(result)
+        }
+
+        type UnlockFut<'c> = Ready<Result<()>>;
+        fn unlock<'c>(&'c self, _from: &'c BlockPath, _msg: Unlock) -> Self::UnlockFut<'c> {
+            let result = (move || {
+                todo!();
+            })();
+            ready(result)
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use super::private::Error;
+    use super::*;
+
+    use btlib::{
+        crypto::{ConcreteCreds, CredsPriv},
+        BlockMeta,
+    };
+    use bytes::BytesMut;
+    use lazy_static::lazy_static;
+    use std::sync::Arc;
+    use tempdir::TempDir;
+
+    lazy_static! {
+        static ref ROOT_CREDS: ConcreteCreds = ConcreteCreds::generate().unwrap();
+        static ref NODE_CREDS: ConcreteCreds = {
+            let root_creds = &ROOT_CREDS;
+            let mut node_creds = ConcreteCreds::generate().unwrap();
+            let writecap = root_creds
+                .issue_writecap(
+                    node_creds.principal(),
+                    vec![],
+                    Epoch::now() + Duration::from_secs(3600),
+                )
+                .unwrap();
+            node_creds.set_writecap(writecap);
+            node_creds
+        };
+    }
+
+    fn node_creds() -> &'static ConcreteCreds {
+        &NODE_CREDS
+    }
+
+    /// Tests for the [ModeAuthorizer] struct.
+    mod mode_authorizer_tests {
+        use super::{
+            super::private::{Authorizer, AuthzContext},
+            *,
+        };
+
+        struct TestCase {
+            ctx_uid: u32,
+            ctx_gid: u32,
+            meta: BlockMeta,
+        }
+
+        impl TestCase {
+            const BLOCK_UID: u32 = 1000;
+            const BLOCK_GID: u32 = 1000;
+            const CTX_PID: libc::pid_t = 100;
+
+            fn new(ctx_uid: u32, ctx_gid: u32, mode: u32) -> TestCase {
+                let mut meta =
+                    BlockMeta::new(node_creds()).expect("failed to create block metadata");
+                meta.mut_body()
+                    .access_secrets(|secrets| {
+                        secrets.uid = Self::BLOCK_UID;
+                        secrets.gid = Self::BLOCK_GID;
+                        secrets.mode = mode;
+                        Ok(())
+                    })
+                    .expect("failed to update secrets");
+                TestCase {
+                    ctx_uid,
+                    ctx_gid,
+                    meta,
+                }
+            }
+
+            fn context(&self) -> AuthzContext<'_> {
+                AuthzContext {
+                    uid: self.ctx_uid,
+                    gid: self.ctx_gid,
+                    pid: Self::CTX_PID,
+                    meta: &self.meta,
+                }
+            }
+        }
+
+        #[test]
+        fn cant_read_when_no_bits_set() {
+            let case = TestCase::new(TestCase::BLOCK_UID, TestCase::BLOCK_GID, 0);
+            let result = ModeAuthorizer {}.can_read(&case.context());
+            assert!(result.is_err())
+        }
+
+        #[test]
+        fn cant_write_when_no_bits_set() {
+            let case = TestCase::new(TestCase::BLOCK_UID, TestCase::BLOCK_GID, 0);
+            let result = ModeAuthorizer {}.can_write(&case.context());
+            assert!(result.is_err())
+        }
+
+        #[test]
+        fn cant_exec_when_no_bits_set() {
+            let case = TestCase::new(TestCase::BLOCK_UID, TestCase::BLOCK_GID, 0);
+            let result = ModeAuthorizer {}.can_exec(&case.context());
+            assert!(result.is_err())
+        }
+
+        #[test]
+        fn user_can_read_when_bit_set() {
+            let case = TestCase::new(TestCase::BLOCK_UID, TestCase::BLOCK_GID, libc::S_IRUSR);
+            let result = ModeAuthorizer {}.can_read(&case.context());
+            assert!(result.is_ok())
+        }
+
+        #[test]
+        fn user_can_write_when_bit_set() {
+            let case = TestCase::new(TestCase::BLOCK_UID, TestCase::BLOCK_GID, libc::S_IWUSR);
+            let result = ModeAuthorizer {}.can_write(&case.context());
+            assert!(result.is_ok())
+        }
+
+        #[test]
+        fn user_can_exec_when_bit_set() {
+            let case = TestCase::new(TestCase::BLOCK_UID, TestCase::BLOCK_GID, libc::S_IXUSR);
+            let result = ModeAuthorizer {}.can_exec(&case.context());
+            assert!(result.is_ok())
+        }
+
+        #[test]
+        fn group_can_read_when_bit_set() {
+            let case = TestCase::new(TestCase::BLOCK_UID, TestCase::BLOCK_GID, libc::S_IRGRP);
+            let result = ModeAuthorizer {}.can_read(&case.context());
+            assert!(result.is_ok())
+        }
+
+        #[test]
+        fn group_can_write_when_bit_set() {
+            let case = TestCase::new(TestCase::BLOCK_UID, TestCase::BLOCK_GID, libc::S_IWGRP);
+            let result = ModeAuthorizer {}.can_write(&case.context());
+            assert!(result.is_ok())
+        }
+
+        #[test]
+        fn group_can_exec_when_bit_set() {
+            let case = TestCase::new(TestCase::BLOCK_UID, TestCase::BLOCK_GID, libc::S_IXGRP);
+            let result = ModeAuthorizer {}.can_exec(&case.context());
+            assert!(result.is_ok())
+        }
+
+        #[test]
+        fn other_can_read_when_bit_set() {
+            let case = TestCase::new(TestCase::BLOCK_UID, TestCase::BLOCK_GID, libc::S_IROTH);
+            let result = ModeAuthorizer {}.can_read(&case.context());
+            assert!(result.is_ok())
+        }
+
+        #[test]
+        fn other_can_write_when_bit_set() {
+            let case = TestCase::new(TestCase::BLOCK_UID, TestCase::BLOCK_GID, libc::S_IWOTH);
+            let result = ModeAuthorizer {}.can_write(&case.context());
+            assert!(result.is_ok())
+        }
+
+        #[test]
+        fn other_can_exec_when_bit_set() {
+            let case = TestCase::new(TestCase::BLOCK_UID, TestCase::BLOCK_GID, libc::S_IXOTH);
+            let result = ModeAuthorizer {}.can_exec(&case.context());
+            assert!(result.is_ok())
+        }
+
+        #[test]
+        fn other_cant_write_even_if_user_can() {
+            let case = TestCase::new(
+                TestCase::BLOCK_UID + 1,
+                TestCase::BLOCK_GID + 1,
+                libc::S_IWUSR,
+            );
+            let result = ModeAuthorizer {}.can_write(&case.context());
+            assert!(result.is_err())
+        }
+
+        #[test]
+        fn other_cant_write_even_if_group_can() {
+            let case = TestCase::new(
+                TestCase::BLOCK_UID + 1,
+                TestCase::BLOCK_GID + 1,
+                libc::S_IWGRP,
+            );
+            let result = ModeAuthorizer {}.can_write(&case.context());
+            assert!(result.is_err())
+        }
+
+        #[test]
+        fn user_allowed_read_when_only_other_bit_set() {
+            let case = TestCase::new(TestCase::BLOCK_UID, TestCase::BLOCK_GID, libc::S_IROTH);
+            let result = ModeAuthorizer {}.can_read(&case.context());
+            assert!(result.is_ok())
+        }
+
+        #[test]
+        fn root_always_allowed() {
+            let case = TestCase::new(0, 0, 0);
+            let ctx = case.context();
+            let authorizer = ModeAuthorizer {};
+            assert!(authorizer.can_read(&ctx).is_ok());
+            assert!(authorizer.can_write(&ctx).is_ok());
+            assert!(authorizer.can_exec(&ctx).is_ok());
+        }
+    }
+
+    fn bind_path<C: CredsPriv>(creds: C) -> BlockPath {
+        let writecap = creds
+            .writecap()
+            .ok_or(btlib::BlockError::MissingWritecap)
+            .unwrap();
+        writecap.bind_path()
+    }
+
+    struct BtTestCase {
+        dir: TempDir,
+        bt: LocalFsProvider<ConcreteCreds, ModeAuthorizer>,
+        from: BlockPath,
+    }
+
+    impl BtTestCase {
+        fn new_empty() -> BtTestCase {
+            let dir = TempDir::new("fuse").expect("failed to create temp dir");
+            let bt = LocalFsProvider::new_empty(
+                dir.path().to_owned(),
+                0,
+                Self::creds(),
+                ModeAuthorizer {},
+            )
+            .expect("failed to create empty blocktree");
+            let from = bind_path(node_creds());
+            BtTestCase { dir, bt, from }
+        }
+
+        fn new_existing(dir: TempDir) -> BtTestCase {
+            let bt = LocalFsProvider::new_existing(
+                dir.path().to_owned(),
+                Self::creds(),
+                ModeAuthorizer {},
+            )
+            .expect("failed to create blocktree from existing directory");
+            let from = bind_path(node_creds());
+            BtTestCase { dir, bt, from }
+        }
+
+        fn creds() -> ConcreteCreds {
+            node_creds().clone()
+        }
+
+        fn from(&self) -> &BlockPath {
+            &self.from
+        }
+    }
+
+    /// Tests that a new file can be created, written to and the written data can be read from it.
+    #[tokio::test]
+    async fn create_write_read() {
+        let case = BtTestCase::new_empty();
+        let bt = &case.bt;
+        let from = case.from();
+        let name = "README.md";
+        let flags = libc::O_RDWR;
+
+        let create_msg = Create {
+            parent: SpecInodes::RootDir.into(),
+            name,
+            flags,
+            mode: libc::S_IFREG | 0o644,
+            umask: 0,
+        };
+        let CreateReply { inode, handle, .. } = bt.create(from, create_msg).await.unwrap();
+
+        const LEN: usize = 32;
+        let expected = [1u8; LEN];
+        let write_msg = Write {
+            inode,
+            handle,
+            offset: 0,
+            data: expected.as_slice(),
+        };
+        let WriteReply { written, .. } = bt.write(from, write_msg).await.unwrap();
+        assert_eq!(LEN as u64, written);
+
+        let mut actual = [0u8; LEN];
+        let read_msg = Read {
+            inode,
+            handle,
+            offset: 0,
+            size: LEN as u64,
+        };
+        bt.read(from, read_msg, |data| actual.copy_from_slice(data))
+            .unwrap();
+
+        assert_eq!(expected, actual)
+    }
+
+    #[tokio::test]
+    async fn lookup() {
+        let case = BtTestCase::new_empty();
+        let bt = &case.bt;
+        let from = case.from();
+
+        let name = "README.md";
+        let create_msg = Create {
+            parent: SpecInodes::RootDir.into(),
+            name,
+            flags: 0,
+            mode: 0,
+            umask: 0,
+        };
+        let create_reply = bt.create(from, create_msg).await.unwrap();
+
+        let lookup_msg = Lookup {
+            parent: SpecInodes::RootDir.into(),
+            name,
+        };
+        let lookup_reply = bt.lookup(from, lookup_msg).await.unwrap();
+
+        assert_eq!(create_reply.inode, lookup_reply.inode);
+    }
+
+    /// Tests that data written by one instance of [Blocktree] can be read by a subsequent
+    /// instance.
+    #[tokio::test]
+    async fn new_existing() {
+        const EXPECTED: &[u8] = b"cool as cucumbers";
+        let name = "RESIGNATION.docx";
+        let case = BtTestCase::new_empty();
+        let bt = &case.bt;
+        let from = case.from();
+        let flags = libc::O_RDWR;
+
+        {
+            let create_msg = Create {
+                parent: SpecInodes::RootDir.into(),
+                name,
+                mode: libc::S_IFREG | 0o644,
+                flags,
+                umask: 0,
+            };
+            let CreateReply { handle, inode, .. } = bt.create(from, create_msg).await.unwrap();
+
+            let write_msg = Write {
+                inode,
+                handle,
+                offset: 0,
+                data: EXPECTED,
+            };
+            let WriteReply { written, .. } = bt.write(from, write_msg).await.unwrap();
+            assert_eq!(EXPECTED.len() as u64, written);
+
+            let flush_msg = Flush { inode, handle };
+            bt.flush(from, flush_msg).await.unwrap();
+        }
+
+        let case = BtTestCase::new_existing(case.dir);
+        let from = case.from();
+        let bt = &case.bt;
+
+        let lookup_msg = Lookup {
+            parent: SpecInodes::RootDir.into(),
+            name,
+        };
+        let LookupReply { inode, .. } = bt.lookup(from, lookup_msg).await.unwrap();
+
+        let open_msg = Open {
+            inode,
+            flags: libc::O_RDONLY,
+        };
+        let OpenReply { handle, .. } = bt.open(from, open_msg).await.unwrap();
+
+        let mut actual = BytesMut::new();
+        let read_msg = Read {
+            inode,
+            handle,
+            offset: 0,
+            size: EXPECTED.len() as u64,
+        };
+        bt.read(from, read_msg, |data| actual.extend_from_slice(data))
+            .unwrap();
+
+        assert_eq!(EXPECTED, &actual)
+    }
+
+    /// Tests that an error is returned by the `Blocktree::write` method if the file was opened
+    /// read-only.
+    #[tokio::test]
+    async fn open_read_only_write_is_error() {
+        let name = "books.ods";
+        let case = BtTestCase::new_empty();
+        let bt = &case.bt;
+        let from = case.from();
+
+        let create_msg = Create {
+            parent: SpecInodes::RootDir.into(),
+            name,
+            flags: 0,
+            mode: libc::S_IFREG | 0o644,
+            umask: 0,
+        };
+        let CreateReply { inode, handle, .. } = bt.create(from, create_msg).await.unwrap();
+
+        let close_msg = Close { inode, handle };
+        bt.close(from, close_msg).await.unwrap();
+
+        let open_msg = Open {
+            inode,
+            flags: libc::O_RDONLY,
+        };
+        let OpenReply { handle, .. } = bt.open(from, open_msg).await.unwrap();
+
+        let data = [1u8; 32];
+        let write_msg = Write {
+            inode,
+            handle,
+            offset: 0,
+            data: &data,
+        };
+        let result = bt.write(from, write_msg).await;
+
+        let err = result.err().unwrap();
+        let err = err.downcast::<Error>().unwrap();
+        let actual_handle = if let Error::ReadOnlyHandle(actual_handle) = err {
+            Some(actual_handle)
+        } else {
+            None
+        };
+        assert_eq!(Some(handle), actual_handle);
+    }
+
+    /// Asserts that the given [Result] is an [Err] and that it contains an [io::Error] which
+    /// corresponds to the [libc::ENOENT] error code.
+    fn assert_enoent<T>(result: Result<T>) {
+        let err = result.err().unwrap().downcast::<io::Error>().unwrap();
+        assert_eq!(libc::ENOENT, err.raw_os_error().unwrap());
+    }
+
+    /// Tests that multiple handles see consistent metadata associated with a block.
+    #[tokio::test]
+    async fn ensure_metadata_consistency() {
+        let case = BtTestCase::new_empty();
+        let from = case.from();
+        let trash = ".Trash";
+        let file = "file.txt";
+        let bt = &case.bt;
+        let root = SpecInodes::RootDir.into();
+
+        let open_msg = Open {
+            inode: root,
+            flags: libc::O_DIRECTORY,
+        };
+        let OpenReply { handle, .. } = bt.open(from, open_msg).await.unwrap();
+
+        // Because the directory is open, this will cause a new handle for this block to be opened.
+        let lookup_msg = Lookup {
+            parent: root,
+            name: trash,
+        };
+        let result = bt.lookup(from, lookup_msg).await;
+        assert_enoent(result);
+
+        let close_msg = Close {
+            inode: root,
+            handle,
+        };
+        bt.close(from, close_msg).await.unwrap();
+
+        let create_msg = Create {
+            parent: root,
+            name: file,
+            flags: 0,
+            mode: libc::S_IFREG | 0o644,
+            umask: 0,
+        };
+        bt.create(from, create_msg).await.unwrap();
+
+        let open_msg = Open {
+            inode: root,
+            flags: libc::O_DIRECTORY,
+        };
+        bt.open(from, open_msg).await.unwrap();
+
+        // Since the directory is open, the second handle will be used for this lookup.
+        let lookup_msg = Lookup {
+            parent: root,
+            name: trash,
+        };
+        let result = bt.lookup(from, lookup_msg).await;
+        assert!(result.is_err());
+    }
+
+    /// Tests that the `size` parameter actually limits the number of read bytes.
+    #[tokio::test]
+    async fn read_with_smaller_size() {
+        const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
+        let case = BtTestCase::new_empty();
+        let from = case.from();
+        let file = "file.txt";
+        let bt = &case.bt;
+        let root: Inode = SpecInodes::RootDir.into();
+        let flags = libc::O_RDWR;
+
+        let create_msg = Create {
+            parent: root,
+            name: file,
+            flags,
+            mode: libc::S_IFREG | 0o644,
+            umask: 0,
+        };
+        let CreateReply { inode, handle, .. } = bt.create(from, create_msg).await.unwrap();
+        let write_msg = Write {
+            inode,
+            handle,
+            offset: 0,
+            data: DATA.as_slice(),
+        };
+        let WriteReply { written, .. } = bt.write(from, write_msg).await.unwrap();
+        assert_eq!(DATA.len() as u64, written);
+        const SIZE: usize = DATA.len() / 2;
+        let mut actual = Vec::with_capacity(SIZE);
+        let read_msg = Read {
+            inode,
+            handle,
+            offset: 0,
+            size: SIZE as u64,
+        };
+        bt.read(from, read_msg, |data| actual.extend_from_slice(data))
+            .unwrap();
+
+        assert_eq!(&[0, 1, 2, 3], actual.as_slice());
+    }
+
+    /// Returns an integer array starting at the given value and increasing by one for each
+    /// subsequent entry.
+    pub const fn integer_array<const N: usize>(start: u8) -> [u8; N] {
+        let mut array = [0u8; N];
+        let mut k = 0usize;
+        while k < N {
+            array[k] = start.wrapping_add(k as u8);
+            k += 1;
+        }
+        array
+    }
+
+    #[tokio::test]
+    async fn concurrent_reads() {
+        // The size of each of the reads.
+        const SIZE: usize = 4;
+        // The number of concurrent reads.
+        const NREADS: usize = 32;
+        const DATA_LEN: usize = SIZE * NREADS;
+        const DATA: [u8; DATA_LEN] = integer_array::<DATA_LEN>(0);
+        let case = BtTestCase::new_empty();
+        let from = case.from();
+        let file = "file.txt";
+        let bt = &case.bt;
+        let root: Inode = SpecInodes::RootDir.into();
+        let flags = libc::O_RDWR;
+        let mode = libc::S_IFREG | 0o644;
+
+        let create_msg = Create {
+            parent: root,
+            name: file,
+            flags,
+            mode,
+            umask: 0,
+        };
+        let CreateReply { inode, handle, .. } = bt.create(from, create_msg).await.unwrap();
+        let write_msg = Write {
+            inode,
+            handle,
+            offset: 0,
+            data: DATA.as_slice(),
+        };
+        let WriteReply { written, .. } = bt.write(from, write_msg).await.unwrap();
+        assert_eq!(DATA.len() as u64, written);
+        let case = Box::new(case);
+        let cb = Arc::new(Box::new(move |offset: usize| {
+            // Notice that we have concurrent reads to different offsets using the same handle.
+            // Without proper synchronization, this shouldn't work.
+            let mut actual = Vec::with_capacity(SIZE);
+            let read_msg = Read {
+                inode,
+                handle,
+                offset: offset as u64,
+                size: SIZE as u64,
+            };
+            case.bt
+                .read(case.from(), read_msg, |data| actual.extend_from_slice(data))
+                .unwrap();
+            let expected = integer_array::<SIZE>(offset as u8);
+            assert_eq!(&expected, actual.as_slice());
+        }));
+
+        let mut handles = Vec::with_capacity(NREADS);
+        for offset in (0..NREADS).map(|e| e * SIZE) {
+            let thread_cb = cb.clone();
+            handles.push(std::thread::spawn(move || thread_cb(offset)));
+        }
+        for handle in handles {
+            handle.join().unwrap();
+        }
+    }
+
+    #[tokio::test]
+    async fn rename_in_same_directory() {
+        let case = BtTestCase::new_empty();
+        let bt = &case.bt;
+        let from = case.from();
+        let root: Inode = SpecInodes::RootDir.into();
+        let src_name = "src";
+        let dst_name = "dst";
+
+        let create_msg = Create {
+            parent: root,
+            name: src_name,
+            flags: 0,
+            mode: libc::S_IFREG | 0o644,
+            umask: 0,
+        };
+        let CreateReply { inode, .. } = bt.create(from, create_msg).await.unwrap();
+        let link_msg = Link {
+            inode,
+            new_parent: root,
+            name: dst_name,
+        };
+        bt.link(from, link_msg).await.unwrap();
+        let unlink_msg = Unlink {
+            parent: root,
+            name: src_name,
+        };
+        bt.unlink(from, unlink_msg).await.unwrap();
+
+        let lookup_msg = Lookup {
+            parent: root,
+            name: dst_name,
+        };
+        let LookupReply {
+            inode: actual_inode,
+            ..
+        } = bt.lookup(from, lookup_msg).await.unwrap();
+        assert_eq!(inode, actual_inode);
+        let lookup_msg = Lookup {
+            parent: root,
+            name: src_name,
+        };
+        let result = bt.lookup(from, lookup_msg).await;
+        assert_enoent(result);
+    }
+
+    #[tokio::test]
+    async fn rename_to_different_directory() {
+        let case = BtTestCase::new_empty();
+        let bt = &case.bt;
+        let from = case.from();
+        let root = SpecInodes::RootDir.into();
+        let dir_name = "dir";
+        let file_name = "file";
+
+        let create_msg = Create {
+            parent: root,
+            name: dir_name,
+            flags: libc::O_DIRECTORY,
+            mode: libc::S_IFDIR | 0o755,
+            umask: 0,
+        };
+        let CreateReply { inode: dir, .. } = bt.create(from, create_msg).await.unwrap();
+        let create_msg = Create {
+            parent: root,
+            name: file_name,
+            flags: 0,
+            mode: libc::S_IFREG | 0o644,
+            umask: 0,
+        };
+        let CreateReply { inode: file, .. } = bt.create(from, create_msg).await.unwrap();
+        let link_msg = Link {
+            inode: file,
+            new_parent: dir,
+            name: file_name,
+        };
+        bt.link(from, link_msg).await.unwrap();
+        let unlink_msg = Unlink {
+            parent: root,
+            name: file_name,
+        };
+        bt.unlink(from, unlink_msg).await.unwrap();
+
+        let lookup_msg = Lookup {
+            parent: dir,
+            name: file_name,
+        };
+        let LookupReply {
+            inode: actual_inode,
+            ..
+        } = bt.lookup(from, lookup_msg).await.unwrap();
+        assert_eq!(file, actual_inode);
+        let lookup_msg = Lookup {
+            parent: root,
+            name: file_name,
+        };
+        let result = bt.lookup(from, lookup_msg).await;
+        assert_enoent(result);
+    }
+
+    #[tokio::test]
+    async fn rename_no_replace_same_directory() {
+        let case = BtTestCase::new_empty();
+        let bt = &case.bt;
+        let from = case.from();
+        let root: Inode = SpecInodes::RootDir.into();
+        let oldname = "old";
+        let newname = "new";
+
+        let create_msg = Create {
+            parent: root,
+            name: oldname,
+            flags: 0,
+            mode: 0o644,
+            umask: 0,
+        };
+        let CreateReply { inode, .. } = bt.create(from, create_msg).await.unwrap();
+        let create_msg = Create {
+            parent: root,
+            name: newname,
+            flags: 0,
+            mode: 0o644,
+            umask: 0,
+        };
+        bt.create(from, create_msg).await.unwrap();
+
+        let link_msg = Link {
+            inode,
+            new_parent: root,
+            name: newname,
+        };
+        let result = bt.link(from, link_msg).await;
+        let err = result.err().unwrap().downcast::<io::Error>().unwrap();
+        assert_eq!(io::ErrorKind::AlreadyExists, err.kind());
+    }
+
+    #[tokio::test]
+    async fn rename_no_replace_different_directory() {
+        let case = BtTestCase::new_empty();
+        let bt = &case.bt;
+        let from = case.from();
+        let root = SpecInodes::RootDir.into();
+        let dir_name = "dir";
+        let file_name = "file";
+
+        let create_msg = Create {
+            parent: root,
+            name: dir_name,
+            flags: libc::O_DIRECTORY,
+            mode: 0o755,
+            umask: 0,
+        };
+        let CreateReply { inode: dir, .. } = bt.create(from, create_msg).await.unwrap();
+        let create_msg = Create {
+            parent: root,
+            name: file_name,
+            flags: 0,
+            mode: 0o644,
+            umask: 0,
+        };
+        let CreateReply { inode, .. } = bt.create(from, create_msg).await.unwrap();
+        let create_msg = Create {
+            parent: dir,
+            name: file_name,
+            flags: 0,
+            mode: 0o644,
+            umask: 0,
+        };
+        bt.create(from, create_msg).await.unwrap();
+
+        let link_msg = Link {
+            inode,
+            new_parent: dir,
+            name: file_name,
+        };
+        let result = bt.link(from, link_msg).await;
+        let err = result.err().unwrap().downcast::<io::Error>().unwrap();
+        assert_eq!(io::ErrorKind::AlreadyExists, err.kind());
+    }
+}

+ 29 - 6
crates/btfproto/src/msg.rs

@@ -1,4 +1,4 @@
-use btlib::BlockMeta;
+use btlib::{BlockMeta, BlockMetaSecrets, DirEntry};
 use btmsg::CallMsg;
 use core::time::Duration;
 use serde::{Deserialize, Serialize};
@@ -18,6 +18,7 @@ pub enum FsMsg<'a> {
     #[serde(borrow)]
     Write(Write<'a>),
     Flush(Flush),
+    ReadDir(ReadDir),
     #[serde(borrow)]
     Link(Link<'a>),
     #[serde(borrow)]
@@ -39,6 +40,7 @@ pub enum FsReply<'a> {
     #[serde(borrow)]
     Read(ReadReply<'a>),
     Write(WriteReply),
+    ReadDir(ReadDirReply),
     Link(LinkReply),
     ReadMeta(BlockMeta),
 }
@@ -64,8 +66,7 @@ impl std::error::Error for FsError {}
 
 #[derive(Serialize, Deserialize)]
 pub struct Entry {
-    //pub attr: stat64,
-    pub attr_flags: u32,
+    pub attr: BlockMetaSecrets,
     pub attr_timeout: Duration,
     pub entry_timeout: Duration,
 }
@@ -87,13 +88,14 @@ pub struct LookupReply {
 pub struct Create<'a> {
     pub parent: Inode,
     pub name: &'a str,
-    pub flags: u32,
+    pub flags: i32,
     pub mode: u32,
     pub umask: u32,
 }
 
 #[derive(Serialize, Deserialize)]
 pub struct CreateReply {
+    pub inode: Inode,
     pub handle: Handle,
     pub entry: Entry,
 }
@@ -101,7 +103,7 @@ pub struct CreateReply {
 #[derive(Serialize, Deserialize)]
 pub struct Open {
     pub inode: Inode,
-    pub flags: u32,
+    pub flags: i32,
 }
 
 #[derive(Serialize, Deserialize)]
@@ -141,6 +143,26 @@ pub struct Flush {
     pub handle: Handle,
 }
 
+#[derive(Serialize, Deserialize)]
+pub struct ReadDir {
+    pub inode: Inode,
+    pub handle: Handle,
+    /// The maximum number of directory entries to return in a single response. A value of 0
+    /// indicates there is no limit. Note that the server may impose it's own limit.
+    pub limit: u32,
+    /// An opaque value which the server uses to keep track of the client's position in reading
+    /// the directory. A value of 0 indicates the directory is to be iterated from the beginning.
+    pub state: u64,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct ReadDirReply {
+    pub entries: Vec<(String, DirEntry)>,
+    /// This is the value to pass in a subsequent [ReadDir] message to continue reading this
+    /// directory. A value of 0 indicates that all entries have been returned.
+    pub new_state: u64,
+}
+
 #[derive(Serialize, Deserialize)]
 pub struct Link<'a> {
     pub inode: Inode,
@@ -162,12 +184,13 @@ pub struct Unlink<'a> {
 #[derive(Serialize, Deserialize)]
 pub struct ReadMeta {
     pub inode: Inode,
+    pub handle: Option<Handle>,
 }
 
 #[derive(Serialize, Deserialize)]
 pub struct WriteMeta {
     pub inode: Inode,
-    pub handle: Handle,
+    pub handle: Option<Handle>,
     pub meta: BlockMeta,
 }
 

+ 104 - 35
crates/btfproto/src/server.rs

@@ -1,25 +1,85 @@
 use crate::msg::*;
 
-use btlib::{crypto::Creds, BlockMeta, BlockPath};
+use btlib::{crypto::Creds, BlockMeta, BlockPath, Result};
 use btmsg::{receiver, MsgCallback, MsgReceived, Receiver};
 use core::future::Future;
-use std::{net::IpAddr, result::Result, sync::Arc};
+use std::{net::IpAddr, sync::Arc};
+use tokio::runtime::Handle as RuntimeHandle;
 
 pub trait FsProvider {
-    fn lookup(&self, from: &BlockPath, msg: &Lookup) -> Result<LookupReply, FsError>;
-    fn create(&self, from: &BlockPath, msg: &Create) -> Result<CreateReply, FsError>;
-    fn open(&self, from: &BlockPath, msg: &Open) -> Result<OpenReply, FsError>;
-    fn read(&self, from: &BlockPath, msg: &Read) -> Result<ReadReply<'_>, FsError>;
-    fn write(&self, from: &BlockPath, msg: &Write) -> Result<WriteReply, FsError>;
-    fn flush(&self, from: &BlockPath, msg: &Flush) -> Result<(), FsError>;
-    fn link(&self, from: &BlockPath, msg: &Link) -> Result<(), FsError>;
-    fn unlink(&self, from: &BlockPath, msg: &Unlink) -> Result<(), FsError>;
-    fn read_meta(&self, from: &BlockPath, msg: &ReadMeta) -> Result<BlockMeta, FsError>;
-    fn write_meta(&self, from: &BlockPath, msg: &WriteMeta) -> Result<(), FsError>;
-    fn close(&self, from: &BlockPath, msg: &Close) -> Result<(), FsError>;
-    fn forget(&self, from: &BlockPath, msg: &Forget) -> Result<(), FsError>;
-    fn lock(&self, from: &BlockPath, msg: &Lock) -> Result<(), FsError>;
-    fn unlock(&self, from: &BlockPath, msg: &Unlock) -> Result<(), FsError>;
+    type LookupFut<'c>: Send + Future<Output = Result<LookupReply>>
+    where
+        Self: 'c;
+    fn lookup<'c>(&'c self, from: &'c BlockPath, msg: Lookup<'c>) -> Self::LookupFut<'c>;
+
+    type CreateFut<'c>: Send + Future<Output = Result<CreateReply>>
+    where
+        Self: 'c;
+    fn create<'c>(&'c self, from: &'c BlockPath, msg: Create<'c>) -> Self::CreateFut<'c>;
+
+    type OpenFut<'c>: Send + Future<Output = Result<OpenReply>>
+    where
+        Self: 'c;
+    fn open<'c>(&'c self, from: &'c BlockPath, msg: Open) -> Self::OpenFut<'c>;
+
+    fn read<'c, R, F>(&'c self, from: &'c BlockPath, msg: Read, callback: F) -> Result<R>
+    where
+        F: 'c + Send + FnOnce(&[u8]) -> R;
+
+    type WriteFut<'c>: Send + Future<Output = Result<WriteReply>>
+    where
+        Self: 'c;
+    fn write<'c>(&'c self, from: &'c BlockPath, msg: Write) -> Self::WriteFut<'c>;
+
+    type FlushFut<'c>: Send + Future<Output = Result<()>>
+    where
+        Self: 'c;
+    fn flush<'c>(&'c self, from: &'c BlockPath, msg: Flush) -> Self::FlushFut<'c>;
+
+    type ReadDirFut<'c>: Send + Future<Output = Result<ReadDirReply>>
+    where
+        Self: 'c;
+    fn read_dir<'c>(&'c self, from: &'c BlockPath, msg: ReadDir) -> Self::ReadDirFut<'c>;
+
+    type LinkFut<'c>: Send + Future<Output = Result<()>>
+    where
+        Self: 'c;
+    fn link<'c>(&'c self, from: &'c BlockPath, msg: Link) -> Self::LinkFut<'c>;
+
+    type UnlinkFut<'c>: Send + Future<Output = Result<()>>
+    where
+        Self: 'c;
+    fn unlink<'c>(&'c self, from: &'c BlockPath, msg: Unlink) -> Self::UnlinkFut<'c>;
+
+    type ReadMetaFut<'c>: Send + Future<Output = Result<BlockMeta>>
+    where
+        Self: 'c;
+    fn read_meta<'c>(&'c self, from: &'c BlockPath, msg: ReadMeta) -> Self::ReadMetaFut<'c>;
+
+    type WriteMetaFut<'c>: Send + Future<Output = Result<()>>
+    where
+        Self: 'c;
+    fn write_meta<'c>(&'c self, from: &'c BlockPath, msg: WriteMeta) -> Self::WriteMetaFut<'c>;
+
+    type CloseFut<'c>: Send + Future<Output = Result<()>>
+    where
+        Self: 'c;
+    fn close<'c>(&'c self, from: &'c BlockPath, msg: Close) -> Self::CloseFut<'c>;
+
+    type ForgetFut<'c>: Send + Future<Output = Result<()>>
+    where
+        Self: 'c;
+    fn forget<'c>(&'c self, from: &'c BlockPath, msg: Forget) -> Self::ForgetFut<'c>;
+
+    type LockFut<'c>: Send + Future<Output = Result<()>>
+    where
+        Self: 'c;
+    fn lock<'c>(&'c self, from: &'c BlockPath, msg: Lock) -> Self::LockFut<'c>;
+
+    type UnlockFut<'c>: Send + Future<Output = Result<()>>
+    where
+        Self: 'c;
+    fn unlock<'c>(&'c self, from: &'c BlockPath, msg: Unlock) -> Self::UnlockFut<'c>;
 }
 
 struct ServerCallback<P> {
@@ -43,31 +103,40 @@ impl<P> Clone for ServerCallback<P> {
 impl<P: 'static + Send + Sync + FsProvider> MsgCallback for ServerCallback<P> {
     type Arg<'de> = FsMsg<'de>;
     type CallFut<'de> = impl 'de + Future<Output = btlib::Result<()>>;
-    fn call<'de>(&'de self, mut arg: MsgReceived<FsMsg<'de>>) -> Self::CallFut<'de> {
+    fn call<'de>(&'de self, arg: MsgReceived<FsMsg<'de>>) -> Self::CallFut<'de> {
         async move {
+            let (from, body, replier) = arg.into_parts();
             let provider = &self.provider;
-            let reply = match arg.body() {
-                FsMsg::Lookup(lookup) => FsReply::Lookup(provider.lookup(arg.from(), lookup)?),
-                FsMsg::Create(create) => FsReply::Create(provider.create(arg.from(), create)?),
-                FsMsg::Open(open) => FsReply::Open(provider.open(arg.from(), open)?),
-                FsMsg::Read(read) => FsReply::Read(provider.read(arg.from(), read)?),
-                FsMsg::Write(write) => FsReply::Write(provider.write(arg.from(), write)?),
-                FsMsg::Flush(flush) => FsReply::Ack(provider.flush(arg.from(), flush)?),
-                FsMsg::Link(link) => FsReply::Ack(provider.link(arg.from(), link)?),
-                FsMsg::Unlink(unlink) => FsReply::Ack(provider.unlink(arg.from(), unlink)?),
+            let reply = match body {
+                FsMsg::Lookup(lookup) => FsReply::Lookup(provider.lookup(&from, lookup).await?),
+                FsMsg::Create(create) => FsReply::Create(provider.create(&from, create).await?),
+                FsMsg::Open(open) => FsReply::Open(provider.open(&from, open).await?),
+                FsMsg::Read(read) => {
+                    return provider.read(&from, read, move |data| {
+                        let mut replier = replier.unwrap();
+                        RuntimeHandle::current()
+                            .block_on(replier.reply(FsReply::Read(ReadReply { data })))
+                    })?;
+                }
+                FsMsg::Write(write) => FsReply::Write(provider.write(&from, write).await?),
+                FsMsg::Flush(flush) => FsReply::Ack(provider.flush(&from, flush).await?),
+                FsMsg::ReadDir(read_dir) => {
+                    FsReply::ReadDir(provider.read_dir(&from, read_dir).await?)
+                }
+                FsMsg::Link(link) => FsReply::Ack(provider.link(&from, link).await?),
+                FsMsg::Unlink(unlink) => FsReply::Ack(provider.unlink(&from, unlink).await?),
                 FsMsg::ReadMeta(read_meta) => {
-                    FsReply::ReadMeta(provider.read_meta(arg.from(), read_meta)?)
+                    FsReply::ReadMeta(provider.read_meta(&from, read_meta).await?)
                 }
                 FsMsg::WriteMeta(write_meta) => {
-                    FsReply::Ack(provider.write_meta(arg.from(), write_meta)?)
+                    FsReply::Ack(provider.write_meta(&from, write_meta).await?)
                 }
-                FsMsg::Close(close) => FsReply::Ack(provider.close(arg.from(), close)?),
-                FsMsg::Forget(forget) => FsReply::Ack(provider.forget(arg.from(), forget)?),
-                FsMsg::Lock(lock) => FsReply::Ack(provider.lock(arg.from(), lock)?),
-                FsMsg::Unlock(unlock) => FsReply::Ack(provider.unlock(arg.from(), unlock)?),
+                FsMsg::Close(close) => FsReply::Ack(provider.close(&from, close).await?),
+                FsMsg::Forget(forget) => FsReply::Ack(provider.forget(&from, forget).await?),
+                FsMsg::Lock(lock) => FsReply::Ack(provider.lock(&from, lock).await?),
+                FsMsg::Unlock(unlock) => FsReply::Ack(provider.unlock(&from, unlock).await?),
             };
-            let mut replier = arg.take_replier().unwrap();
-            replier.reply(reply).await
+            replier.unwrap().reply(reply).await
         }
     }
 }
@@ -76,7 +145,7 @@ pub fn new_fs_server<C, P>(
     ip_addr: IpAddr,
     creds: Arc<C>,
     provider: Arc<P>,
-) -> Result<impl Receiver, btlib::Error>
+) -> Result<impl Receiver>
 where
     C: 'static + Send + Sync + Creds,
     P: 'static + Send + Sync + FsProvider,

+ 6 - 0
crates/btlib/src/accessor.rs

@@ -36,6 +36,12 @@ mod private {
         }
     }
 
+    impl<T: Size + ReadAt + AsRef<BlockMeta>> Accessor<T> {
+        pub fn get_buf<'a>(&'a self, offset: u64, size: u64) -> Result<&'a [u8]> {
+            self.inner.get_buf(offset, size)
+        }
+    }
+
     impl<T: ReadAt + AsRef<BlockMeta> + Size> Read for Accessor<T> {
         fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
             self.inner.read(buf)

+ 49 - 25
crates/btlib/src/lib.rs

@@ -228,16 +228,16 @@ impl<T: Sectored + Size> Sectored for Cursor<T> {
 /// `Write` instance is given. This can be used to avoid copying in cases where the trait
 /// implementor is already buffering data, so it can give a reference to this buffer to the caller
 /// (for example in `SectoredBuf`)
-trait ReadDual: Read {
+pub trait ReadDual: Read {
     fn read_into<W: Write>(&mut self, write: W, count: usize) -> io::Result<usize>;
 }
 
-trait WriteDual: Write {
+pub trait WriteDual: Write {
     fn write_from<R: Read>(&mut self, read: R, count: usize) -> io::Result<usize>;
 }
 
 /// Trait for streams which can efficiently and infallibly return their current position.
-trait Positioned {
+pub trait Positioned {
     /// Returns the position of this stream (byte offset relative to the beginning).
     fn pos(&self) -> usize;
 }
@@ -254,7 +254,7 @@ impl<T: Positioned> Positioned for &mut T {
     }
 }
 
-trait TrySeek {
+pub trait TrySeek {
     /// Attempts to seek to the given offset from the start of the stream.
     fn try_seek(&mut self, seek_from: SeekFrom) -> io::Result<()>;
 }
@@ -436,7 +436,7 @@ trait ReadExt: Read {
 
 impl<T: Read> ReadExt for T {}
 
-trait SeekFromExt {
+pub trait SeekFromExt {
     /// Returns the absolute position (offset from the start) this `SeekFrom` refers to.
     /// `curr` is called in the case this `SeekFrom` is `Current`, and is expected to return the
     /// current position.
@@ -495,8 +495,8 @@ impl SeekFromExt for SeekFrom {
 /// A unique identifier for a block.
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Hash)]
 pub struct BlockId {
-    generation: u64,
-    inode: u64,
+    pub generation: u64,
+    pub inode: u64,
 }
 
 impl BlockId {
@@ -515,27 +515,27 @@ impl Default for BlockId {
 #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone, Hash)]
 pub struct BlockMetaSecrets {
     /// The identifier for the block these secrets are for.
-    block_id: BlockId,
+    pub block_id: BlockId,
     /// Mode of file.
-    mode: u32,
+    pub mode: u32,
     /// Owner UID of file.
-    uid: u32,
+    pub uid: u32,
     /// Owner GID of file.
-    gid: u32,
+    pub gid: u32,
     /// Last access time.
-    atime: Epoch,
+    pub atime: Epoch,
     /// Last data modification.
-    mtime: Epoch,
+    pub mtime: Epoch,
     /// Last status change.
-    ctime: Epoch,
+    pub ctime: Epoch,
     /// Size of the file in bytes.
-    size: u64,
+    pub size: u64,
     /// Number of hard links to the file.
-    nlink: u32,
+    pub nlink: u32,
     /// The sector size used by the block.
-    sect_sz: u64,
+    pub sect_sz: u64,
     /// User controlled metadata.
-    tags: BTreeMap<String, Vec<u8>>,
+    pub tags: BTreeMap<String, Vec<u8>>,
 }
 
 impl BlockMetaSecrets {
@@ -745,20 +745,24 @@ impl BlockMetaBody {
         Ok(())
     }
 
+    pub fn path(&self) -> &BlockPath {
+        &self.path
+    }
+
     pub fn set_path(&mut self, path: BlockPath) {
         self.path = path
     }
 }
 
 /// Signed metadata associated with a block.
-#[derive(Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
 pub struct BlockMeta {
     body: BlockMetaBody,
     sig: Signature,
 }
 
 impl BlockMeta {
-    fn new<C: Creds>(creds: &C) -> Result<BlockMeta> {
+    pub fn new<C: Creds>(creds: &C) -> Result<BlockMeta> {
         let body = BlockMetaBody::new(creds)?;
         let sig = Signature::empty(body.signing_key.scheme());
         Ok(BlockMeta { body, sig })
@@ -1144,7 +1148,7 @@ impl Fragment {
     }
 }
 
-#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
 /// Structure for keeping track of block information in a directory.
 pub struct BlockRecord {
     /// The ID of the block. Note that this is only guaranteed to be unique in the directory this
@@ -1165,7 +1169,7 @@ impl BlockRecord {
     }
 }
 
-#[derive(Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
 /// Structure for keeping track of server information in a directory.
 pub struct ServerRecord {
     /// The most up-to-date address for this server.
@@ -1174,7 +1178,7 @@ pub struct ServerRecord {
     pub_creds: ConcretePub,
 }
 
-#[derive(Debug, PartialEq, Serialize, Deserialize, EnumDiscriminants)]
+#[derive(Debug, PartialEq, Serialize, Deserialize, EnumDiscriminants, Clone)]
 #[strum_discriminants(derive(FromRepr, Display, Serialize, Deserialize))]
 #[strum_discriminants(name(DirEntryKind))]
 pub enum DirEntry {
@@ -1232,11 +1236,31 @@ impl Directory {
         }
     }
 
+    pub fn num_entries(&self) -> usize {
+        self.entries.len()
+    }
+
     pub fn entries(&self) -> impl Iterator<Item = (&str, &DirEntry)> {
         self.entries
             .iter()
             .map(|(name, entry)| (name.as_str(), entry))
     }
+
+    pub fn entry(&self, name: &str) -> Option<&DirEntry> {
+        self.entries.get(name)
+    }
+
+    pub fn contains_entry(&self, name: &str) -> bool {
+        self.entries.contains_key(name)
+    }
+
+    pub fn insert_entry(&mut self, name: String, entry: DirEntry) -> Option<DirEntry> {
+        self.entries.insert(name, entry)
+    }
+
+    pub fn remove_entry(&mut self, name: &str) -> Option<DirEntry> {
+        self.entries.remove(name)
+    }
 }
 
 impl Default for Directory {
@@ -1246,7 +1270,7 @@ impl Default for Directory {
 }
 
 /// Keeps track of which principal is storing a fragment.
-#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
 pub struct FragmentRecord {
     /// The fragment serial number this record is for.
     serial: FragmentSerial,
@@ -1383,7 +1407,7 @@ impl Sub<Duration> for Epoch {
 }
 
 /// The serial number of a block fragment.
-#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Hashable)]
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Hashable, Clone)]
 pub struct FragmentSerial(u32);
 
 /// A struct which may contain a closure. When this struct is dropped, if it contains a closure

+ 19 - 0
crates/btlib/src/sectored_buf.rs

@@ -315,6 +315,25 @@ mod private {
             self.pos = pos_new.try_into().box_err()?;
             Ok(pos_new)
         }
+
+        /// Returns a slice of the internal buffer that starts at the given offset in the inner
+        /// stream, and which is no longer than the given size. Note that this slice may
+        /// be shorter than the `size` parameter if the number of bytes in the internal buffer is
+        /// less than `size`.
+        pub fn get_buf<'a>(&'a self, offset: u64, size: u64) -> Result<&'a [u8]> {
+            let offset: usize = offset.try_into().unwrap();
+            let size: usize = size.try_into().unwrap();
+            let sect_sz = self.sector_sz();
+            let index = offset / sect_sz;
+            if self.buf_start != sect_sz * index {
+                return Err(bterr!(
+                    "SectoredBuf in wrong position to return buf for offset {offset}, size {size}"
+                ));
+            }
+            let start = offset % sect_sz;
+            let end = self.buf.len().min(start + size);
+            Ok(&self.buf[start..end])
+        }
     }
 
     impl<T: Seek + Read + Write + MetaAccess> Seek for SectoredBuf<T> {

+ 16 - 4
crates/btmsg/src/lib.rs

@@ -169,6 +169,10 @@ impl<T> MsgReceived<T> {
         Self { from, msg, replier }
     }
 
+    pub fn into_parts(self) -> (Arc<BlockPath>, T, Option<Replier>) {
+        (self.from, self.msg.msg, self.replier)
+    }
+
     /// The path from which this message was received.
     pub fn from(&self) -> &Arc<BlockPath> {
         &self.from
@@ -183,11 +187,8 @@ impl<T> MsgReceived<T> {
     pub fn needs_reply(&self) -> bool {
         self.replier.is_some()
     }
-}
 
-impl<'de, T: CallMsg<'de>> MsgReceived<T> {
-    /// Returns a type which can be used to reply to this message, if this message requires a
-    /// reply and it has not yet been sent.
+    /// Takes the replier out of this struct and returns it, if it has not previously been returned.
     pub fn take_replier(&mut self) -> Option<Replier> {
         self.replier.take()
     }
@@ -225,6 +226,17 @@ pub trait Transmitter {
 
     /// Transmit a message to the connected [Receiver], waits for a reply, then calls the given
     /// [DeserCallback] with the deserialized reply.
+    ///
+    /// ## WARNING
+    /// The callback must be such that `F::Arg<'a> = T::Reply<'a>` for any `'a`. If this
+    /// is violated, then a deserilization error will occur at runtime.
+    ///
+    /// ## TODO
+    /// This issue needs to be fixed. Due to the fact that
+    /// `F::Arg` is a Generic Associated Type I have been unable to express this constraint in the
+    /// where clause of this method. I'm not sure if the errors I've encountered are due to a lack
+    /// of understanding on my part or due to the current limitations of the borrow checker in
+    /// its handling GATs.
     fn call<'call, T, F>(&'call self, msg: T, callback: F) -> Self::CallFut<'call, T, F>
     where
         T: 'call + CallMsg<'call>,