// SPDX-License-Identifier: AGPL-3.0-or-later
use btfproto::{
    local_fs::{LocalFs, ModeAuthorizer},
    msg::{GrantAccess, SpecInodes},
    server::FsProvider,
};
use btlib::{
    crypto::{Creds, CredsPriv, CredsPub},
    AuthzAttrs, BlockError, BlockPath, IssuedProcRec,
};
use std::{
    net::{IpAddr, Ipv6Addr},
    path::PathBuf,
    sync::Arc,
};
use tempdir::TempDir;

pub fn bind_path<C: CredsPriv>(creds: C) -> BlockPath {
    let writecap = creds
        .writecap()
        .ok_or(btlib::BlockError::MissingWritecap)
        .unwrap();
    writecap.bind_path()
}

/// The concrete filesystem type used by these test helpers: a [LocalFs] parameterized with
/// [ModeAuthorizer].
pub type ConcreteFs = LocalFs<ModeAuthorizer>;

/// A test fixture which bundles a [ConcreteFs] with the temporary directory it was opened over
/// and the bind path of the node's writecap.
pub struct LocalFsTest {
    /// Temporary directory whose path is passed to the [LocalFs] constructors.
    dir: TempDir,
    /// The filesystem under test.
    fs: ConcreteFs,
    /// Bind path taken from the writecap of the node credentials; exposed through
    /// [LocalFsTest::from] for use as the `from` argument of filesystem calls.
    node_bind_path: Arc<BlockPath>,
}

/// Wraps the node credentials provided by the parent module in an [Arc] trait object.
fn node_creds() -> Arc<dyn Creds> {
    let creds = super::node_creds();
    Arc::new(creds)
}

/// Wraps the root credentials provided by the parent module in an [Arc] trait object.
fn root_creds() -> Arc<dyn Creds> {
    let creds = super::root_creds();
    Arc::new(creds)
}

impl LocalFsTest {
    /// The UID placed in the authorization attributes issued to the node.
    pub const NODE_UID: u32 = 1000;
    /// The GID placed in the authorization attributes issued to the node.
    pub const NODE_GID: u32 = 1000;

    /// Creates a test case backed by a brand new temporary directory.
    ///
    /// The directory is first initialized using the root credentials, and the node is granted
    /// access to the root directory (see `grant_node_access`). The filesystem stored in the
    /// returned value is then opened over the same directory using the node credentials.
    ///
    /// # Panics
    ///
    /// Panics if the temporary directory can't be created, if the node credentials have no
    /// writecap, or if any of the filesystem operations fail.
    pub async fn new_empty() -> LocalFsTest {
        let dir = TempDir::new("fuse").expect("failed to create temp dir");
        let creds = node_creds();
        Self::grant_node_access(dir.path().to_owned()).await;
        // The bind path has to be extracted before the credentials are moved into the filesystem.
        let node_bind_path = {
            let writecap = creds
                .writecap()
                .ok_or(BlockError::MissingWritecap)
                .unwrap();
            Arc::new(writecap.bind_path())
        };
        let block_dir = dir.path().to_owned();
        let fs = LocalFs::new_existing(block_dir, creds, ModeAuthorizer)
            .expect("failed to create empty blocktree");
        LocalFsTest {
            dir,
            fs,
            node_bind_path,
        }
    }

    /// Creates a test case over a directory which was previously populated, opening the
    /// filesystem with the node credentials.
    ///
    /// # Panics
    ///
    /// Panics if the filesystem can't be opened or the node credentials have no writecap.
    pub fn new_existing(dir: TempDir) -> LocalFsTest {
        let block_dir = dir.path().to_owned();
        let fs = LocalFs::new_existing(block_dir, node_creds(), ModeAuthorizer)
            .expect("failed to create blocktree from existing directory");
        let node_bind_path = Arc::new(bind_path(node_creds()));
        LocalFsTest {
            dir,
            fs,
            node_bind_path,
        }
    }

    /// Initializes a new filesystem at `path` using the root credentials, then grants the node
    /// access to the root directory by issuing it a process record.
    ///
    /// The filesystem created here is dropped when this method returns.
    async fn grant_node_access(path: PathBuf) {
        let root = root_creds();
        let root_bind_path = {
            let writecap = root.writecap().ok_or(BlockError::MissingWritecap).unwrap();
            Arc::new(writecap.bind_path())
        };
        let node = node_creds();
        let node_writecap = node.writecap().ok_or(BlockError::MissingWritecap).unwrap();
        let fs = LocalFs::new_empty(path, 0, root, ModeAuthorizer)
            .await
            .unwrap();

        // The record describing the node: its address, public credentials, writecap and the
        // UID/GID it is authorized as.
        let authz_attrs = AuthzAttrs {
            uid: Self::NODE_UID,
            gid: Self::NODE_GID,
            supp_gids: Vec::new(),
        };
        let record = IssuedProcRec {
            addr: IpAddr::V6(Ipv6Addr::LOCALHOST),
            pub_creds: node.concrete_pub(),
            writecap: node_writecap.to_owned(),
            authz_attrs,
        };
        let grant = GrantAccess {
            inode: SpecInodes::RootDir.into(),
            record,
        };
        fs.grant_access(&root_bind_path, grant).await.unwrap();
    }

