소스 검색

Made major modification to the Block trait.

It is now setup to allow reads to be performed using immutable references to
Block implementations. This was accomplished by using the positioned_io crate
and replacing `Read` with its stateless counterpart, `ReadAt`.

When a block is to be written to or read from, an `Accessor` is created for it.
This holds the cipher context for reads and writes, as well as buffers the current
sector of data.
Matthew Carr 2 년 전
부모
커밋
cb02f2cbd8

+ 2 - 0
.gitignore

@@ -13,3 +13,5 @@ crates/scratch/
 *.blg
 *.bcf
 *.xml
+*.nav
+*.snm

+ 12 - 0
Cargo.lock

@@ -217,6 +217,7 @@ dependencies = [
  "log",
  "openssl",
  "os_pipe",
+ "positioned-io",
  "serde",
  "serde-big-array",
  "static_assertions",
@@ -1147,6 +1148,17 @@ dependencies = [
  "plotters-backend",
 ]
 
+[[package]]
+name = "positioned-io"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "09b9485cf7f528baf34edd811ec8283a168864912e11d0b7d3e0510738761114"
+dependencies = [
+ "byteorder",
+ "libc",
+ "winapi",
+]
+
 [[package]]
 name = "proc-macro2"
 version = "1.0.43"

+ 1 - 0
crates/btlib/Cargo.toml

@@ -30,6 +30,7 @@ libc = "0.2.137"
 env_logger = { version = "0.9.0" }
 chrono = "0.4.23"
 anyhow = { version = "1.0.66", features = ["std", "backtrace"] }
+positioned-io = "0.3.1"
 
 [dev-dependencies]
 tempdir = { version = "0.3.7" }

+ 1 - 1
crates/btlib/benches/block_benches.rs

@@ -1,7 +1,7 @@
 //! Benchmarks for implementations of the [Block] trait.
 //! You can run these with `cargo bench`.
 
-use std::{fs::OpenOptions, time::Duration};
+use std::{fs::OpenOptions, io::Write, time::Duration};
 
 use btlib::{
     crypto::{ConcreteCreds, Creds},

+ 144 - 0
crates/btlib/src/accessor.rs

@@ -0,0 +1,144 @@
+use positioned_io::{ReadAt, Size, WriteAt};
+use std::io::{Read, Seek, Write};
+
+use crate::{
+    sectored_buf::SectoredBuf, BlockMeta, Cursor, Decompose, FlushMeta, MetaAccess, Result,
+    SecretStream, Sectored, Split, TryCompose,
+};
+
+pub use private::Accessor;
+
+mod private {
+    use super::*;
+
+    pub struct Accessor<T: Size> {
+        inner: SectoredBuf<SecretStream<Cursor<T>>>,
+    }
+
+    impl<T: ReadAt + Sectored + AsRef<BlockMeta> + Size> Accessor<T> {
+        pub fn new(inner: T) -> Result<Accessor<T>> {
+            let meta: &BlockMeta = inner.as_ref();
+            let key = meta.body.block_key()?.clone();
+            let inner = SecretStream::new(key).try_compose(Cursor::new(inner))?;
+            Ok(Self {
+                inner: SectoredBuf::new().try_compose(inner)?,
+            })
+        }
+    }
+
+    impl<T: Size> Accessor<T> {
+        pub fn get_ref(&self) -> &T {
+            self.inner.get_ref().get_ref().get_ref()
+        }
+
+        pub fn get_mut(&mut self) -> &mut T {
+            self.inner.get_mut().get_mut().get_mut()
+        }
+    }
+
+    impl<T: ReadAt + AsRef<BlockMeta> + Size> Read for Accessor<T> {
+        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+            self.inner.read(buf)
+        }
+    }
+
+    impl<T: ReadAt + WriteAt + MetaAccess> Write for Accessor<T> {
+        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+            self.inner.write(buf)
+        }
+
+        fn flush(&mut self) -> std::io::Result<()> {
+            self.inner.flush()
+        }
+    }
+
+    impl<T: ReadAt + WriteAt + MetaAccess> Seek for Accessor<T> {
+        fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
+            self.inner.seek(pos)
+        }
+    }
+
+    impl<U, T: AsRef<U> + Size> AsRef<U> for Accessor<T> {
+        fn as_ref(&self) -> &U {
+            self.inner.get_ref().as_ref()
+        }
+    }
+
+    impl<U, T: AsMut<U> + Size> AsMut<U> for Accessor<T> {
+        fn as_mut(&mut self) -> &mut U {
+            self.inner.get_mut().as_mut()
+        }
+    }
+
+    impl<T: Size> Decompose<T> for Accessor<T> {
+        fn into_inner(self) -> T {
+            self.inner.into_inner().into_inner().into_inner()
+        }
+    }
+
+    impl<T: FlushMeta + Size> FlushMeta for Accessor<T> {
+        fn flush_meta(&mut self) -> Result<()> {
+            self.get_mut().flush_meta()
+        }
+    }
+
+    impl<T: Size> Sectored for Accessor<T> {
+        fn sector_sz(&self) -> usize {
+            self.inner.sector_sz()
+        }
+    }
+
+    impl<T: Size> Size for Accessor<T> {
+        fn size(&self) -> std::io::Result<Option<u64>> {
+            self.inner.get_ref().size()
+        }
+    }
+
+    impl<T: Size> Split<Accessor<&'static [u8]>, T> for Accessor<T> {
+        fn split(self) -> (Accessor<&'static [u8]>, T) {
+            let (sectored_buf, inner) = self.inner.split();
+            let (secret_stream, inner) = inner.split();
+            let (cursor, inner) = inner.split();
+            let new_inner =
+                SectoredBuf::combine(sectored_buf, SecretStream::combine(secret_stream, cursor));
+            (Accessor { inner: new_inner }, inner)
+        }
+
+        fn combine(left: Accessor<&'static [u8]>, right: T) -> Self {
+            let (sectored_buf, inner) = left.inner.split();
+            let (secret_stream, inner) = inner.split();
+            let (cursor, ..) = inner.split();
+            let new_inner = SectoredBuf::combine(
+                sectored_buf,
+                SecretStream::combine(secret_stream, Cursor::combine(cursor, right)),
+            );
+            Accessor { inner: new_inner }
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::test_helpers::{make_block_with, node_creds};
+
+    #[test]
+    fn can_wrap_block_ref() {
+        let block = make_block_with(node_creds())
+            .into_inner()
+            .into_inner()
+            .into_inner();
+        let mut accessor = Accessor::new(block).expect("failed to wrap block");
+        const EXPECTED: &[u8] = &[1u8; 8];
+
+        accessor.write_all(EXPECTED).expect("write failed");
+        accessor.flush().expect("flush failed");
+        accessor.rewind().expect("rewind failed");
+        let block = accessor.into_inner();
+        let mut wrapped = Accessor::new(&block).expect("failed to wrap block reference");
+        let mut actual = [0u8; EXPECTED.len()];
+        wrapped.read(&mut actual).expect("read failed");
+
+        assert_eq!(EXPECTED, actual);
+    }
+}

파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
+ 293 - 253
crates/btlib/src/blocktree.rs


+ 345 - 0
crates/btlib/src/buf_reader.rs

@@ -0,0 +1,345 @@
+use log::error;
+use positioned_io::Size;
+use std::io::{self, Cursor, Read, Seek, SeekFrom};
+
+use crate::{bterr, Decompose, Result, Sectored, Split, EMPTY_SLICE};
+
+pub use private::BufReader;
+
+mod private {
+    use crate::{ReadExt, SeekFromExt, SizeExt};
+
+    use super::*;
+
+    pub struct BufReader<T> {
+        cursor: Cursor<Vec<u8>>,
+        reader: T,
+    }
+
+    impl<T> BufReader<T> {
+        pub fn with_buf(buf: Vec<u8>, reader: T) -> Result<BufReader<T>> {
+            Ok(Self {
+                cursor: Self::make_cursor(buf)?,
+                reader,
+            })
+        }
+
+        pub fn get_ref(&self) -> &T {
+            &self.reader
+        }
+
+        pub fn get_mut(&mut self) -> &mut T {
+            &mut self.reader
+        }
+
+        /// Extracts the buffer from this [BufReader]. The [BufReader] which is returned contains
+        /// an empty buffer.
+        pub fn take_buf(mut self) -> (Self, Vec<u8>) {
+            let buf = self.cursor.into_inner();
+            self.cursor = Cursor::new(Vec::new());
+            (self, buf)
+        }
+
+        fn make_cursor(buf: Vec<u8>) -> Result<Cursor<Vec<u8>>> {
+            // If buf is zero-length then a call to read will loop forever.
+            if buf.is_empty() {
+                return Err(bterr!("the given vector must be non-empty"));
+            }
+            let mut cursor = Cursor::new(buf);
+            cursor.seek(SeekFrom::End(0)).unwrap();
+            Ok(cursor)
+        }
+
+        /// Returns true if all bytes have been read from the cursor.
+        fn cursor_is_empty(&self) -> bool {
+            let cursor_len = self.cursor.get_ref().len() as u64;
+            let cursor_pos = self.cursor.position();
+            cursor_pos >= cursor_len
+        }
+    }
+
+    impl<T: Sectored> BufReader<T> {
+        pub fn new(reader: T) -> Result<BufReader<T>> {
+            let sect_sz = reader.sector_sz();
+            Ok(Self {
+                cursor: Self::make_cursor(vec![0u8; sect_sz])?,
+                reader,
+            })
+        }
+    }
+
+    impl<T: Seek> BufReader<T> {
+        /// Calculates the current position in this stream.
+        pub fn pos(&mut self) -> io::Result<u64> {
+            let inner_pos = self.reader.stream_position()?;
+            // Because the inner stream is ahead of this stream, the current position is the
+            // position of the inner stream minus the number of bytes remaining in the cursor.
+            let remaining = self.cursor.get_ref().len() as u64 - self.cursor.position();
+            let pos = inner_pos - remaining;
+            Ok(pos)
+        }
+    }
+
+    impl<T: Read> BufReader<T> {
+        /// Refills the cursor by reading from the underlying stream.
+        fn refill(&mut self) -> Result<()> {
+            self.cursor.rewind()?;
+            let vec = self.cursor.get_mut();
+            let read = self.reader.fill_buf(vec)?;
+            if read == 0 || read == vec.len() {
+                Ok(())
+            } else {
+                Err(bterr!("unexpected number of bytes read: {read}"))
+            }
+        }
+    }
+
+    impl<T: Read> Read for BufReader<T> {
+        fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
+            if buf.len() == self.sector_sz() && self.cursor_is_empty() {
+                return self.reader.read(buf);
+            }
+            let buf_len_start = buf.len();
+            while !buf.is_empty() {
+                let read = match self.cursor.read(buf) {
+                    Ok(read) => read,
+                    Err(err) => {
+                        if buf_len_start == buf.len() {
+                            return Err(err);
+                        } else {
+                            error!("{err}");
+                            break;
+                        }
+                    }
+                };
+                buf = &mut buf[read..];
+                if self.cursor_is_empty() {
+                    if let Err(err) = self.refill() {
+                        if buf_len_start == buf.len() {
+                            return Err(err.into());
+                        } else {
+                            error!("error occurred in BufReader::refill: {err}");
+                            break;
+                        }
+                    }
+                }
+            }
+            Ok(buf_len_start - buf.len())
+        }
+    }
+
+    impl<T: Seek + Read + Size> Seek for BufReader<T> {
+        fn seek(&mut self, seek_from: std::io::SeekFrom) -> std::io::Result<u64> {
+            let pos = self.pos()?;
+            let new_pos = seek_from.abs(|| Ok(pos), || self.reader.size_or_err())?;
+            let sect_sz = self.sector_sz64();
+            let buf_pos = new_pos % sect_sz;
+            let index = pos / sect_sz;
+            let new_index = new_pos / sect_sz;
+            if index != new_index {
+                // Seek to the new position and invalidate the buffer.
+                self.reader.seek(SeekFrom::Start(sect_sz * new_index))?;
+                self.cursor.seek(SeekFrom::End(0))?;
+            }
+            if buf_pos != 0 {
+                // If the buffer position is not at the end then we must refill it.
+                self.refill()?;
+                self.cursor.seek(SeekFrom::Start(buf_pos))?;
+            }
+            Ok(new_pos)
+        }
+    }
+
+    impl<U, T: AsRef<U>> AsRef<U> for BufReader<T> {
+        fn as_ref(&self) -> &U {
+            self.reader.as_ref()
+        }
+    }
+
+    impl<U, T: AsMut<U>> AsMut<U> for BufReader<T> {
+        fn as_mut(&mut self) -> &mut U {
+            self.reader.as_mut()
+        }
+    }
+
+    impl<T: Size> Size for BufReader<T> {
+        fn size(&self) -> std::io::Result<Option<u64>> {
+            self.reader.size()
+        }
+    }
+
+    impl<T> Sectored for BufReader<T> {
+        fn sector_sz(&self) -> usize {
+            self.cursor.get_ref().len()
+        }
+    }
+
+    impl<T> Decompose<T> for BufReader<T> {
+        fn into_inner(self) -> T {
+            self.reader
+        }
+    }
+
+    impl<T> Split<BufReader<&'static [u8]>, T> for BufReader<T> {
+        fn split(self) -> (BufReader<&'static [u8]>, T) {
+            let reader = BufReader {
+                cursor: self.cursor,
+                reader: EMPTY_SLICE,
+            };
+            (reader, self.reader)
+        }
+
+        fn combine(left: BufReader<&'static [u8]>, right: T) -> Self {
+            BufReader {
+                cursor: left.cursor,
+                reader: right,
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use crate::test_helpers::{
+        random_indices, read_check, read_indices, write_fill, write_indices, Randomizer,
+        SectoredCursor,
+    };
+
+    #[test]
+    fn can_read() {
+        const EXPECTED: [u8; 32] = [1u8; 32];
+        let mut reader = BufReader::new(SectoredCursor::new(EXPECTED, EXPECTED.len())).unwrap();
+
+        let mut actual = [0u8; EXPECTED.len()];
+        reader.read(actual.as_mut()).expect("read failed");
+
+        assert_eq!(EXPECTED, actual);
+    }
+
+    /// Tests that the inner [Read] only sees calls to `read` with sector sized buffers.
+    #[test]
+    fn inner_sees_only_sector_sized_reads() {
+        const SECT_SZ: usize = 32;
+        const CHUNK_SZ: usize = 8;
+        const CHUNKS: usize = SECT_SZ / CHUNK_SZ;
+        let data = std::iter::successors(Some(1u8), |prev| Some(*prev + 1))
+            .map(|e| [e; CHUNK_SZ])
+            .take(CHUNKS)
+            .fold(Vec::with_capacity(SECT_SZ), |mut prev, curr| {
+                prev.extend_from_slice(&curr);
+                prev
+            });
+        // SectoredCursor will panic if it's given a buffer that isn't exactly SECT_SZ bytes long.
+        let mut reader =
+            BufReader::new(SectoredCursor::new(data, SECT_SZ).require_sect_sz(true)).unwrap();
+
+        let mut actual = [0u8; CHUNK_SZ];
+        for k in 1..(CHUNKS + 1) {
+            let expected = [k as u8; CHUNK_SZ];
+            reader.read(&mut actual).expect("read failed");
+            assert_eq!(expected, actual);
+        }
+    }
+
+    #[test]
+    fn sequential_read() {
+        const SECT_SZ: usize = 16;
+        const SECT_CT: usize = 8;
+        let mut cursor = SectoredCursor::new(Vec::new(), SECT_SZ);
+        write_fill(&mut cursor, SECT_SZ, SECT_CT);
+        cursor.rewind().unwrap();
+        let mut reader = BufReader::new(cursor).unwrap();
+        read_check(&mut reader, SECT_SZ, SECT_CT);
+    }
+
+    /// Tests that a read which is larger than one sector will be handled correctly.
+    #[test]
+    fn read_larger_than_one_sector() {
+        const SECT_SZ: usize = 4;
+        const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
+        let mut reader = BufReader::new(SectoredCursor::new(DATA, SECT_SZ)).unwrap();
+
+        let mut actual = [0u8; 6];
+        reader.read(&mut actual).expect("read failed");
+
+        assert_eq!([0, 1, 2, 3, 4, 5], actual);
+    }
+
+    #[test]
+    fn random_sector_sized_read() {
+        const SECT_SZ: usize = 32;
+        const SECT_CT: usize = 10;
+        let mut rando = Randomizer::new([7u8; Randomizer::HASH.len()]);
+        let indices: Vec<_> = random_indices(&mut rando, SECT_CT).collect();
+        let mut cursor = SectoredCursor::new(vec![0u8; SECT_SZ * SECT_CT], SECT_SZ);
+        write_indices(&mut cursor, SECT_SZ, indices.iter().cloned());
+        let mut reader = BufReader::new(cursor).unwrap();
+        read_indices(&mut reader, SECT_SZ, indices.iter().cloned());
+    }
+
+    #[test]
+    fn seek_with_empty_buffer() {
+        const SECT_SZ: usize = 4;
+        const DATA: [u8; 2 * SECT_SZ] = [0, 1, 2, 3, 4, 5, 6, 7];
+        const EXPECTED: u64 = 3;
+        let mut reader = BufReader::new(SectoredCursor::new(DATA, SECT_SZ)).unwrap();
+
+        let new_pos = reader.seek(SeekFrom::Start(EXPECTED)).expect("seek failed");
+        assert_eq!(EXPECTED, new_pos);
+        let mut actual = [0u8; 1];
+        reader.read(&mut actual).expect("read failed");
+
+        assert_eq!(EXPECTED as u8, actual[0]);
+    }
+
+    #[test]
+    fn seek_to_middle_of_next_sector() {
+        const SECT_SZ: usize = 4;
+        const DATA: [u8; 2 * SECT_SZ] = [0, 1, 2, 3, 4, 5, 6, 7];
+        const EXPECTED: u64 = 5;
+        let mut reader = BufReader::new(SectoredCursor::new(DATA, SECT_SZ)).unwrap();
+        let mut actual = [0u8; 1];
+        // This first read ensures the buffer is filled.
+        reader.read(&mut actual).expect("first read failed");
+
+        let new_pos = reader.seek(SeekFrom::Start(EXPECTED)).expect("seek failed");
+        assert_eq!(EXPECTED, new_pos);
+        reader.read(&mut actual).expect("read failed");
+
+        assert_eq!(EXPECTED as u8, actual[0]);
+    }
+
+    #[test]
+    fn seek_relative_to_current_position() {
+        const SECT_SZ: usize = 4;
+        const DATA: [u8; 2 * SECT_SZ] = [0, 1, 2, 3, 4, 5, 6, 7];
+        const EXPECTED: u64 = 3;
+        let mut reader = BufReader::new(SectoredCursor::new(DATA, SECT_SZ)).unwrap();
+        let mut actual = [0u8; SECT_SZ];
+        reader.read(&mut actual).expect("first read failed");
+
+        let new_pos = reader.seek(SeekFrom::Current(-1)).expect("seek failed");
+        assert_eq!(EXPECTED, new_pos);
+        reader.read(&mut actual).expect("read failed");
+
+        assert_eq!(EXPECTED as u8, actual[0]);
+    }
+
+    #[test]
+    fn seek_relative_to_end() {
+        const SECT_SZ: usize = 4;
+        const DATA: [u8; 2 * SECT_SZ] = [0, 1, 2, 3, 4, 5, 6, 7];
+        const EXPECTED: u64 = 7;
+        let mut reader = BufReader::new(SectoredCursor::new(DATA, SECT_SZ)).unwrap();
+        let mut actual = [0u8; SECT_SZ];
+        reader.read(&mut actual).expect("first read failed");
+
+        let new_pos = reader.seek(SeekFrom::End(-1)).expect("seek failed");
+        assert_eq!(EXPECTED, new_pos);
+        reader.read(&mut actual).expect("read failed");
+
+        assert_eq!(EXPECTED as u8, actual[0]);
+    }
+}

+ 49 - 65
crates/btlib/src/crypto/merkle_stream.rs

@@ -8,11 +8,11 @@ mod private {
         bterr,
         crypto::{Error, HashKind, Result},
         trailered::Trailered,
-        Block, BlockMeta, BoxInIoErr, Decompose, MetaAccess, Sectored, TryCompose, WriteInteg,
-        SECTOR_SZ_DEFAULT,
+        BoxInIoErr, Decompose, FlushMeta, MetaReader, Sectored, WriteInteg, SECTOR_SZ_DEFAULT,
     };
+    use positioned_io::{ReadAt, Size, WriteAt};
     use serde::{Deserialize, Serialize};
-    use std::io::{self, Read, Seek, Write};
+    use std::io;
     use strum::EnumDiscriminants;
 
     /// Returns the base 2 logarithm of the given number. This function will return -1 when given 0, and
@@ -552,19 +552,18 @@ mod private {
         }
     }
 