    /// Consumes this test case, returning the temporary directory, the filesystem and the node's
    /// bind path, in that order.
    pub fn into_parts(self) -> (TempDir, ConcreteFs, Arc<BlockPath>) {
        let LocalFsTest {
            dir,
            fs,
            node_bind_path,
        } = self;
        (dir, fs, node_bind_path)
    }

    /// Returns a reference to the filesystem under test.
    pub fn fs(&self) -> &ConcreteFs {
        &self.fs
    }

    /// Returns the node's bind path, which tests pass as the `from` argument of filesystem calls.
    pub fn from(&self) -> &Arc<BlockPath> {
        &self.node_bind_path
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use btfproto::{local_fs::Error, msg::*};
    use btlib::{Inode, Result, SECTOR_SZ_DEFAULT};
    use btlib_tests::fs_queries::num_files;
    use std::{
        fs::read_dir,
        io::{self, Cursor, Write as IoWrite},
        ops::Deref,
        sync::Arc,
    };

    /// Tests that a new file can be created and written to, and that the written data can then
    /// be read back using the handle returned when the file was created.
    #[tokio::test]
    async fn create_write_read() {
        const LEN: usize = 32;
        let case = LocalFsTest::new_empty().await;
        let bt = &case.fs;
        let from = case.from();

        // Create the file, opened for reading and writing.
        let CreateReply { inode, handle, .. } = bt
            .create(
                from,
                Create {
                    parent: SpecInodes::RootDir.into(),
                    name: "README.md",
                    flags: Flags::new(libc::O_RDWR),
                    mode: libc::S_IFREG | 0o644,
                    umask: 0,
                },
            )
            .await
            .unwrap();

        // Write LEN bytes at the start of the file.
        let expected = [1u8; LEN];
        let reply = bt
            .write(
                from,
                Write {
                    inode,
                    handle,
                    offset: 0,
                    data: expected.as_slice(),
                },
            )
            .await
            .unwrap();
        assert_eq!(LEN as u64, reply.written);

        // Read the same range back.
        let guard = bt
            .read(
                from,
                Read {
                    inode,
                    handle,
                    offset: 0,
                    size: LEN as u64,
                },
            )
            .await
            .unwrap();

        assert_eq!(expected, guard.deref())
    }

    /// Tests that looking up a newly created file by name yields the same inode that was
    /// reported when the file was created.
    #[tokio::test]
    async fn lookup() {
        const NAME: &str = "README.md";
        let case = LocalFsTest::new_empty().await;
        let bt = &case.fs;
        let from = case.from();

        let created = bt
            .create(
                from,
                Create {
                    parent: SpecInodes::RootDir.into(),
                    name: NAME,
                    flags: Flags::default(),
                    mode: 0,
                    umask: 0,
                },
            )
            .await
            .unwrap();

        let found = bt
            .lookup(
                from,
                Lookup {
                    parent: SpecInodes::RootDir.into(),
                    name: NAME,
                },
            )
            .await
            .unwrap();

        assert_eq!(created.inode, found.inode);
    }

    /// Tests that data written through one filesystem instance can be read through a second
    /// instance which is opened over the same directory.
    #[tokio::test]
    async fn new_existing() {
        const EXPECTED: &[u8] = b"cool as cucumbers";
        const NAME: &str = "RESIGNATION.docx";
        let first = LocalFsTest::new_empty().await;

        // Create, write and close the file using the first instance.
        {
            let bt = &first.fs;
            let from = first.from();
            let create_msg = Create {
                parent: SpecInodes::RootDir.into(),
                name: NAME,
                mode: libc::S_IFREG | 0o644,
                flags: Flags::new(libc::O_RDWR),
                umask: 0,
            };
            let CreateReply { handle, inode, .. } = bt.create(from, create_msg).await.unwrap();

            let reply = bt
                .write(
                    from,
                    Write {
                        inode,
                        handle,
                        offset: 0,
                        data: EXPECTED,
                    },
                )
                .await
                .unwrap();
            assert_eq!(EXPECTED.len() as u64, reply.written);

            bt.close(from, Close { inode, handle }).await.unwrap();
        }

        // Open a second instance over the directory used by the first.
        let second = LocalFsTest::new_existing(first.dir);
        let from = second.from();
        let bt = &second.fs;

        let LookupReply { inode, .. } = bt
            .lookup(
                from,
                Lookup {
                    parent: SpecInodes::RootDir.into(),
                    name: NAME,
                },
            )
            .await
            .unwrap();

        let OpenReply { handle, .. } = bt
            .open(
                from,
                Open {
                    inode,
                    flags: Flags::new(libc::O_RDONLY),
                },
            )
            .await
            .unwrap();

        let guard = bt
            .read(
                from,
                Read {
                    inode,
                    handle,
                    offset: 0,
                    size: EXPECTED.len() as u64,
                },
            )
            .await
            .unwrap();

        assert_eq!(EXPECTED, guard.deref())
    }

    /// Tests that an error is returned by the `Blocktree::write` method if the file was opened
    /// read-only.
    #[tokio::test]
    async fn open_read_only_write_is_error() {
        let name = "books.ods";
        let case = LocalFsTest::new_empty().await;
        let bt = &case.fs;
        let from = case.from();

        let create_msg = Create {
            parent: SpecInodes::RootDir.into(),
            name,
            flags: Flags::default(),
            mode: libc::S_IFREG | 0o644,
            umask: 0,
        };
        let CreateReply { inode, handle, .. } = bt.create(from, create_msg).await.unwrap();

        let close_msg = Close { inode, handle };
        bt.close(from, close_msg).await.unwrap();

        let open_msg = Open {
            inode,
            flags: libc::O_RDONLY.into(),
        };
        let OpenReply { handle, .. } = bt.open(from, open_msg).await.unwrap();

        let data = [1u8; 32];
        let write_msg = Write {
            inode,
            handle,
            offset: 0,
            data: data.as_slice(),
        };
        let result = bt.write(from, write_msg).await;

        let err = result.err().unwrap();
        let err = err.downcast::<Error>().unwrap();
        let actual_handle = if let Error::ReadOnlyHandle(actual_handle) = err {
            Some(actual_handle)
        } else {
            None
        };
        assert_eq!(Some(handle), actual_handle);
    }

    /// Asserts that `result` is an [Err] wrapping an [io::Error] whose raw OS error code is
    /// [libc::ENOENT].
    ///
    /// Panics if `result` is [Ok], if the error is not an [io::Error], or if the [io::Error]
    /// carries no OS error code.
    fn assert_enoent<T>(result: Result<T>) {
        let err = result.err().unwrap();
        let io_err = err.downcast::<io::Error>().unwrap();
        let code = io_err.raw_os_error().unwrap();
        assert_eq!(libc::ENOENT, code);
    }

    /// Tests that multiple handles see consistent metadata associated with a block.
    #[tokio::test]
    async fn ensure_metadata_consistency() {
        let case = LocalFsTest::new_empty().await;
        let from = case.from();
        let trash = ".Trash";
        let file = "file.txt";
        let bt = &case.fs;
        let root = SpecInodes::RootDir.into();

        let open_msg = Open {
            inode: root,
            flags: libc::O_DIRECTORY.into(),
        };
        let OpenReply { handle, .. } = bt.open(from, open_msg).await.unwrap();

        // Because the directory is open, this will cause a new handle for this block to be opened.
        let lookup_msg = Lookup {
            parent: root,
            name: trash,
        };
        let result = bt.lookup(from, lookup_msg).await;
        assert_enoent(result);

        let close_msg = Close {
            inode: root,
            handle,
        };
        bt.close(from, close_msg).await.unwrap();

        let create_msg = Create {
            parent: root,
            name: file,
            flags: Flags::default(),
            mode: libc::S_IFREG | 0o644,
            umask: 0,
        };
        bt.create(from, create_msg).await.unwrap();

        let open_msg = Open {
            inode: root,
            flags: libc::O_DIRECTORY.into(),
        };
        bt.open(from, open_msg).await.unwrap();

        // Since the directory is open, the second handle will be used for this lookup.
        let lookup_msg = Lookup {
            parent: root,
            name: trash,
        };
        let result = bt.lookup(from, lookup_msg).await;
        assert!(result.is_err());
    }

    /// Tests that a read whose `size` is smaller than the amount of data in the file returns
    /// only `size` bytes.
    #[tokio::test]
    async fn read_with_smaller_size() {
        const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
        // Only the first half of the data is requested.
        const SIZE: usize = DATA.len() / 2;
        let case = LocalFsTest::new_empty().await;
        let bt = &case.fs;
        let from = case.from();
        let root: Inode = SpecInodes::RootDir.into();

        let CreateReply { inode, handle, .. } = bt
            .create(
                from,
                Create {
                    parent: root,
                    name: "file.txt",
                    flags: libc::O_RDWR.into(),
                    mode: libc::S_IFREG | 0o644,
                    umask: 0,
                },
            )
            .await
            .unwrap();

        let reply = bt
            .write(
                from,
                Write {
                    inode,
                    handle,
                    offset: 0,
                    data: DATA.as_slice(),
                },
            )
            .await
            .unwrap();
        assert_eq!(DATA.len() as u64, reply.written);

        let guard = bt
            .read(
                from,
                Read {
                    inode,
                    handle,
                    offset: 0,
                    size: SIZE as u64,
                },
            )
            .await
            .unwrap();

        assert_eq!(&[0, 1, 2, 3], guard.deref());
    }

    /// Builds an array of `N` bytes whose first entry is `start` and whose every subsequent
    /// entry is one greater than the entry before it, wrapping around to zero after `u8::MAX`.
    pub const fn integer_array<const N: usize>(start: u8) -> [u8; N] {
        let mut out = [0u8; N];
        // The value to be stored in the next slot.
        let mut value = start;
        let mut idx = 0;
        // Iterators can't be used in a const fn, hence the manual loop.
        while idx < N {
            out[idx] = value;
            value = value.wrapping_add(1);
            idx += 1;
        }
        out
    }

    #[tokio::test]
    async fn concurrent_reads() {
        // The size of each of the reads.
        const SIZE: usize = 4;
        // The number of concurrent reads.
        const NREADS: usize = 32;
        const DATA_LEN: usize = SIZE * NREADS;
        const DATA: [u8; DATA_LEN] = integer_array::<DATA_LEN>(0);
        let case = LocalFsTest::new_empty().await;
        let from = case.from();
        let file = "file.txt";
        let bt = &case.fs;
        let root: Inode = SpecInodes::RootDir.into();
        let mode = libc::S_IFREG | 0o644;

        let create_msg = Create {
            parent: root,
            name: file,
            flags: libc::O_RDWR.into(),
            mode,
            umask: 0,
        };
        let CreateReply { inode, handle, .. } = bt.create(from, create_msg).await.unwrap();
        let write_msg = Write {
            inode,
            handle,
            offset: 0,
            data: DATA.as_slice(),
        };
        let WriteReply { written, .. } = bt.write(from, write_msg).await.unwrap();
        assert_eq!(DATA.len() as u64, written);
        let case = Arc::new(case);

        let mut handles = Vec::with_capacity(NREADS);
        for offset in (0..NREADS).map(|e| e * SIZE) {
            let case = case.clone();
            handles.push(tokio::spawn(async move {
                // Notice that we have concurrent reads to different offsets using the same handle.
                // Without proper synchronization, this shouldn't work.
                let read_msg = Read {
                    inode,
                    handle,
                    offset: offset as u64,
                    size: SIZE as u64,
                };
                let guard = case.fs.read(case.from(), read_msg).await.unwrap();
                let expected = integer_array::<SIZE>(offset as u8);
                assert_eq!(&expected, guard.deref());
            }));
        }
        for handle in handles {
            handle.await.unwrap();
        }
    }

    /// Tests renaming a file within the root directory by linking it under a new name and then
    /// unlinking the old name: the new name must resolve to the original inode and the old name
    /// must no longer exist.
    #[tokio::test]
    async fn rename_in_same_directory() {
        const SRC: &str = "src";
        const DST: &str = "dst";
        let case = LocalFsTest::new_empty().await;
        let bt = &case.fs;
        let from = case.from();
        let root: Inode = SpecInodes::RootDir.into();

        let CreateReply { inode, .. } = bt
            .create(
                from,
                Create {
                    parent: root,
                    name: SRC,
                    flags: Flags::default(),
                    mode: libc::S_IFREG | 0o644,
                    umask: 0,
                },
            )
            .await
            .unwrap();

        // Link under the new name, then remove the old one.
        bt.link(
            from,
            Link {
                inode,
                new_parent: root,
                name: DST,
            },
        )
        .await
        .unwrap();
        bt.unlink(
            from,
            Unlink {
                parent: root,
                name: SRC,
            },
        )
        .await
        .unwrap();

        let found = bt
            .lookup(
                from,
                Lookup {
                    parent: root,
                    name: DST,
                },
            )
            .await
            .unwrap();
        assert_eq!(inode, found.inode);

        let old_lookup = bt
            .lookup(
                from,
                Lookup {
                    parent: root,
                    name: SRC,
                },
            )
            .await;
        assert_enoent(old_lookup);
    }

    #[tokio::test]
    async fn rename_to_different_directory() {
        let case = LocalFsTest::new_empty().await;
        let bt = &case.fs;
        let from = case.from();
        let root = SpecInodes::RootDir.into();
        let dir_name = "dir";
        let file_name = "file";

        let create_msg = Create {
            parent: root,
            name: dir_name,
            flags: libc::O_DIRECTORY.into(),
            mode: libc::S_IFDIR | 0o755,
            umask: 0,
        };
        let CreateReply { inode: dir, .. } = bt.create(from, create_msg).await.unwrap();
        let create_msg = Create {
            parent: root,
            name: file_name,
            flags: Flags::default(),
            mode: libc::S_IFREG | 0o644,
            umask: 0,
        };
        let CreateReply { inode: file, .. } = bt.create(from, create_msg).await.unwrap();
        let link_msg = Link {
            inode: file,
            new_parent: dir,
            name: file_name,
        };
        bt.link(from, link_msg).await.unwrap();
        let unlink_msg = Unlink {
            parent: root,
            name: file_name,
        };
        bt.unlink(from, unlink_msg).await.unwrap();

        let lookup_msg = Lookup {
            parent: dir,
            name: file_name,
        };
        let LookupReply {
            inode: actual_inode,
            ..
        } = bt.lookup(from, lookup_msg).await.unwrap();
        assert_eq!(file, actual_inode);
        let lookup_msg = Lookup {
            parent: root,
            name: file_name,
        };
        let result = bt.lookup(from, lookup_msg).await;
        assert_enoent(result);
    }

    /// Tests that linking a file under a name which is already taken in the same directory
    /// fails with an [io::Error] of kind [io::ErrorKind::AlreadyExists].
    #[tokio::test]
    async fn rename_no_replace_same_directory() {
        const OLD_NAME: &str = "old";
        const NEW_NAME: &str = "new";
        let case = LocalFsTest::new_empty().await;
        let bt = &case.fs;
        let from = case.from();
        let root: Inode = SpecInodes::RootDir.into();

        let CreateReply { inode, .. } = bt
            .create(
                from,
                Create {
                    parent: root,
                    name: OLD_NAME,
                    flags: Flags::default(),
                    mode: 0o644,
                    umask: 0,
                },
            )
            .await
            .unwrap();
        // Occupy the destination name.
        bt.create(
            from,
            Create {
                parent: root,
                name: NEW_NAME,
                flags: Flags::default(),
                mode: 0o644,
                umask: 0,
            },
        )
        .await
        .unwrap();

        let outcome = bt
            .link(
                from,
                Link {
                    inode,
                    new_parent: root,
                    name: NEW_NAME,
                },
            )
            .await;

        let err = outcome.err().unwrap().downcast::<io::Error>().unwrap();
        assert_eq!(io::ErrorKind::AlreadyExists, err.kind());
    }

    #[tokio::test]
    async fn rename_no_replace_different_directory() {
        let case = LocalFsTest::new_empty().await;
        let bt = &case.fs;
        let from = case.from();
        let root = SpecInodes::RootDir.into();
        let dir_name = "dir";
        let file_name = "file";

        let create_msg = Create {
            parent: root,
            name: dir_name,
            flags: libc::O_DIRECTORY.into(),
            mode: 0o755,
            umask: 0,
        };
        let CreateReply { inode: dir, .. } = bt.create(from, create_msg).await.unwrap();
        let create_msg = Create {
            parent: root,
            name: file_name,
            flags: Flags::default(),
            mode: 0o644,
            umask: 0,
        };
        let CreateReply { inode, .. } = bt.create(from, create_msg).await.unwrap();
        let create_msg = Create {
            parent: dir,
            name: file_name,
            flags: Flags::default(),
            mode: 0o644,
            umask: 0,
        };
        bt.create(from, create_msg).await.unwrap();

        let link_msg = Link {
            inode,
            new_parent: dir,
            name: file_name,
        };
        let result = bt.link(from, link_msg).await;
        let err = result.err().unwrap().downcast::<io::Error>().unwrap();
        assert_eq!(io::ErrorKind::AlreadyExists, err.kind());
    }

    #[tokio::test]
    async fn read_from_non_owner_is_err() {
        let case = LocalFsTest::new_empty().await;
        let bt = &case.fs;
        let name = "file.txt";
        let owner = case.from();
        let mut other = owner.as_ref().clone();
        other.push_component("subdir".to_owned());
        let other = Arc::new(other);

        let create_msg = Create {
            parent: SpecInodes::RootDir.into(),
            name,
            flags: libc::O_RDWR.into(),
            mode: 0o644,
            umask: 0,
        };
        let CreateReply { inode, handle, .. } = bt.create(owner, create_msg).await.unwrap();
        let write_msg = Write {
            inode,
            handle,
            offset: 0,
            data: [1, 2, 3].as_slice(),
        };
        let result = bt.write(&other, write_msg).await;

        let err = result.err().unwrap().downcast::<Error>().unwrap();
        let matched = if let Error::WrongOwner = err {
            true
        } else {
            false
        };
        assert!(matched)
    }

    /// Tests that allocating a few bytes (less than a sector) in a new file produces zeroed
    /// data, and that the size reported in the file's metadata matches the amount of data read.
    #[tokio::test]
    async fn allocate_full_sectors_zero_remain_non_zero() {
        const LEN: u64 = 8;
        let case = LocalFsTest::new_empty().await;
        let bt = &case.fs;
        let from = case.from();

        let CreateReply { inode, handle, .. } = bt
            .create(
                from,
                Create {
                    parent: SpecInodes::RootDir.into(),
                    name: "file.txt",
                    flags: libc::O_RDWR.into(),
                    mode: 0o644,
                    umask: 0,
                },
            )
            .await
            .unwrap();

        bt.allocate(
            from,
            Allocate {
                inode,
                handle,
                offset: None,
                size: LEN,
            },
        )
        .await
        .unwrap();

        let ReadMetaReply { attrs, .. } = bt
            .read_meta(
                from,
                ReadMeta {
                    inode,
                    handle: Some(handle),
                },
            )
            .await
            .unwrap();
        let guard = bt
            .read(
                from,
                Read {
                    inode,
                    handle,
                    offset: 0,
                    size: LEN,
                },
            )
            .await
            .unwrap();

        assert_eq!([0u8; 8], guard.deref());
        assert_eq!(guard.len() as u64, attrs.size);
    }

    /// Tests that allocating one byte more than [SECTOR_SZ_DEFAULT] in a new file produces a
    /// file which reads back as all zeros and whose metadata reports the allocated size.
    ///
    /// The data is read back in a loop which allows for a single read returning fewer bytes than
    /// were requested. Each read starts where the previous one ended, so that the bytes which lie
    /// beyond the first read are actually verified.
    #[tokio::test]
    async fn allocate_full_sectors_non_zero_remain_non_zero() {
        let case = LocalFsTest::new_empty().await;
        let bt = &case.fs;
        let name = "file.txt";
        let from = case.from();

        let create_msg = Create {
            parent: SpecInodes::RootDir.into(),
            name,
            flags: libc::O_RDWR.into(),
            mode: 0o644,
            umask: 0,
        };
        let CreateReply { inode, handle, .. } = bt.create(from, create_msg).await.unwrap();
        const LEN: usize = SECTOR_SZ_DEFAULT + 1;
        let size = LEN as u64;
        let alloc_msg = Allocate {
            inode,
            handle,
            offset: None,
            size,
        };
        bt.allocate(from, alloc_msg).await.unwrap();
        let mut actual = Cursor::new(Vec::with_capacity(LEN));
        // The position in the file of the next byte to be read.
        let mut offset = 0u64;
        while offset < size {
            let read_msg = Read {
                inode,
                handle,
                offset,
                size: size - offset,
            };
            let guard = bt.read(from, read_msg).await.unwrap();
            let data = guard.deref();
            // An empty read before the end of the file would otherwise make this loop spin
            // forever.
            assert!(
                !data.is_empty(),
                "read returned no data before the end of the file"
            );
            actual.write_all(data).unwrap();
            offset += data.len() as u64;
        }
        let read_meta_msg = ReadMeta {
            inode,
            handle: Some(handle),
        };
        let ReadMetaReply { attrs, .. } = bt.read_meta(from, read_meta_msg).await.unwrap();

        assert!(vec![0u8; LEN].eq(&actual.into_inner()));
        assert_eq!(LEN as u64, attrs.size);
    }

    /// Tests that allocating exactly [SECTOR_SZ_DEFAULT] bytes in a new file produces zeroed
    /// data and that the file's metadata reports the allocated size.
    #[tokio::test]
    async fn allocate_full_sectors_non_zero_remain_zero() {
        const LEN: usize = SECTOR_SZ_DEFAULT;
        let size = LEN as u64;
        let case = LocalFsTest::new_empty().await;
        let bt = &case.fs;
        let from = case.from();

        let CreateReply { inode, handle, .. } = bt
            .create(
                from,
                Create {
                    parent: SpecInodes::RootDir.into(),
                    name: "file.txt",
                    flags: libc::O_RDWR.into(),
                    mode: 0o644,
                    umask: 0,
                },
            )
            .await
            .unwrap();

        bt.allocate(
            from,
            Allocate {
                inode,
                handle,
                offset: None,
                size,
            },
        )
        .await
        .unwrap();

        let ReadMetaReply { attrs, .. } = bt
            .read_meta(
                from,
                ReadMeta {
                    inode,
                    handle: Some(handle),
                },
            )
            .await
            .unwrap();
        let guard = bt
            .read(
                from,
                Read {
                    inode,
                    handle,
                    offset: 0,
                    size,
                },
            )
            .await
            .unwrap();

        assert_eq!(vec![0u8; LEN], guard.deref());
        assert_eq!(LEN as u64, attrs.size);
    }

    /// Tests that an allocation request for a size which is not greater than the current size of
    /// the file leaves both the file's contents and its reported size unchanged.
    #[tokio::test]
    async fn allocate_new_size_not_greater_than_curr_size() {
        const LEN: usize = 8;
        let case = LocalFsTest::new_empty().await;
        let bt = &case.fs;
        let from = case.from();

        let CreateReply { inode, handle, .. } = bt
            .create(
                from,
                Create {
                    parent: SpecInodes::RootDir.into(),
                    name: "file.txt",
                    flags: libc::O_RDWR.into(),
                    mode: 0o644,
                    umask: 0,
                },
            )
            .await
            .unwrap();

        // Give the file LEN bytes of non-zero content.
        let reply = bt
            .write(
                from,
                Write {
                    inode,
                    handle,
                    offset: 0,
                    data: [1u8; LEN].as_slice(),
                },
            )
            .await
            .unwrap();
        assert_eq!(LEN as u64, reply.written);

        // Request an allocation of only half the current size.
        bt.allocate(
            from,
            Allocate {
                inode,
                handle,
                offset: None,
                size: (LEN / 2) as u64,
            },
        )
        .await
        .unwrap();

        let ReadMetaReply { attrs, .. } = bt
            .read_meta(
                from,
                ReadMeta {
                    inode,
                    handle: Some(handle),
                },
            )
            .await
            .unwrap();
        let actual = bt
            .read(
                from,
                Read {
                    inode,
                    handle,
                    offset: 0,
                    size: LEN as u64,
                },
            )
            .await
            .unwrap();

        assert_eq!([1u8; LEN], actual.deref());
        assert_eq!(LEN as u64, attrs.size);
    }

    /// Tests that a read at an offset other than the one most recently accessed returns the data
    /// stored at the requested offset.
    ///
    /// Two sectors are written, the first filled with ones and the second with twos, then the
    /// first sector is read back and checked to contain only ones.
    #[tokio::test]
    async fn read_at_non_current_position() {
        const FILENAME: &str = "MANIFESTO.rtf";
        let case = LocalFsTest::new_empty().await;
        let bt = &case.fs;
        let from = case.from();

        let msg = Create {
            parent: SpecInodes::RootDir.into(),
            name: FILENAME,
            flags: FlagValue::ReadWrite.into(),
            mode: 0o644,
            umask: 0,
        };
        let CreateReply {
            inode,
            handle,
            entry,
            ..
        } = bt.create(from, msg).await.unwrap();
        // The sector size is taken from the attributes returned when the file was created.
        let sect_sz64 = entry.attr.sect_sz;
        let sect_sz: usize = sect_sz64.try_into().unwrap();
        let mut data = vec![1u8; sect_sz];
        let msg = Write {
            inode,
            handle,
            offset: 0,
            data: data.as_slice(),
        };
        let WriteReply { written, .. } = bt.write(from, msg).await.unwrap();
        assert_eq!(sect_sz64, written);
        // Reuse the buffer for the second sector, which is filled with a different value.
        data.fill(2);
        let msg = Write {
            inode,
            handle,
            offset: sect_sz64,
            data: data.as_slice(),
        };
        let WriteReply { written, .. } = bt.write(from, msg).await.unwrap();
        assert_eq!(sect_sz64, written);
        // The Accessor for this block should now have the second sector loaded, so it will have to
        // seek back to the first in order to respond to this read request.
        let msg = Read {
            inode,
            handle,
            offset: 0,
            size: sect_sz64,
        };
        let guard = bt.read(from, msg).await.unwrap();

        // Checks both that every byte is a one and that exactly `sect_sz` bytes were returned.
        assert!(guard
            .deref()
            .iter()
            .copied()
            .eq(std::iter::repeat(1u8).take(sect_sz)));
    }

    /// Tests that when a file is forgotten and then unlinked, the number of files in the block
    /// directory returns to what it was before the file was created.
    #[tokio::test]
    async fn unlink_after_forget_file_is_deleted() {
        const FILENAME: &str = "file";
        const DATA: [u8; 8] = [1u8; 8];
        let case = LocalFsTest::new_empty().await;
        let block_dir = case.dir.path();
        let bt = &case.fs;
        let from = case.from();
        // Baseline taken before the file exists.
        let expected = num_files(block_dir).unwrap();

        let CreateReply { inode, handle, .. } = bt
            .create(
                from,
                Create {
                    parent: SpecInodes::RootDir.into(),
                    name: FILENAME,
                    flags: FlagValue::ReadWrite.into(),
                    mode: 0o644,
                    umask: 0,
                },
            )
            .await
            .unwrap();
        bt.write(
            from,
            Write {
                inode,
                handle,
                offset: 0,
                data: DATA.as_slice(),
            },
        )
        .await
        .unwrap();
        bt.close(from, Close { inode, handle }).await.unwrap();

        // The new file must have added to the contents of the block directory.
        let more = num_files(block_dir).unwrap();
        assert!(more > expected);

        bt.forget(from, Forget { inode, count: 1 }).await.unwrap();
        bt.unlink(
            from,
            Unlink {
                parent: SpecInodes::RootDir.into(),
                name: FILENAME,
            },
        )
        .await
        .unwrap();

        let actual = num_files(block_dir).unwrap();
        assert_eq!(expected, actual);
    }

    /// Tests that when a file is unlinked and then forgotten, the number of files in the block
    /// directory returns to what it was before the file was created.
    #[tokio::test]
    async fn forget_after_unlink_file_is_deleted() {
        const FILENAME: &str = "file";
        const DATA: [u8; 8] = [1u8; 8];
        let case = LocalFsTest::new_empty().await;
        let block_dir = case.dir.path();
        let bt = &case.fs;
        let from = case.from();
        // Baseline taken before the file exists.
        let expected = num_files(block_dir).unwrap();

        let CreateReply { inode, handle, .. } = bt
            .create(
                from,
                Create {
                    parent: SpecInodes::RootDir.into(),
                    name: FILENAME,
                    flags: FlagValue::ReadWrite.into(),
                    mode: 0o644,
                    umask: 0,
                },
            )
            .await
            .unwrap();
        bt.write(
            from,
            Write {
                inode,
                handle,
                offset: 0,
                data: DATA.as_slice(),
            },
        )
        .await
        .unwrap();
        bt.close(from, Close { inode, handle }).await.unwrap();

        // The new file must have added to the contents of the block directory.
        let more = num_files(block_dir).unwrap();
        assert!(more > expected);

        bt.unlink(
            from,
            Unlink {
                parent: SpecInodes::RootDir.into(),
                name: FILENAME,
            },
        )
        .await
        .unwrap();
        bt.forget(from, Forget { inode, count: 1 }).await.unwrap();

        let actual = num_files(block_dir).unwrap();
        assert_eq!(expected, actual);
    }

    /// Tests that after a file has been forgotten and unlinked, none of the directories found
    /// directly inside the block directory are empty.
    #[tokio::test]
    async fn after_unlink_no_empty_directories() {
        const FILENAME: &str = "file";
        const DATA: [u8; 8] = [1u8; 8];
        let case = LocalFsTest::new_empty().await;
        let bt = &case.fs;
        let from = case.from();

        let CreateReply { inode, handle, .. } = bt
            .create(
                from,
                Create {
                    parent: SpecInodes::RootDir.into(),
                    name: FILENAME,
                    flags: FlagValue::ReadWrite.into(),
                    mode: 0o644,
                    umask: 0,
                },
            )
            .await
            .unwrap();
        bt.write(
            from,
            Write {
                inode,
                handle,
                offset: 0,
                data: DATA.as_slice(),
            },
        )
        .await
        .unwrap();
        bt.close(from, Close { inode, handle }).await.unwrap();
        bt.forget(from, Forget { inode, count: 1 }).await.unwrap();
        bt.unlink(
            from,
            Unlink {
                parent: SpecInodes::RootDir.into(),
                name: FILENAME,
            },
        )
        .await
        .unwrap();

        // Only the entries directly inside the block directory are inspected; entries which are
        // not directories are skipped.
        for entry in read_dir(case.dir.path()).unwrap() {
            let entry = entry.unwrap();
            if entry.file_type().unwrap().is_dir() {
                let empty = read_dir(entry.path()).unwrap().next().is_none();
                assert!(!empty);
            }
        }
    }
}