-    impl Default for VariantMerkleTree {
+    impl Default for MerkleTreeKind {
         fn default() -> Self {
-            Self::Sha2_256(VecMerkleTree::<Sha2_256Node>::default())
+            Self::Sha2_256
         }
     }
 
     pub struct MerkleStream<T> {
         trailered: Trailered<T, VariantMerkleTree>,
         tree: VariantMerkleTree,
-        pos: usize,
     }
 
-    impl<T: MetaAccess> MerkleStream<T> {
+    impl<T: MetaReader> MerkleStream<T> {
         /// Asserts that the root merkle node contains the integrity value given by the inner
         /// stream.
         pub fn assert_root_integrity(&mut self) -> Result<()> {
@@ -573,16 +572,15 @@ mod private {
         }
     }
 
-    impl<T: Read + Seek> MerkleStream<T> {
+    impl<T: ReadAt + Size + Sectored> MerkleStream<T> {
         /// Reads a `MerkleTree` from the end of the given stream and returns a stream which uses
         /// it.
         pub fn new(inner: T) -> Result<MerkleStream<T>> {
             let (trailered, tree) = Trailered::new(inner)?;
-            Ok(MerkleStream {
-                trailered,
-                tree: tree.unwrap_or_default(),
-                pos: 0,
-            })
+            let tree = tree.unwrap_or_else(|| {
+                VariantMerkleTree::empty(MerkleTreeKind::default(), trailered.sector_sz())
+            });
+            Ok(MerkleStream { trailered, tree })
         }
 
         pub fn with_tree(inner: T, tree: VariantMerkleTree) -> Result<MerkleStream<T>> {
@@ -590,11 +588,7 @@ mod private {
             if trailer.is_some() {
                 return Err(bterr!("stream already contained a serialized merkle tree",));
             }
-            Ok(MerkleStream {
-                trailered,
-                tree,
-                pos: 0,
-            })
+            Ok(MerkleStream { trailered, tree })
         }
     }
 
@@ -604,31 +598,18 @@ mod private {
         }
     }
 
-    impl<T: Read + Seek> TryCompose<T, MerkleStream<T>> for MerkleStream<()> {
-        type Error = crate::Error;
-
-        fn try_compose(self, inner: T) -> std::result::Result<MerkleStream<T>, Self::Error> {
-            let (trailered, tree) = Trailered::new(inner)?;
-            Ok(MerkleStream {
-                trailered,
-                tree: tree.unwrap_or_default(),
-                pos: 0,
-            })
-        }
-    }
-
     impl<T> Decompose<T> for MerkleStream<T> {
         fn into_inner(self) -> T {
             self.trailered.into_inner()
         }
     }
 
-    impl<T: WriteInteg + Seek> Write for MerkleStream<T> {
-        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+    impl<T: WriteInteg + Size> WriteAt for MerkleStream<T> {
+        fn write_at(&mut self, pos: u64, buf: &[u8]) -> io::Result<usize> {
             self.assert_sector_sz(buf.len())?;
-            self.tree.write(self.pos, buf)?;
-            let written = self.trailered.write(buf)?;
-            self.pos += self.sector_sz();
+            let pos_usize: usize = pos.try_into().box_err()?;
+            self.tree.write(pos_usize, buf)?;
+            let written = self.trailered.write_at(pos, buf)?;
             Ok(written)
         }
 
@@ -638,39 +619,35 @@ mod private {
         }
     }
 
-    impl<T: Read + Seek> Read for MerkleStream<T> {
-        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+    impl<T: ReadAt + Size> ReadAt for MerkleStream<T> {
+        fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result<usize> {
             self.assert_sector_sz(buf.len())?;
-            self.trailered.read_exact(buf)?;
-            self.tree.verify(self.pos, buf)?;
-            self.pos += self.sector_sz();
+            self.trailered.read_exact_at(pos, buf)?;
+            let pos: usize = pos.try_into().box_err()?;
+            self.tree.verify(pos, buf)?;
             Ok(self.sector_sz())
         }
     }
 
-    impl<T: Seek> Seek for MerkleStream<T> {
-        fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
-            let from_start = self.trailered.seek(pos)?;
-            self.pos = from_start.try_into().box_err()?;
-            Ok(from_start)
-        }
-    }
-
-    impl<T: AsRef<BlockMeta>> AsRef<BlockMeta> for MerkleStream<T> {
-        fn as_ref(&self) -> &BlockMeta {
+    impl<U, T: AsRef<U>> AsRef<U> for MerkleStream<T> {
+        fn as_ref(&self) -> &U {
             self.trailered.as_ref()
         }
     }
 
-    impl<T: AsMut<BlockMeta>> AsMut<BlockMeta> for MerkleStream<T> {
-        fn as_mut(&mut self) -> &mut BlockMeta {
+    impl<U, T: AsMut<U>> AsMut<U> for MerkleStream<T> {
+        fn as_mut(&mut self) -> &mut U {
             self.trailered.as_mut()
         }
     }
 
-    impl<T: MetaAccess> MetaAccess for MerkleStream<T> {}
+    impl<T> Size for MerkleStream<T> {
+        fn size(&self) -> io::Result<Option<u64>> {
+            self.trailered.size()
+        }
+    }
 
-    impl<T: Block + WriteInteg> Block for MerkleStream<T> {
+    impl<T: FlushMeta> FlushMeta for MerkleStream<T> {
         fn flush_meta(&mut self) -> crate::Result<()> {
             self.trailered.flush_meta()
         }
@@ -679,13 +656,17 @@ mod private {
 
 #[cfg(test)]
 pub(crate) mod tests {
+    use btserde::{from_vec, to_vec};
     use std::io::{Read, Seek, SeekFrom, Write};
 
-    use super::private::{exp2, log2};
-    use super::*;
-    use crate::test_helpers::{BtCursor, Randomizer};
-    use crate::SECTOR_SZ_DEFAULT;
-    use btserde::{from_vec, to_vec};
+    use super::{
+        private::{exp2, log2},
+        *,
+    };
+    use crate::{
+        test_helpers::{Randomizer, SectoredCursor},
+        Cursor, SECTOR_SZ_DEFAULT,
+    };
 
     #[test]
     fn log2_test() {
@@ -780,8 +761,9 @@ pub(crate) mod tests {
 
     fn merkle_stream_sequential_test_case(sect_sz: usize, sect_ct: usize) {
         let tree = VariantMerkleTree::empty(MerkleTreeKind::Sha2_256, sect_sz);
-        let mut stream =
-            MerkleStream::with_tree(BtCursor::new(Vec::new()), tree).expect("read from end failed");
+        let stream = MerkleStream::with_tree(SectoredCursor::new(Vec::new(), sect_sz), tree)
+            .expect("read from end failed");
+        let mut stream = Cursor::new(stream);
         for k in 1..(sect_ct + 1) {
             let sector = vec![k as u8; sect_sz];
             stream.write(&sector).expect("write failed");
@@ -807,11 +789,12 @@ pub(crate) mod tests {
     pub(crate) fn make_merkle_stream_filled_with_zeros(
         sect_sz: usize,
         sect_ct: usize,
-    ) -> MerkleStream<BtCursor<Vec<u8>>> {
+    ) -> Cursor<MerkleStream<SectoredCursor<Vec<u8>>>> {
         let tree = VariantMerkleTree::empty(MerkleTreeKind::Sha2_256, sect_sz);
-        let mut stream =
-            MerkleStream::with_tree(BtCursor::new(Vec::new()), tree).expect("read from end failed");
+        let stream = MerkleStream::with_tree(SectoredCursor::new(Vec::new(), sect_sz), tree)
+            .expect("read from end failed");
         let zeros = vec![0u8; sect_sz];
+        let mut stream = Cursor::new(stream);
         for _ in 0..sect_ct {
             stream.write(&zeros).expect("write zeros failed");
         }
@@ -830,6 +813,7 @@ pub(crate) mod tests {
             let sector = vec![index as u8; sect_sz];
             stream.write(&sector).expect("write failed");
         }
+        stream.flush().expect("flush failed");
         for index in indices.iter().map(|e| *e) {
             let offset = sect_sz * index;
             stream

+ 13 - 42
crates/btlib/src/crypto/mod.rs

@@ -1,11 +1,11 @@
 pub mod tpm;
 
-mod merkle_stream;
+pub mod merkle_stream;
 pub use merkle_stream::MerkleStream;
-mod secret_stream;
+pub mod secret_stream;
 pub use secret_stream::SecretStream;
-mod sign_stream;
-pub use sign_stream::SignStream;
+//mod sign_stream;
+//pub use sign_stream::SignStream;
 
 use crate::{
     btensure, bterr, fmt, io, BigArray, BlockMeta, BlockPath, Deserialize, Epoch, Formatter,
@@ -2097,15 +2097,16 @@ impl Writecap {
 
 #[cfg(test)]
 mod tests {
+    use std::{
+        io::{Seek, SeekFrom},
+        time::Duration,
+    };
+
     use super::*;
     use crate::{
         crypto::secret_stream::SecretStream,
         test_helpers::{self, *},
-        BrotliParams, Sectored, SectoredBuf, TryCompose,
-    };
-    use std::{
-        io::{Seek, SeekFrom},
-        time::Duration,
+        Sectored, TryCompose,
     };
 
     #[test]
@@ -2115,7 +2116,7 @@ mod tests {
         let creds = make_key_pair();
         let mut block = make_block_with(&creds);
         write_fill(&mut block, SECT_SZ, SECT_CT);
-        block.seek(SeekFrom::Start(0)).expect("seek failed");
+        block.rewind().expect("rewind failed");
         read_check(block, SECT_SZ, SECT_CT);
     }
 
@@ -2277,38 +2278,8 @@ mod tests {
             .expect("compose for secret failed");
         let secret_sect_sz = secret.sector_sz();
         write_fill(&mut secret, secret_sect_sz, SECT_CT);
-        secret.seek(SeekFrom::Start(0)).expect("seek failed");
-        read_check(&mut secret, secret_sect_sz, SECT_CT);
-    }
-
-    #[test]
-    fn compress_then_encrypt() {
-        use brotli::{CompressorWriter, Decompressor};
-        const SECT_SZ: usize = 4096;
-        const SECT_CT: usize = 16;
-        let params = BrotliParams::new(SECT_SZ, 8, 20);
-        let key = SymKey::generate(SymKeyKind::Aes256Cbc).expect("key generation failed");
-        let mut inner = SectoredBuf::new()
-            .try_compose(
-                SecretStream::new(key)
-                    .try_compose(SectoredCursor::new(Vec::new(), SECT_SZ))
-                    .expect("compose for inner failed"),
-            )
-            .expect("compose with sectored buffer failed");
-        {
-            let write: CompressorWriter<_> = params
-                .clone()
-                .try_compose(&mut inner)
-                .expect("compose for write failed");
-            write_fill(write, SECT_SZ, SECT_CT);
-        }
-        inner.seek(SeekFrom::Start(0)).expect("seek failed");
-        {
-            let read: Decompressor<_> = params
-                .try_compose(&mut inner)
-                .expect("compose for read failed");
-            read_check(read, SECT_SZ, SECT_CT);
-        }
+        secret.rewind().expect("rewind failed");
+        read_check(secret, secret_sect_sz, SECT_CT);
     }
 
     fn ossl_hash_op_same_as_digest_test_case<H: Hash + From<DigestBytes>>(kind: HashKind) {

+ 53 - 21
crates/btlib/src/crypto/secret_stream.rs

@@ -1,12 +1,16 @@
+use std::io::{self, Read, Seek, SeekFrom, Write};
+use positioned_io::Size;
+
+use crate::{
+    bterr,
+    crypto::{Error, Result, SymKey},
+    Decompose, Sectored, Split, TryCompose, EMPTY_SLICE,
+};
+
 pub use private::SecretStream;
 
 mod private {
-    use crate::{
-        bterr,
-        crypto::{Error, Result, SymKey},
-        Block, BlockMeta, Decompose, MetaAccess, Sectored, TryCompose,
-    };
-    use std::io::{self, Read, Seek, SeekFrom, Write};
+    use super::*;
 
     // A stream which encrypts all data written to it and decrypts all data read from it.
     pub struct SecretStream<T> {
@@ -24,6 +28,14 @@ mod private {
     }
 
     impl<T> SecretStream<T> {
+        pub fn get_ref(&self) -> &T {
+            &self.inner
+        }
+
+        pub fn get_mut(&mut self) -> &mut T {
+            &mut self.inner
+        }
+
         /// Given an offset into this stream, produces the corresponding offset into the inner stream.
         fn inner_offset(&self, outer_offset: u64) -> u64 {
             let sect_sz = self.sect_sz as u64;
@@ -53,6 +65,31 @@ mod private {
         }
     }
 
+    impl<T> Split<SecretStream<&'static [u8]>, T> for SecretStream<T> {
+        fn split(self) -> (SecretStream<&'static [u8]>, T) {
+            let new_self = SecretStream {
+                inner: EMPTY_SLICE,
+                inner_sect_sz: self.inner_sect_sz,
+                sect_sz: self.sect_sz,
+                key: self.key,
+                ct_buf: self.ct_buf,
+                pt_buf: self.pt_buf,
+            };
+            (new_self, self.inner)
+        }
+
+        fn combine(left: SecretStream<&'static [u8]>, right: T) -> Self {
+            SecretStream {
+                inner: right,
+                inner_sect_sz: left.inner_sect_sz,
+                sect_sz: left.sect_sz,
+                key: left.key,
+                ct_buf: left.ct_buf,
+                pt_buf: left.pt_buf,
+            }
+        }
+    }
+
     impl<T> Decompose<T> for SecretStream<T> {
         fn into_inner(self) -> T {
             self.inner
@@ -72,7 +109,7 @@ mod private {
                     actual: sect_sz,
                 }));
             }
-            self.pt_buf.resize(inner_sect_sz, 0);
+            self.ct_buf.resize(inner_sect_sz, 0);
             self.pt_buf.resize(inner_sect_sz + block_sz, 0);
             Ok(SecretStream {
                 inner,
@@ -113,7 +150,6 @@ mod private {
         fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
             self.assert_sector_sz(buf.len())?;
 
-            self.ct_buf.resize(self.inner_sect_sz, 0);
             match self.inner.read_exact(&mut self.ct_buf) {
                 Ok(_) => (),
                 Err(err) => {
@@ -165,35 +201,31 @@ mod private {
         }
     }
 
-    impl<T: AsRef<BlockMeta>> AsRef<BlockMeta> for SecretStream<T> {
-        fn as_ref(&self) -> &BlockMeta {
+    impl<U, T: AsRef<U>> AsRef<U> for SecretStream<T> {
+        fn as_ref(&self) -> &U {
             self.inner.as_ref()
         }
     }
 
-    impl<T: AsMut<BlockMeta>> AsMut<BlockMeta> for SecretStream<T> {
-        fn as_mut(&mut self) -> &mut BlockMeta {
+    impl<U, T: AsMut<U>> AsMut<U> for SecretStream<T> {
+        fn as_mut(&mut self) -> &mut U {
             self.inner.as_mut()
         }
     }
 
-    impl<T: MetaAccess> MetaAccess for SecretStream<T> {}
-
-    impl<T: Block> Block for SecretStream<T> {
-        fn flush_meta(&mut self) -> crate::Result<()> {
-            self.inner.flush_meta()
+    impl<T: Size> Size for SecretStream<T> {
+        fn size(&self) -> io::Result<Option<u64>> {
+            self.inner.size()
         }
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::io::{Read, Seek, SeekFrom, Write};
-
     use crate::{
-        crypto::{SymKey, SymKeyKind},
+        crypto::{SymKeyKind},
         test_helpers::{Randomizer, SectoredCursor},
-        Sectored, TryCompose, SECTOR_SZ_DEFAULT,
+        SECTOR_SZ_DEFAULT,
     };
 
     use super::*;

+ 297 - 101
crates/btlib/src/lib.rs

@@ -1,17 +1,16 @@
-pub use block_path::{BlockPath, BlockPathError};
 pub mod blocktree;
 /// Code which enables cryptographic operations.
 pub mod crypto;
 pub mod error;
 pub mod log;
 pub mod msg;
-pub use error::{Error, Result};
-
+pub mod accessor;
+pub mod buf_reader;
+pub mod sectored_buf;
 mod block_path;
-mod sectored_buf;
+mod trailered;
 #[cfg(test)]
 mod test_helpers;
-mod trailered;
 
 #[macro_use]
 extern crate static_assertions;
@@ -21,17 +20,10 @@ extern crate static_assertions;
 extern crate lazy_static;
 
 use btserde::{read_from, write_to};
-use crypto::{
-    AsymKeyPub, Ciphertext, Creds, Decrypter, DecrypterExt, Encrypter, EncrypterExt, HashKind,
-    MerkleStream, PublicCreds, SecretStream, Sign, Signature, Signer, SymKey, SymKeyKind, VarHash,
-};
-use error::{BoxInIoErr, BtErr};
-use trailered::Trailered;
-
 use ::log::error;
 use brotli::{CompressorWriter, Decompressor};
 use fuse_backend_rs::abi::fuse_abi::{stat64, Attr};
-use sectored_buf::SectoredBuf;
+use positioned_io::{ReadAt, Size, WriteAt};
 use serde::{Deserialize, Serialize};
 use serde_big_array::BigArray;
 use std::{
@@ -47,12 +39,23 @@ use std::{
 };
 use strum_macros::{Display, EnumDiscriminants, FromRepr};
 
+pub use error::{Error, Result};
+pub use block_path::{BlockPath, BlockPathError};
+use crypto::{
+    AsymKeyPub, Ciphertext, Creds, Decrypter, DecrypterExt, Encrypter, EncrypterExt, HashKind,
+    MerkleStream, PublicCreds, SecretStream, Sign, Signature, Signer, SymKey, SymKeyKind, VarHash,
+};
+use error::{BoxInIoErr, BtErr};
+use trailered::Trailered;
+use accessor::Accessor;
+
 #[derive(Debug)]
 pub enum BlockError {
     MissingWritecap,
     IncorrectSize { expected: usize, actual: usize },
     NoBlockKey,
     NoBlockPath,
+    UnknownSize,
 }
 
 impl Display for BlockError {
@@ -64,6 +67,7 @@ impl Display for BlockError {
             }
             BlockError::NoBlockKey => write!(f, "no block key is present"),
             BlockError::NoBlockPath => write!(f, "no block path was specified"),
+            BlockError::UnknownSize => write!(f, "size could not be determined"),
         }
     }
 }
@@ -73,45 +77,78 @@ impl std::error::Error for BlockError {}
 // This assertion ensures that conversions from `usize` to `u64` will not cause truncation. This
 // prevents this code from compiling for 128 bit platforms, but that's not really a concern for the
 // foreseeable future.
+// If this assumption is ever to be removed, you'll need to evaluate every occurrence of `as u64`.
 const_assert!(::std::mem::size_of::<usize>() <= ::std::mem::size_of::<u64>());
 /// The default sector size to use for new blocks.
 pub const SECTOR_SZ_DEFAULT: usize = 4096;
 /// `SECTOR_SZ_DEFAULT` converted to a `u64`.
 pub const SECTOR_U64_DEFAULT: u64 = SECTOR_SZ_DEFAULT as u64;
 
+pub trait MetaReader: AsRef<BlockMeta> + Size {
+    fn meta(&self) -> &BlockMeta {
+        self.as_ref()
+    }
+
+    fn meta_body(&self) -> &BlockMetaBody {
+        self.meta().body()
+    }
+}
+
+impl<T: AsRef<BlockMeta> + Size + ?Sized> MetaReader for T {}
+
+/// Trait for accessing the metadata associated with a block.
+pub trait MetaAccess: AsMut<BlockMeta> + MetaReader {
+    fn mut_meta(&mut self) -> &mut BlockMeta {
+        self.as_mut()
+    }
+
+    fn mut_meta_body(&mut self) -> &mut BlockMetaBody {
+        self.mut_meta().mut_body()
+    }
+}
+
+impl<T: AsMut<BlockMeta> + MetaReader + ?Sized> MetaAccess for T {}
+
+pub trait FlushMeta {
+    /// Flushes metadata to persistent storage.
+    fn flush_meta(&mut self) -> Result<()>;
+}
+
+impl<T: FlushMeta + ?Sized> FlushMeta for &mut T {
+    fn flush_meta(&mut self) -> Result<()> {
+        (*self).flush_meta()
+    }
+}
+
 /// ### THE BLOCK TRAIT
 ///
 /// Trait for types which provide read and write access to blocks.
-pub trait Block: Read + Write + Seek + MetaAccess {
-    /// Flushes metadata to persistent storage.
-    fn flush_meta(&mut self) -> Result<()>;
+pub trait Block: ReadAt + WriteAt + MetaAccess + Sectored + FlushMeta {}
+
+impl<T: ReadAt + WriteAt + MetaAccess + Sectored + FlushMeta + ?Sized> Block for T {}
 
+pub trait BlockReader: Read + Seek + AsRef<BlockMeta> + Size + Sectored {
     fn read_dir(&mut self) -> Result<Directory> {
-        self.seek(SeekFrom::Start(0))?;
-        // &mut &mut Self has to be passed because rustc complains that &mut Self is not sized,
-        // for some reason (implicit dereference?). You can't pass in `&mut self` because `self`
-        // is not declared as mutable. Hence this hack.
-        let mut selfie = self;
-        let dir: Directory = read_from(&mut selfie)?;
+        self.rewind()?;
+        let mut selphie = self;
+        let dir: Directory = read_from(&mut selphie)?;
         Ok(dir)
     }
+}
+
+impl<T: Read + Seek + AsRef<BlockMeta> + Size + Sectored + ?Sized> BlockReader for T {}
 
+pub trait BlockAccessor: BlockReader + Write + MetaAccess {
     fn write_dir(&mut self, dir: &Directory) -> Result<()> {
-        self.seek(SeekFrom::Start(0))?;
-        let mut selfie = self;
-        write_to(dir, &mut selfie)?;
-        selfie.flush()?;
+        self.rewind()?;
+        let mut selphie = self;
+        write_to(dir, &mut selphie)?;
+        selphie.flush()?;
         Ok(())
     }
 }
 
-impl<T: MetaAccess> MetaAccess for &mut T {}
-
-impl<T: Block> Block for &mut T {
-    fn flush_meta(&mut self) -> Result<()> {
-        (*self).flush_meta()
-    }
-}
+impl<T: Read + Write + Seek + MetaAccess + Sectored + ?Sized> BlockAccessor for T {}
 
 // A trait for streams which only allow reads and writes in fixed sized units called sectors.
 pub trait Sectored {
@@ -152,6 +189,18 @@ pub trait Sectored {
     }
 }
 
+impl<T: Sectored + ?Sized> Sectored for &T {
+    fn sector_sz(&self) -> usize {
+        (**self).sector_sz()
+    }
+}
+
+impl<T: Sectored + ?Sized> Sectored for &mut T {
+    fn sector_sz(&self) -> usize {
+        (**self).sector_sz()
+    }
+}
+
 impl Sectored for ::std::fs::File {
     fn sector_sz(&self) -> usize {
         self.metadata()
@@ -160,16 +209,36 @@ impl Sectored for ::std::fs::File {
     }
 }
 
-/// A version of the `Write` trait, which allows integrity information to be supplied when flushing.
-pub trait WriteInteg: Write {
+impl<T: Sectored + Size> Sectored for Cursor<T> {
+    fn sector_sz(&self) -> usize {
+        self.cursor.get_ref().sector_sz()
+    }
+}
+
+pub trait SizeExt: Size {
+    fn size_or_err(&self) -> Result<u64> {
+        self.size()?.ok_or_else(|| bterr!(BlockError::UnknownSize))
+    }
+}
+
+impl<T: Size> SizeExt for T {}
+
+/// A version of the `WriteAt` trait, which allows integrity information to be supplied when
+/// flushing.
+pub trait WriteInteg: WriteAt {
     fn flush_integ(&mut self, integrity: &[u8]) -> io::Result<()>;
 }
 
-trait Decompose<T> {
+pub trait Decompose<T> {
     fn into_inner(self) -> T;
 }
 
-trait TryCompose<T, U: Decompose<T>> {
+pub trait Split<L, R> {
+    fn split(self) -> (L, R);
+    fn combine(left: L, right: R) -> Self;
+}
+
+pub trait TryCompose<T, U: Decompose<T>> {
     type Error;
     fn try_compose(self, inner: T) -> std::result::Result<U, Self::Error>;
 }
@@ -185,25 +254,6 @@ impl<T, U: Decompose<T>, S: TryCompose<T, U, Error = Infallible>> Compose<T, U>
         unsafe { result.unwrap_unchecked() }
     }
 }
-
-/// Trait for accessing the metadata associated with a block.
-pub trait MetaAccess: AsRef<BlockMeta> + AsMut<BlockMeta> {
-    fn meta(&self) -> &BlockMeta {
-        self.as_ref()
-    }
-    fn mut_meta(&mut self) -> &mut BlockMeta {
-        self.as_mut()
-    }
-
-    fn meta_body(&self) -> &BlockMetaBody {
-        self.meta().body()
-    }
-
-    fn mut_meta_body(&mut self) -> &mut BlockMetaBody {
-        self.mut_meta().mut_body()
-    }
-}
-
 impl AsRef<BlockMeta> for &BlockMeta {
     fn as_ref(&self) -> &BlockMeta {
         self
@@ -222,7 +272,92 @@ impl AsMut<BlockMeta> for &mut BlockMeta {
     }
 }
 
-impl MetaAccess for &mut BlockMeta {}
+#[derive(Debug)]
+pub struct Cursor<T: Size> {
+    cursor: positioned_io::SizeCursor<T>,
+}
+
+impl<T: Size> Cursor<T> {
+    pub fn new(inner: T) -> Cursor<T> {
+        Self {
+            cursor: positioned_io::SizeCursor::new(inner),
+        }
+    }
+
+    pub fn new_pos(inner: T, pos: u64) -> Cursor<T> {
+        Self {
+            cursor: positioned_io::SizeCursor::new_pos(inner, pos),
+        }
+    }
+
+    pub fn get_ref(&self) -> &T {
+        self.cursor.get_ref()
+    }
+
+    pub fn get_mut(&mut self) -> &mut T {
+        self.cursor.get_mut()
+    }
+}
+
+impl<T: ReadAt + Size> Read for Cursor<T> {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        self.cursor.read(buf)
+    }
+}
+
+impl<T: WriteAt + Size> Write for Cursor<T> {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.cursor.write(buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.cursor.flush()
+    }
+}
+
+impl<T: Size> Seek for Cursor<T> {
+    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
+        self.cursor.seek(pos)
+    }
+}
+
+impl<T: Size> Decompose<T> for Cursor<T> {
+    fn into_inner(self) -> T {
+        self.cursor.into_inner()
+    }
+}
+
+impl<U, T: AsRef<U> + Size> AsRef<U> for Cursor<T> {
+    fn as_ref(&self) -> &U {
+        self.cursor.get_ref().as_ref()
+    }
+}
+
+impl<U, T: AsMut<U> + Size> AsMut<U> for Cursor<T> {
+    fn as_mut(&mut self) -> &mut U {
+        self.cursor.get_mut().as_mut()
+    }
+}
+
+impl<T: Size> Size for Cursor<T> {
+    fn size(&self) -> io::Result<Option<u64>> {
+        self.cursor.get_ref().size()
+    }
+}
+
+pub const EMPTY_SLICE: &[u8] = &[0u8; 0];
+
+impl<T: Size> Split<Cursor<&'static [u8]>, T> for Cursor<T> {
+    fn split(self) -> (Cursor<&'static [u8]>, T) {
+        let pos = self.cursor.position();
+        (Cursor::new_pos(EMPTY_SLICE, pos), self.cursor.into_inner())
+    }
+
+    fn combine(left: Cursor<&'static [u8]>, right: T) -> Self {
+        let pos = left.cursor.position();
+        Self::new_pos(right, pos)
+    }
+}
 
 /// Extensions to the `Read` trait.
 trait ReadExt: Read {
@@ -257,6 +392,62 @@ trait ReadExt: Read {
 
 impl<T: Read> ReadExt for T {}
 
+trait SeekFromExt {
+    /// Returns the absolute position (offset from the start) this `SeekFrom` refers to.
+    /// `curr` is called in the case this `SeekFrom` is `Current`, and is expected to return the
+    /// current position.
+    /// `end` is called in the case this `SeekFrom` is `End`, and is expected  to return the
+    /// the position of the end.
+    fn abs<F, G>(&self, curr: F, end: G) -> Result<u64>
+    where
+        F: FnOnce() -> Result<u64>,
+        G: FnOnce() -> Result<u64>;
+
+    /// Like [SeekFromExt::abs] except that an error is always returned when [SeekFrom::End] is
+    /// given.
+    fn abs_no_end<F>(&self, curr: F) -> Result<u64>
+    where
+        F: FnOnce() -> Result<u64>,
+    {
+        self.abs(curr, || Err(bterr!("SeekFrom::End is not supported")))
+    }
+
+    /// Converts a C-style `(whence, offset)` pair into a [SeekFrom] enum value.
+    /// See the POSIX man page of `lseek` for more details.
+    fn whence_offset(whence: u32, offset: u64) -> io::Result<SeekFrom> {
+        let whence = whence as i32;
+        match whence {
+            libc::SEEK_SET => Ok(SeekFrom::Start(offset)),
+            libc::SEEK_CUR => Ok(SeekFrom::Current(offset as i64)),
+            libc::SEEK_END => Ok(SeekFrom::End(offset as i64)),
+            _ => Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                "`whence` was not one of `libc::{SEEK_SET, SEEK_CUR, SEEK_END}`",
+            )),
+        }
+    }
+}
+
+impl SeekFromExt for SeekFrom {
+    fn abs<F, G>(&self, curr: F, end: G) -> Result<u64>
+    where
+        F: FnOnce() -> Result<u64>,
+        G: FnOnce() -> Result<u64>,
+    {
+        match self {
+            SeekFrom::Start(start) => Ok(*start),
+            SeekFrom::Current(from_curr) => {
+                let curr = curr()?;
+                Ok(curr.wrapping_add_signed(*from_curr))
+            }
+            SeekFrom::End(from_end) => {
+                let end = end()?;
+                Ok(end.wrapping_add_signed(*from_end))
+            }
+        }
+    }
+}
+
 /// A unique identifier for a block.
 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Hash)]
 pub struct BlockId {
@@ -550,7 +741,7 @@ impl AsMut<BlockMetaBody> for BlockMeta {
     }
 }
 
-struct BlockStream<T, C> {
+pub struct BlockStream<T, C> {
     trailered: Trailered<T, BlockMeta>,
     meta: BlockMeta,
     meta_body_buf: Vec<u8>,
@@ -558,7 +749,7 @@ struct BlockStream<T, C> {
     sect_sz: usize,
 }
 
-impl<T: Read + Seek + Sectored, C: Creds> BlockStream<T, C> {
+impl<T: ReadAt + Sectored + Size, C: Creds> BlockStream<T, C> {
     fn new(inner: T, creds: C, block_path: BlockPath) -> Result<BlockStream<T, C>> {
         let (trailered, meta) = Trailered::<_, BlockMeta>::new(inner)?;
         let meta = match meta {
@@ -605,7 +796,7 @@ impl<T: Read + Seek + Sectored, C: Creds> BlockStream<T, C> {
     }
 }
 
-impl<T: Write + Seek, C: Signer> BlockStream<T, C> {
+impl<T: WriteAt + Size, C: Signer> BlockStream<T, C> {
     fn sign_flush_meta(&mut self) -> io::Result<()> {
         self.meta_body_buf.clear();
         self.meta.sig = self
@@ -615,9 +806,9 @@ impl<T: Write + Seek, C: Signer> BlockStream<T, C> {
     }
 }
 
-impl<T: Write + Seek, C: Signer + Principaled + Decrypter> Write for BlockStream<T, C> {
-    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        self.trailered.write(buf)
+impl<T: WriteAt + Size, C: Signer + Principaled + Decrypter> WriteAt for BlockStream<T, C> {
+    fn write_at(&mut self, pos: u64, buf: &[u8]) -> io::Result<usize> {
+        self.trailered.write_at(pos, buf)
     }
 
     fn flush(&mut self) -> io::Result<()> {
@@ -625,7 +816,7 @@ impl<T: Write + Seek, C: Signer + Principaled + Decrypter> Write for BlockStream
     }
 }
 
-impl<T: Write + Seek, C: Signer + Principaled + Decrypter> WriteInteg for BlockStream<T, C> {
+impl<T: WriteAt + Size, C: Signer + Principaled + Decrypter> WriteInteg for BlockStream<T, C> {
     fn flush_integ(&mut self, integrity: &[u8]) -> io::Result<()> {
         let meta_body = &mut self.meta.body;
         let integ = meta_body.integrity.get_or_insert_with(VarHash::default);
@@ -634,17 +825,9 @@ impl<T: Write + Seek, C: Signer + Principaled + Decrypter> WriteInteg for BlockS
     }
 }
 
-impl<T: Read + Seek, C> Read for BlockStream<T, C> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        self.trailered.read(buf)
-    }
-}
-
-impl<T: Seek, C> Seek for BlockStream<T, C> {
-    /// Seeks to the given position in the stream. If a position beyond the end of the stream is
-    /// specified, the the seek will be to the end of the stream.
-    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
-        self.trailered.seek(pos)
+impl<T: ReadAt + Size, C> ReadAt for BlockStream<T, C> {
+    fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result<usize> {
+        self.trailered.read_at(pos, buf)
     }
 }
 
@@ -660,15 +843,19 @@ impl<T, C> AsMut<BlockMeta> for BlockStream<T, C> {
     }
 }
 
-impl<T, C> MetaAccess for BlockStream<T, C> {}
-
 impl<T, C> Sectored for BlockStream<T, C> {
     fn sector_sz(&self) -> usize {
         self.sect_sz
     }
 }
 
-impl<T: Read + Write + Seek, C: Creds> Block for BlockStream<T, C> {
+impl<T: Size, C> Size for BlockStream<T, C> {
+    fn size(&self) -> io::Result<Option<u64>> {
+        self.trailered.size()
+    }
+}
+
+impl<T: WriteAt + Size, C: Signer> FlushMeta for BlockStream<T, C> {
     fn flush_meta(&mut self) -> Result<()> {
         self.sign_flush_meta().map_err(|err| err.into())
     }
@@ -737,21 +924,22 @@ impl<T, C> BlockOpenOptions<T, C> {
     }
 }
 
-impl<T: Read + Write + Seek + Sectored + 'static, C: Creds + 'static> BlockOpenOptions<T, C> {
-    pub fn open(self) -> Result<Box<dyn Block>> {
+pub type ConcreteBlock<T, C> = MerkleStream<BlockStream<T, C>>;
+pub type FileBlock<C> = ConcreteBlock<std::fs::File, C>;
+
+impl<T: ReadAt + WriteAt + Size + Sectored + 'static, C: Creds + 'static> BlockOpenOptions<T, C> {
+    pub fn open_bare(self) -> Result<ConcreteBlock<T, C>> {
         let block_path = self.block_path.ok_or(BlockError::NoBlockPath)?;
         let stream = BlockStream::new(self.inner, self.creds, block_path)?;
-        let block_key = stream.meta_body().block_key().map(|e| e.to_owned())?;
         let mut stream = MerkleStream::new(stream)?;
         stream.assert_root_integrity()?;
-        if self.encrypt {
-            let stream = SecretStream::new(block_key).try_compose(stream)?;
-            let stream = SectoredBuf::new().try_compose(stream)?;
-            Ok(Box::new(stream))
-        } else {
-            let stream = SectoredBuf::new().try_compose(stream)?;
-            Ok(Box::new(stream))
-        }
+        Ok(stream)
+    }
+
+    pub fn open(self) -> Result<Accessor<ConcreteBlock<T, C>>> {
+        let stream = self.open_bare()?;
+        let stream = Accessor::new(stream)?;
+        Ok(stream)
     }
 }
 
@@ -1134,14 +1322,21 @@ impl<F: FnOnce()> Drop for DropTrigger<F> {
 
 #[cfg(test)]
 mod tests {
-    use std::{fs::OpenOptions, io::Cursor, path::PathBuf};
-
-    use crate::crypto::{ConcreteCreds, CredsPriv, SignStream};
-
-    use super::*;
     use btserde::{from_vec, to_vec};
+    use std::{
+        fs::OpenOptions,
+        io::{Cursor, Write},
+        path::PathBuf,
+    };
     use tempdir::TempDir;
-    use test_helpers::*;
+
+    use super::*;
+    use crate::{
+        crypto::{ConcreteCreds, CredsPriv},
+        sectored_buf::SectoredBuf,
+        test_helpers::*,
+        Cursor as PioCursor,
+    };
 
     #[test]
     fn brotli_compress_decompress() {
@@ -1198,9 +1393,7 @@ mod tests {
     }
 
     type EncBlock = SectoredBuf<
-        SecretStream<
-            SignStream<BlockStream<SectoredCursor<Vec<u8>>, ConcreteCreds>, ConcreteCreds>,
-        >,
+        SecretStream<PioCursor<MerkleStream<BlockStream<SectoredCursor<Vec<u8>>, ConcreteCreds>>>>,
     >;
 
     impl InMemTestCase {
@@ -1239,7 +1432,9 @@ mod tests {
                 })
                 .unwrap();
             let block_key = stream.meta_body().block_key().unwrap().to_owned();
-            let stream = SignStream::new(stream, self.node_creds.clone()).unwrap();
+            let mut stream = MerkleStream::new(stream).unwrap();
+            stream.assert_root_integrity().unwrap();
+            let stream = PioCursor::new(stream);
             let stream = SecretStream::new(block_key).try_compose(stream).unwrap();
             SectoredBuf::new().try_compose(stream).unwrap()
         }
@@ -1250,6 +1445,7 @@ mod tests {
                 .into_inner()
                 .into_inner()
                 .into_inner()
+                .into_inner()
                 .0
                 .into_inner()
         }
@@ -1325,7 +1521,7 @@ mod tests {
             fs_path
         }
 
-        fn open_new(&mut self, path: crate::BlockPath) -> Box<dyn Block> {
+        fn open_new(&mut self, path: crate::BlockPath) -> Accessor<impl Block + 'static> {
             let file = OpenOptions::new()
                 .create_new(true)
                 .read(true)
@@ -1345,7 +1541,7 @@ mod tests {
             block
         }
 
-        fn open_existing(&mut self, path: crate::BlockPath) -> Box<dyn Block> {
+        fn open_existing(&mut self, path: crate::BlockPath) -> Accessor<impl Block + 'static> {
             let file = OpenOptions::new()
                 .read(true)
                 .write(true)

+ 53 - 16
crates/btlib/src/sectored_buf.rs

@@ -2,11 +2,12 @@ pub use private::SectoredBuf;
 
 mod private {
     use log::{error, warn};
+    use positioned_io::Size;
     use std::io::{self, Read, Seek, SeekFrom, Write};
 
     use crate::{
-        bterr, Block, BlockError, BlockMeta, BoxInIoErr, Decompose, MetaAccess, ReadExt, Result,
-        Sectored, TryCompose,
+        bterr, BlockError, BlockMeta, BoxInIoErr, Decompose, MetaAccess, ReadExt, Result, Sectored,
+        Split, TryCompose, EMPTY_SLICE,
     };
 
     /// A stream which buffers writes and read such that the inner stream only sees reads and writes
@@ -36,6 +37,16 @@ mod private {
     }
 
     impl<T> SectoredBuf<T> {
+        /// Returns a reference to the inner stream.
+        pub fn get_ref(&self) -> &T {
+            &self.inner
+        }
+
+        /// Returns a mutable reference to the inner stream.
+        pub fn get_mut(&mut self) -> &mut T {
+            &mut self.inner
+        }
+
         /// Returns the offset into the internal buffer that corresponds to the current position.
         fn buf_pos(&self) -> usize {
             self.pos - self.buf_start
@@ -47,10 +58,11 @@ mod private {
         }
     }
 
-    impl<T: MetaAccess> SectoredBuf<T> {
+    impl<T: AsRef<BlockMeta>> SectoredBuf<T> {
         fn len(&self) -> usize {
             self.inner
-                .meta_body()
+                .as_ref()
+                .body
                 .secrets()
                 .unwrap()
                 .size
@@ -65,7 +77,7 @@ mod private {
         }
     }
 
-    impl<T: Read + Seek + MetaAccess> SectoredBuf<T> {
+    impl<T: Read + Seek + AsRef<BlockMeta>> SectoredBuf<T> {
         /// Fills the internal buffer by reading from the inner stream at the current position
         /// and updates `self.buf_start` with the position read from.
         fn fill_internal_buf(&mut self) -> Result<usize> {
@@ -92,13 +104,38 @@ mod private {
         }
     }
 
+    impl<T> Split<SectoredBuf<&'static [u8]>, T> for SectoredBuf<T> {
+        fn split(self) -> (SectoredBuf<&'static [u8]>, T) {
+            let new_self = SectoredBuf {
+                inner: EMPTY_SLICE,
+                buf: self.buf,
+                buf_start: self.buf_start,
+                dirty: self.dirty,
+                pos: self.pos,
+            };
+            (new_self, self.inner)
+        }
+
+        fn combine(left: SectoredBuf<&'static [u8]>, right: T) -> Self {
+            SectoredBuf {
+                inner: right,
+                buf: left.buf,
+                buf_start: left.buf_start,
+                dirty: left.dirty,
+                pos: left.pos,
+            }
+        }
+    }
+
     impl<T> Decompose<T> for SectoredBuf<T> {
         fn into_inner(self) -> T {
             self.inner
         }
     }
 
-    impl<T: Sectored + Read + Seek + MetaAccess> TryCompose<T, SectoredBuf<T>> for SectoredBuf<()> {
+    impl<T: Sectored + Read + Seek + AsRef<BlockMeta>> TryCompose<T, SectoredBuf<T>>
+        for SectoredBuf<()>
+    {
         type Error = crate::Error;
         fn try_compose(self, inner: T) -> Result<SectoredBuf<T>> {
             let sect_sz = inner.sector_sz();
@@ -194,7 +231,7 @@ mod private {
         }
     }
 
-    impl<T: Read + Seek + MetaAccess> Read for SectoredBuf<T> {
+    impl<T: Read + Seek + AsRef<BlockMeta>> Read for SectoredBuf<T> {
         fn read(&mut self, mut dest: &mut [u8]) -> io::Result<usize> {
             if self.pos == self.len() {
                 return Ok(0);
@@ -267,23 +304,23 @@ mod private {
         }
     }
 
-    impl<T: AsRef<BlockMeta>> AsRef<BlockMeta> for SectoredBuf<T> {
-        fn as_ref(&self) -> &BlockMeta {
+    impl<U, T: AsRef<U>> AsRef<U> for SectoredBuf<T> {
+        fn as_ref(&self) -> &U {
             self.inner.as_ref()
         }
     }
 
-    impl<T: AsMut<BlockMeta>> AsMut<BlockMeta> for SectoredBuf<T> {
-        fn as_mut(&mut self) -> &mut BlockMeta {
+    impl<U, T: AsMut<U>> AsMut<U> for SectoredBuf<T> {
+        fn as_mut(&mut self) -> &mut U {
             self.inner.as_mut()
         }
     }
 
-    impl<T: MetaAccess> MetaAccess for SectoredBuf<T> {}
-
-    impl<T: Block> Block for SectoredBuf<T> {
-        fn flush_meta(&mut self) -> Result<()> {
-            self.inner.flush_meta()
+    impl<T: AsRef<BlockMeta>> Size for SectoredBuf<T> {
+        /// Returns the size of the block, which the value stored in the block's metadata, not
+        /// the length of the inner stream.
+        fn size(&self) -> io::Result<Option<u64>> {
+            Ok(Some(self.inner.as_ref().body.secrets()?.size))
         }
     }
 }

+ 165 - 14
crates/btlib/src/test_helpers.rs

@@ -1,7 +1,5 @@
 /// Test data and functions to help with testing.
-use super::*;
 use btserde::{Error, Result};
-use crypto::{self, *};
 use fuse_backend_rs::{
     api::filesystem::{ZeroCopyReader, ZeroCopyWriter},
     file_buf::FileVolatileSlice,
@@ -15,6 +13,9 @@ use std::{
 };
 use vm_memory::bytes::Bytes;
 
+use super::*;
+use crate::{crypto::*, sectored_buf::SectoredBuf, Cursor as PioCursor};
+
 pub const PRINCIPAL: [u8; 32] = [
     0x75, 0x28, 0xA9, 0xE0, 0x9D, 0x24, 0xBA, 0xB3, 0x79, 0x56, 0x15, 0x68, 0xFD, 0xA4, 0xE2, 0xA4,
     0xCF, 0xB2, 0xC0, 0xE3, 0x96, 0xAE, 0xA2, 0x6E, 0x45, 0x15, 0x50, 0xED, 0xA6, 0xBE, 0x6D, 0xEC,
@@ -196,7 +197,7 @@ pub fn make_self_signed_writecap_with<C: Creds>(key: &C) -> Writecap {
     writecap
 }
 
-pub fn make_block_with<C: CredsPub>(creds: &C) -> Box<dyn Block> {
+pub fn make_block_with<C: CredsPub>(creds: &C) -> SectoredBuf<SecretStream<PioCursor<impl Block>>> {
     let block_key = SymKey::generate(SymKeyKind::default()).unwrap();
     let mut readcaps = BTreeMap::new();
     readcaps.insert(creds.principal(), creds.ser_encrypt(&block_key).unwrap());
@@ -228,12 +229,11 @@ pub fn make_block_with<C: CredsPub>(creds: &C) -> Box<dyn Block> {
     stream.meta.sig = sig;
     let stream = MerkleStream::new(stream).expect("create merkle stream failed");
     let stream = SecretStream::new(block_key)
-        .try_compose(stream)
+        .try_compose(PioCursor::new(stream))
         .expect("create secret stream failed");
-    let stream = SectoredBuf::new()
+    SectoredBuf::new()
         .try_compose(stream)
-        .expect("create sectored buf failed");
-    Box::new(stream)
+        .expect("failed to compose with SectoredBuf")
 }
 
 /// This function can be run as a test to write a new RSA key pair, as two Rust arrays,
@@ -322,12 +322,12 @@ impl Iterator for Randomizer {
         self.state.copy_from_slice(&self.buf);
         let int_bytes = self.buf.as_slice()[..BYTES]
             .try_into()
-            .expect("failed to convert array");
+            .expect("failed to convert to array");
         Some(usize::from_ne_bytes(int_bytes))
     }
 }
 
-pub fn make_sector(mut buf: &mut [u8], sect_index: usize) {
+pub fn write_sector(mut buf: &mut [u8], sect_index: usize) {
     let data = (sect_index + 1).to_ne_bytes();
     for chunk in std::iter::repeat(data).take(buf.len() / data.len()) {
         (&mut buf[..chunk.len()]).copy_from_slice(chunk.as_slice());
@@ -338,7 +338,7 @@ pub fn make_sector(mut buf: &mut [u8], sect_index: usize) {
 pub fn write_fill<W: Write>(mut write: W, sect_sz: usize, sect_ct: usize) {
     let mut buf = vec![0u8; sect_sz];
     for sect_index in 0..sect_ct {
-        make_sector(&mut buf, sect_index);
+        write_sector(&mut buf, sect_index);
         write.write_all(&mut buf).expect("write failed");
     }
     write.flush().expect("flush failed");
@@ -348,12 +348,53 @@ pub fn read_check<R: Read>(mut read: R, sect_sz: usize, sect_ct: usize) {
     let mut actual = vec![0u8; sect_sz];
     let mut expected = vec![0u8; sect_sz];
     for sect_index in 0..sect_ct {
-        make_sector(&mut expected, sect_index);
+        write_sector(&mut expected, sect_index);
+        read.read_exact(&mut actual).expect("read failed");
+        assert_eq!(expected, actual);
+    }
+}
+
+pub fn write_indices<W: Write + Seek, I: Iterator<Item = usize>>(
+    mut write: W,
+    sect_sz: usize,
+    indices: I,
+) {
+    let mut buf = vec![0u8; sect_sz];
+    for sect_index in indices {
+        let offset = sect_index * sect_sz;
+        write
+            .seek(SeekFrom::Start(offset as u64))
+            .expect("seek failed");
+        write_sector(&mut buf, sect_index);
+        write.write_all(&mut buf).expect("write failed");
+    }
+    write.flush().expect("flush failed");
+}
+
+pub fn read_indices<R: Read + Seek, I: Iterator<Item = usize>>(
+    mut read: R,
+    sect_sz: usize,
+    indices: I,
+) {
+    let mut actual = vec![0u8; sect_sz];
+    let mut expected = vec![0u8; sect_sz];
+    for sect_index in indices {
+        let offset = sect_index * sect_sz;
+        read.seek(SeekFrom::Start(offset as u64))
+            .expect("seek failed");
+        write_sector(&mut expected, sect_index);
         read.read_exact(&mut actual).expect("read failed");
         assert_eq!(expected, actual);
     }
 }
 
+pub fn random_indices<'a>(
+    rando: &'a mut Randomizer,
+    sect_ct: usize,
+) -> impl Iterator<Item = usize> + 'a {
+    rando.take(sect_ct).map(move |e| e % sect_ct)
+}
+
 /// Trait for types which can be referenced as slices and which support conversion from `Vec<u8>`.
 pub trait FromVec: AsRef<[u8]> {
     fn from_vec(vec: Vec<u8>) -> Self;
@@ -471,12 +512,57 @@ impl<const N: usize> Write for BtCursor<[u8; N]> {
     }
 }
 
+impl WriteAt for BtCursor<Vec<u8>> {
+    fn write_at(&mut self, pos: u64, buf: &[u8]) -> io::Result<usize> {
+        let pos: usize = pos.try_into().box_err()?;
+        let vec = self.cursor.get_mut().get_mut();
+        let end = pos + buf.len();
+        if end > vec.len() {
+            vec.resize(end, 0);
+        }
+        let slice = &mut vec[pos..end];
+        slice.copy_from_slice(buf);
+        Ok(end - pos)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        Ok(())
+    }
+}
+
+impl<const N: usize> WriteAt for BtCursor<[u8; N]> {
+    fn write_at(&mut self, pos: u64, buf: &[u8]) -> io::Result<usize> {
+        let pos: usize = pos.try_into().box_err()?;
+        let slice = self.cursor.get_mut().get_mut().as_mut();
+        let end = pos + buf.len();
+        let slice = &mut slice[pos..end];
+        slice.copy_from_slice(buf);
+        Ok(end - pos)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        Ok(())
+    }
+}
+
 impl<T: FromVec> Read for BtCursor<T> {
     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
         self.cursor.get_mut().read(buf)
     }
 }
 
+impl<T: FromVec> ReadAt for BtCursor<T> {
+    fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result<usize> {
+        let pos: usize = pos.try_into().box_err()?;
+        let borrow = self.cursor.try_borrow().box_err()?;
+        let slice: &[u8] = borrow.get_ref().as_ref();
+        let end = slice.len().min(pos + buf.len());
+        let buf = &mut buf[..(end - pos)];
+        buf.copy_from_slice(&slice[pos..end]);
+        Ok(end - pos)
+    }
+}
+
 impl<T: FromVec> Seek for BtCursor<T> {
     fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
         self.cursor.get_mut().seek(pos)
@@ -582,6 +668,14 @@ impl<const N: usize> ZeroCopyWriter for BtCursor<[u8; N]> {
     }
 }
 
+impl<T: FromVec> Size for BtCursor<T> {
+    fn size(&self) -> io::Result<Option<u64>> {
+        let borrow = self.cursor.borrow();
+        let slice: &[u8] = borrow.get_ref().as_ref();
+        Ok(Some(slice.len() as u64))
+    }
+}
+
 #[derive(Debug, PartialEq, Serialize, Deserialize)]
 pub struct SectoredCursor<T: FromVec> {
     cursor: BtCursor<T>,
@@ -627,7 +721,7 @@ impl Write for SectoredCursor<Vec<u8>> {
     }
 
     fn flush(&mut self) -> io::Result<()> {
-        self.cursor.flush()
+        Write::flush(&mut self.cursor)
     }
 }
 
@@ -640,7 +734,51 @@ impl<const N: usize> Write for SectoredCursor<[u8; N]> {
     }
 
     fn flush(&mut self) -> io::Result<()> {
-        self.cursor.flush()
+        Write::flush(&mut self.cursor)
+    }
+}
+
+impl WriteAt for SectoredCursor<Vec<u8>> {
+    fn write_at(&mut self, pos: u64, buf: &[u8]) -> io::Result<usize> {
+        self.cursor.write_at(pos, buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        Write::flush(&mut self.cursor)
+    }
+}
+
+impl<const N: usize> WriteAt for SectoredCursor<[u8; N]> {
+    fn write_at(&mut self, pos: u64, buf: &[u8]) -> io::Result<usize> {
+        self.cursor.write_at(pos, buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        Write::flush(&mut self.cursor)
+    }
+}
+
+impl<T: FromVec> SectoredCursor<T> {
+    fn flush_integ_impl(&mut self, integrity: &[u8]) -> io::Result<()> {
+        let dest = self
+            .meta
+            .body
+            .integrity
+            .get_or_insert_with(<VarHash as Default>::default);
+        dest.as_mut_slice().copy_from_slice(integrity);
+        Ok(())
+    }
+}
+
+impl WriteInteg for SectoredCursor<Vec<u8>> {
+    fn flush_integ(&mut self, integrity: &[u8]) -> io::Result<()> {
+        self.flush_integ_impl(integrity)
+    }
+}
+
+impl<const N: usize> WriteInteg for SectoredCursor<[u8; N]> {
+    fn flush_integ(&mut self, integrity: &[u8]) -> io::Result<()> {
+        self.flush_integ_impl(integrity)
     }
 }
 
@@ -653,6 +791,15 @@ impl<T: FromVec> Read for SectoredCursor<T> {
     }
 }
 
+impl<T: FromVec> ReadAt for SectoredCursor<T> {
+    fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result<usize> {
+        if self.require_sect_sz {
+            self.assert_sector_sz(buf.len())?;
+        }
+        self.cursor.read_at(pos, buf)
+    }
+}
+
 impl<T: FromVec> Seek for SectoredCursor<T> {
     fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
         self.cursor.seek(pos)
@@ -671,4 +818,8 @@ impl<T: FromVec> AsMut<BlockMeta> for SectoredCursor<T> {
     }
 }
 
-impl<T: FromVec> MetaAccess for SectoredCursor<T> {}
+impl<T: FromVec> Size for SectoredCursor<T> {
+    fn size(&self) -> io::Result<Option<u64>> {
+        self.cursor.size()
+    }
+}

+ 70 - 149
crates/btlib/src/trailered.rs

@@ -2,15 +2,16 @@ pub use private::Trailered;
 
 mod private {
     use std::{
-        io::{self, BufReader, Read, Seek, SeekFrom, Write},
+        io::{self, BufReader, Seek, SeekFrom},
         marker::PhantomData,
     };
 
     use btserde::{read_from, write_to};
+    use positioned_io::{ReadAt, Size, WriteAt};
     use serde::{de::DeserializeOwned, Serialize};
 
     use crate::{
-        Block, BlockMeta, BoxInIoErr, Decompose, MetaAccess, Result, Sectored, WriteInteg,
+        bterr, BoxInIoErr, Cursor, Decompose, FlushMeta, Result, Sectored, SizeExt, WriteInteg,
     };
 
     /// A struct which wraps a stream and which writes a trailing data structure to it when flushed.
@@ -21,7 +22,7 @@ mod private {
         write_buf: Vec<u8>,
     }
 
-    impl<T: Read + Seek, D: DeserializeOwned> Trailered<T, D> {
+    impl<T: ReadAt + Size, D: DeserializeOwned> Trailered<T, D> {
         pub fn empty(inner: T) -> Trailered<T, D> {
             Trailered {
                 inner,
@@ -34,20 +35,24 @@ mod private {
         /// Creates a new `Trailered<T>` containing the given `T`. This method requires that the given
         /// stream is either empty, or contains a valid serialization of `D` and the offset at which
         /// `D` is stored.
-        pub fn new(mut inner: T) -> Result<(Trailered<T, D>, Option<D>)> {
-            let pos = inner.stream_position()?;
-            let end = inner.seek(SeekFrom::End(0))?;
+        pub fn new(inner: T) -> Result<(Trailered<T, D>, Option<D>)> {
+            let end = inner.size_or_err()?;
             if 0 == end {
                 return Ok((Self::empty(inner), None));
             }
-            let mut reader = BufReader::new(inner);
+            let mut reader = BufReader::new(Cursor::new(inner));
             let offset: i64 = std::mem::size_of::<i64>() as i64;
+            if end < offset as u64 {
+                return Err(bterr!("inner stream is non-empty but too small"));
+            }
             reader.seek(SeekFrom::End(-offset))?;
             let offset: i64 = read_from(&mut reader)?;
+            if end < offset.unsigned_abs() {
+                return Err(bterr!("inner stream is non-empty but too small"));
+            }
             let body_len = reader.seek(SeekFrom::End(offset))?;
             let trailer: D = read_from(&mut reader)?;
-            let mut inner = reader.into_inner();
-            inner.seek(SeekFrom::Start(pos))?;
+            let inner = reader.into_inner().into_inner();
             Ok((
                 Trailered {
                     inner,
@@ -60,88 +65,56 @@ mod private {
         }
     }
 
-    impl<T: Seek, D> Trailered<T, D> {
-        fn post_write(&mut self, written: usize) -> io::Result<usize> {
-            if 0 == written {
-                return Ok(0);
-            }
-            // I cannot return an error at this point because bytes have already been written to inner.
-            // So if I can't track the body len due to a failure, a panic is the only option.
-            let pos = self
-                .inner
-                .stream_position()
-                .expect("failed to get stream position");
-            self.body_len = self.body_len.max(pos);
-            Ok(written)
+    impl<T, D> Trailered<T, D> {
+        fn update_body_len(&mut self, pos: u64, written: usize) -> usize {
+            let new_pos = pos + written as u64;
+            self.body_len = self.body_len.max(new_pos);
+            written
         }
     }
 
-    impl<T: Read + Seek, D> Read for Trailered<T, D> {
-        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-            let pos = self.inner.stream_position()?;
+    impl<T: ReadAt, D> ReadAt for Trailered<T, D> {
+        fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result<usize> {
+            if pos > self.body_len {
+                return Err(bterr!("pos {pos} is past the end of body ({})", self.body_len).into());
+            }
             let available_u64 = self.body_len - pos;
             let available: usize = available_u64.try_into().box_err()?;
             let limit = buf.len().min(available);
-            self.inner.read(&mut buf[..limit])
+            self.inner.read_at(pos, &mut buf[..limit])
         }
     }
 
-    impl<T: Write + Seek, D: Serialize> Trailered<T, D> {
-        pub fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-            let written = self.inner.write(buf)?;
-            self.post_write(written)
+    impl<T: WriteAt, D: Serialize> Trailered<T, D> {
+        pub fn write_at(&mut self, pos: u64, buf: &[u8]) -> io::Result<usize> {
+            let written = self.inner.write_at(pos, buf)?;
+            Ok(self.update_body_len(pos, written))
         }
 
-        fn write_trailer(&mut self, trailer: &D) -> io::Result<u64> {
-            let pos = self.inner.stream_position()?;
-            self.inner.seek(SeekFrom::Start(self.body_len))?;
+        /// Writes the trailer followed by the offset relative to the end of the inner stream where
+        /// the trailer was written. See the `Trailered::new` method for how this is read out of
+        /// the inner stream.
+        fn write_trailer(&mut self, trailer: &D) -> io::Result<()> {
             self.write_buf.clear();
             write_to(trailer, &mut self.write_buf)?;
-            let offset_u64 =
-                8 + self.inner.stream_position()? + self.write_buf.len() as u64 - self.body_len;
-            let offset = -(offset_u64 as i64);
-            write_to(&offset, &mut self.write_buf)?;
-            self.inner.write_all(&self.write_buf)?;
-            Ok(pos)
+            // This is the length of `write_buf` after writing an additional `i64` into it.
+            let write_buf_len = (self.write_buf.len() + std::mem::size_of::<i64>()) as u64;
+            let offset: i64 = write_buf_len.try_into().box_err()?;
+            write_to(&(-offset), &mut self.write_buf)?;
+            self.inner.write_all_at(self.body_len, &self.write_buf)?;
+            Ok(())
         }
 
         pub fn flush(&mut self, trailer: &D) -> io::Result<()> {
-            let prev_pos = self.write_trailer(trailer)?;
-            self.inner.flush()?;
-            self.inner.seek(SeekFrom::Start(prev_pos))?;
-            Ok(())
+            self.write_trailer(trailer)?;
+            self.inner.flush()
         }
     }
 
-    impl<T: WriteInteg + Seek, D: Serialize> Trailered<T, D> {
+    impl<T: WriteInteg + Size, D: Serialize> Trailered<T, D> {
         pub fn flush_integ(&mut self, trailer: &D, integrity: &[u8]) -> io::Result<()> {
-            let prev_pos = self.write_trailer(trailer)?;
-            self.inner.flush_integ(integrity)?;
-            self.inner.seek(SeekFrom::Start(prev_pos))?;
-            Ok(())
-        }
-    }
-
-    impl<T: Seek, D> Seek for Trailered<T, D> {
-        fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
-            /// Adds a signed integer to an unsigned integer and returns the result.
-            fn add_signed(unsigned: u64, signed: i64) -> u64 {
-                if signed >= 0 {
-                    unsigned + signed as u64
-                } else {
-                    unsigned - (-signed as u64)
-                }
-            }
-
-            let from_start = match pos {
-                SeekFrom::Start(from_start) => from_start,
-                SeekFrom::Current(from_curr) => {
-                    add_signed(self.inner.stream_position()?, from_curr)
-                }
-                SeekFrom::End(from_end) => add_signed(self.body_len, from_end),
-            };
-            let from_start = from_start.min(self.body_len);
-            self.inner.seek(SeekFrom::Start(from_start))
+            self.write_trailer(trailer)?;
+            self.inner.flush_integ(integrity)
         }
     }
 
@@ -151,21 +124,19 @@ mod private {
         }
     }
 
-    impl<T: AsRef<BlockMeta>, D> AsRef<BlockMeta> for Trailered<T, D> {
-        fn as_ref(&self) -> &BlockMeta {
+    impl<U, T: AsRef<U>, D> AsRef<U> for Trailered<T, D> {
+        fn as_ref(&self) -> &U {
             self.inner.as_ref()
         }
     }
 
-    impl<T: AsMut<BlockMeta>, D> AsMut<BlockMeta> for Trailered<T, D> {
-        fn as_mut(&mut self) -> &mut BlockMeta {
+    impl<U, T: AsMut<U>, D> AsMut<U> for Trailered<T, D> {
+        fn as_mut(&mut self) -> &mut U {
             self.inner.as_mut()
         }
     }
 
-    impl<T: MetaAccess, D> MetaAccess for Trailered<T, D> {}
-
-    impl<T: Block, D> Trailered<T, D> {
+    impl<T: FlushMeta, D> Trailered<T, D> {
         pub fn flush_meta(&mut self) -> Result<()> {
             self.inner.flush_meta()
         }
@@ -176,14 +147,20 @@ mod private {
             self.inner.sector_sz()
         }
     }
+
+    impl<T, D> Size for Trailered<T, D> {
+        fn size(&self) -> io::Result<Option<u64>> {
+            Ok(Some(self.body_len))
+        }
+    }
 }
 
 #[cfg(test)]
 mod tests {
-    use crate::Decompose;
-
     use super::*;
-    use std::io::{Cursor, Read, Seek, SeekFrom};
+    use positioned_io::ReadAt;
+
+    use crate::{test_helpers::BtCursor as Cursor, Decompose};
 
     /// Tests that a new `Trailered<T>` can be created from an empty stream.
     #[test]
@@ -230,91 +207,35 @@ mod tests {
     #[test]
     fn trailered_written_data_persisted() {
         const EXPECTED: &[u8] = b"and every life has something to teach us.";
-        let mut cursor = {
+
+        let cursor = {
             let (mut trailered, _) = Trailered::<_, u8>::new(Cursor::new(Vec::new()))
                 .expect("failed to create first trailered");
-            trailered.write(EXPECTED).expect("write failed");
+            trailered.write_at(0, EXPECTED).expect("write failed");
             trailered.flush(&1).expect("flush failed");
             trailered.into_inner()
         };
-        cursor.seek(SeekFrom::Start(0)).expect("seek failed");
-        let (mut trailered, _) =
+
+        let (trailered, _) =
             Trailered::<_, u8>::new(cursor).expect("failed to created second trailered");
-        let mut actual = vec![0u8; EXPECTED.len()];
 
-        trailered.read(&mut actual).expect("read failed");
+        let mut actual = vec![0u8; EXPECTED.len()];
+        trailered.read_at(0, &mut actual).expect("read failed");
 
         assert_eq!(EXPECTED, actual);
     }
 
-    fn trailered_for_seek_test() -> Trailered<impl Read + Seek, u8> {
-        let (mut trailered, _) =
-            Trailered::new(Cursor::new(Vec::new())).expect("failed to create trailered");
-        trailered
-            .write(&[0, 1, 2, 3, 4, 5, 6, 7])
-            .expect("write failed");
-        trailered.seek(SeekFrom::Start(0)).expect("seek failed");
-        trailered
-    }
-
-    #[test]
-    fn trailered_seek_from_start() {
-        const EXPECTED: u8 = 2;
-        let mut trailered = trailered_for_seek_test();
-
-        trailered
-            .seek(SeekFrom::Start(EXPECTED as u64))
-            .expect("seek failed");
-
-        let mut actual = [0u8; 1];
-        trailered.read(&mut actual).expect("read failed");
-        assert_eq!(EXPECTED, actual[0]);
-    }
-
-    #[test]
-    fn trailered_seek_from_curr() {
-        const EXPECTED: u8 = 5;
-        let mut trailered = trailered_for_seek_test();
-        trailered
-            .seek(SeekFrom::Start(6))
-            .expect("seek from start failed");
-
-        trailered
-            .seek(SeekFrom::Current(-1))
-            .expect("seek from current failed");
-
-        let mut actual = [0u8; 1];
-        trailered.read(&mut actual).expect("read failed");
-        assert_eq!(EXPECTED, actual[0]);
-    }
-
-    #[test]
-    fn trailered_seek_from_end() {
-        const EXPECTED: u8 = 7;
-        let mut trailered = trailered_for_seek_test();
-
-        trailered.seek(SeekFrom::End(-1)).expect("seek failed");
-
-        let mut actual = [0u8; 1];
-        trailered.read(&mut actual).expect("read failed");
-        assert_eq!(EXPECTED, actual[0]);
-    }
-
     /// Tests that a read past the end of the body in a `Trailered<T>` is not allowed.
     #[test]
     fn trailered_read_limited_to_body_len() {
-        let (mut trailered, trailer) =
-            Trailered::new(Cursor::new(Vec::new())).expect("failed to create Trailered");
-        assert!(trailer.is_none());
         const EXPECTED: &[u8] = &[1, 1, 1, 1, 1, 0, 0, 0];
-        trailered.write(&[1u8; 5]).expect("write failed");
+        let (mut trailered, ..) =
+            Trailered::new(Cursor::new(Vec::new())).expect("failed to create Trailered");
+
+        trailered.write_at(0, &[1u8; 5]).expect("write failed");
         trailered.flush(&1u8).expect("flush failed");
-        trailered.seek(SeekFrom::Start(0)).expect("seek failed");
         let mut actual = vec![0u8; EXPECTED.len()];
-
-        // If read goes past the end of the body then there will be a 1 in the sixth position of
-        // actual.
-        trailered.read(&mut actual).expect("read failed");
+        trailered.read_at(0, &mut actual).expect("read failed");
 
         assert_eq!(EXPECTED, actual);
     }

+ 92 - 0
doc/Talks/Overview/Overview.tex

@@ -0,0 +1,92 @@
+\documentclass{beamer}
+\usepackage{xcolor}
+\definecolor{primary}{rgb}{0.125, 0.424, 0.243}
+
+\usepackage[default]{opensans}
+
+\usepackage{graphicx}
+\usepackage{textcomp}
+
+\title{Blocktree}
+\subtitle{A platform for distributed computing.}
+\author{Matt Carr \\ matt@carrfound.org}
+
+\begin{document}
+
+\frame{\titlepage}
+
+\begin{frame}
+\frametitle{Problem}
+Distributed systems are hard.
+\begin{itemize}
+\item<2-> High performance messaging is often outsourced (RabbitMQ, Amazon SQS).
+\item<3-> Scaling stateful systems continues to be a challenge.
+\item<4-> The speed of light is a fundamental limit on latency which forces us to move data closer
+  to the user.
+\end{itemize}
+\end{frame}
+
+\begin{frame}{}
+\frametitle{Proposed Solution}
+Reduce latency and the scope of the problem by moving data onto a user's own devices.
+\begin{itemize}
+\item<2-> Create a high performance message passing API. Zero copy locally, with transparent
+forwarding over the network.
+\item<3-> Create an end-to-end encrypted distributed file system which is served by a user's
+    devices.
+\item<4-> Allow user's to securely share files using cryptographic mechanisms.
+\item<5-> Allow a WebAssembly runtime and native containers to access this API.
+\end{itemize}
+\end{frame}
+
+\begin{frame}
+\frametitle{User Value Proposition}
+There are many reasons why user's might prefer this to the status quo.
+\begin{itemize}
+\item<2-> Control over personal data.
+\item<3-> Privacy provided by cryptography, not legislation.
+\item<4-> Freedom from manipulative advertising.
+\end{itemize}
+\end{frame}
+
+\begin{frame}
+\frametitle{The Core Idea}
+\begin{itemize}
+\item<2-> All data and running programs are identified by paths in a single global namespace.
+\begin{center}
+\scriptsize{/0!dSip4J0kurN5VhVo\_aTipM-ywOOWrqJuRRVQ7aa-bew/docs/otf\_app.ods}
+\end{center}
+\item<3-> The first component of the path specifies a signing key which is the root of trust for
+  for all paths under it.
+\item<4-> Messages are only trusted if they're signed by a key which is
+  transitively trusted by the root.
+\item<5-> Anyone can verify a file by checking the digital signature it bears is transitively
+  trusted.
+\end{itemize}
+\end{frame}
+
+\begin{frame}
+\frametitle{OSS Alternatives} 
+There are many other decentralized systems out there. Blocktree is different in several ways.
+\begin{itemize}
+\item<2-> It avoids the scaling issues of blockchains by limiting the scope of consensus (Etherium).
+\item<3-> It offers mutable storage, which is what real-world applications need. This is difficult
+  with content addressing systems (IPFS, WNFS).
+\item<4-> Approaches which standardize the format of user data (Solid Pods) fail to address the
+  question of who will store it.
+\end{itemize}
+\end{frame}
+
+\begin{frame}
+\frametitle{Current Status and Use of Funds} 
+\begin{itemize}
+\item<3-> The metadata handling, encryption, and integrity protection mechanisms have been
+  implemented.
+\item<4-> Support for securely using keys in a TPM is supported.
+\item<5-> A FUSE daemon which allows the file system to be mounted under Linux has been written.
+\item<6-> Funds from the OTF would allow me to continue working on the project full-time and allow
+  me to hire additional developers.
+\end{itemize}
+\end{frame}
+
+\end{document}

이 변경점에서 너무 많은 파일들이 변경되어 몇몇 파일들은 표시되지 않았습니다.