Răsfoiți Sursa

Started implementing a buffering stream.

Matthew Carr 2 ani în urmă
părinte
comite
02756cf24a

+ 37 - 0
crates/btnode/Cargo.lock

@@ -11,6 +11,21 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "alloc-no-stdlib"
+version = "2.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "35ef4730490ad1c4eae5c4325b2a95f521d023e5c885853ff7aca0a6a1631db3"
+
+[[package]]
+name = "alloc-stdlib"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "697ed7edc0f1711de49ce108c541623a0af97c6c60b2f6e2b65229847ac843c2"
+dependencies = [
+ "alloc-no-stdlib",
+]
+
 [[package]]
 name = "ansi_term"
 version = "0.12.1"
@@ -87,11 +102,33 @@ version = "1.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
 
+[[package]]
+name = "brotli"
+version = "3.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a1a0b1dbcc8ae29329621f8d4f0d835787c1c38bb1401979b49d13b0b305ff68"
+dependencies = [
+ "alloc-no-stdlib",
+ "alloc-stdlib",
+ "brotli-decompressor",
+]
+
+[[package]]
+name = "brotli-decompressor"
+version = "2.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "59ad2d4653bf5ca36ae797b1f4bb4dbddb60ce49ca4aed8a2ce4829f60425b80"
+dependencies = [
+ "alloc-no-stdlib",
+ "alloc-stdlib",
+]
+
 [[package]]
 name = "btnode"
 version = "0.1.0"
 dependencies = [
  "base64-url",
+ "brotli",
  "ctor",
  "env_logger",
  "foreign-types",

+ 1 - 0
crates/btnode/Cargo.toml

@@ -22,6 +22,7 @@ tss-esapi-sys = "0.3.0"
 foreign-types = "0.3.1"
 zeroize = { version = "1.5.7", features = ["zeroize_derive"] }
 static_assertions = "1.1.0"
+brotli = "3.3.4"
 
 [dev-dependencies]
 tempdir = "0.3.7"

+ 86 - 41
crates/btnode/src/crypto/mod.rs

@@ -2,6 +2,8 @@
 
 mod tpm;
 
+use crate::Decompose;
+
 use super::{
     Block,
     Writecap,
@@ -163,6 +165,12 @@ impl From<Error> for io::Error {
     }
 }
 
+impl From<crate::Error> for Error {
+    fn from(err: crate::Error) -> Self {
+        Error::custom(err)
+    }
+}
+
 pub(crate) type Result<T> = std::result::Result<T, Error>;
 
 /// Returns an array of the given length filled with cryptographically strong random data.
@@ -438,7 +446,7 @@ impl AeadKey {
 
 #[derive(Debug, PartialEq, Serialize, Deserialize, Clone, EnumDiscriminants)]
 #[strum_discriminants(name(SymKeyKind))]
-pub(crate) enum SymKey {
+pub enum SymKey {
     /// A key for the AES 256 cipher in Cipher Block Chaining mode. Note that this includes the
     /// initialization vector, so that a value of this variant contains all the information needed
     /// to fully initialize a cipher context.
@@ -1158,20 +1166,6 @@ fn exp2(x: isize) -> usize {
     }
 }
 
-trait SectoredExt : Sectored {
-    fn assert_sector_sz(&self, actual: usize) -> Result<()> {
-        let expected = self.sector_sz();
-        if expected == actual {
-            Ok(())
-        }
-        else {
-            Err(Error::IncorrectSize { expected, actual })
-        }
-    }
-}
-
-impl<T: Sectored> SectoredExt for T {}
-
 /// Trait for types which can be used as nodes in a `MerkleTree`.
 trait MerkleNode: Default + Serialize + for<'de> Deserialize<'de> {
     /// The kind of hash algorithm that this `HashData` uses.
@@ -1580,12 +1574,19 @@ impl<T, H> Sectored for MerkleStream<T, H> {
 }
 
 impl<T, H> Compose<T, MerkleStream<T, H>> for MerkleStream<(), H> {
-    fn compose(self, inner: T) -> MerkleStream<T, H> {
-        MerkleStream {
+    type Error = Error;
+    fn compose(self, inner: T) -> Result<MerkleStream<T, H>> {
+        Ok(MerkleStream {
             inner,
             tree: self.tree,
             offset: self.offset,
-        }
+        })
+    }
+}
+
+impl<T, H> Decompose<T> for MerkleStream<T, H> {
+    fn into_inner(self) -> T {
+        self.inner
     }
 }
 
@@ -1622,7 +1623,7 @@ impl<T: Seek, H> Seek for MerkleStream<T, H> {
 }
 
 // A stream which encrypts all data written to it and decrypts all data read from it.
-struct SecretStream<T> {
+pub struct SecretStream<T> {
     inner: T,
     // The sector size of the inner stream. Reads and writes are only executed using buffers of
     // this size.
@@ -1654,7 +1655,7 @@ impl<T> SecretStream<T> {
 }
 
 impl SecretStream<()> {
-    fn new(key: SymKey) -> SecretStream<()> {
+    pub fn new(key: SymKey) -> SecretStream<()> {
         SecretStream {
             inner: (),
             inner_sect_sz: 0,
@@ -1666,8 +1667,15 @@ impl SecretStream<()> {
     }
 }
 
+impl<T> Decompose<T> for SecretStream<T> {
+    fn into_inner(self) -> T {
+        self.inner
+    }
+}
+
 impl<T, U: Sectored> Compose<U, SecretStream<U>> for SecretStream<T> {
-    fn compose(mut self, inner: U) -> SecretStream<U> {
+    type Error = Error;
+    fn compose(mut self, inner: U) -> Result<SecretStream<U>> {
         let inner_sect_sz = inner.sector_sz();
         let expansion_sz = self.key.expansion_sz();
         let sect_sz = inner_sect_sz - expansion_sz;
@@ -1677,14 +1685,14 @@ impl<T, U: Sectored> Compose<U, SecretStream<U>> for SecretStream<T> {
         }
         self.pt_buf.resize(inner_sect_sz, 0);
         self.pt_buf.resize(inner_sect_sz + block_sz, 0);
-        SecretStream {
+        Ok(SecretStream {
             inner,
             inner_sect_sz,
             sect_sz: inner_sect_sz - expansion_sz,
             key: self.key,
             ct_buf: self.ct_buf,
             pt_buf: self.pt_buf,
-        }
+        })
     }
 }
 
@@ -1696,12 +1704,7 @@ impl<T> Sectored for SecretStream<T> {
 
 impl<T: Write> Write for SecretStream<T> {
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        if buf.len() != self.sect_sz {
-            return Err(io::Error::new(
-                ErrorKind::Other,
-                Error::IncorrectSize { actual: buf.len(), expected: self.sect_sz, }
-            ));
-        }
+        self.assert_sector_sz(buf.len())?;
 
         self.ct_buf.resize(self.inner_sect_sz, 0);
         let mut encrypter = self.key.to_encrypter()?;
@@ -1719,12 +1722,7 @@ impl<T: Write> Write for SecretStream<T> {
 
 impl<T: Read> Read for SecretStream<T> {
     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        if buf.len() != self.sect_sz {
-            return Err(io::Error::new(
-                ErrorKind::Other,
-                Error::IncorrectSize { actual: buf.len(), expected: self.sect_sz, }
-            ));
-        }
+        self.assert_sector_sz(buf.len())?;
 
         self.ct_buf.resize(self.inner_sect_sz, 0);
         self.inner.read_exact(&mut self.ct_buf)?;
@@ -1960,6 +1958,7 @@ mod tests {
     use crate::{
         SECTOR_SZ_DEFAULT,
         test_helpers::*,
+        BrotliParams,
     };
     use super::*;
     use std::{
@@ -2143,7 +2142,8 @@ mod tests {
         key: SymKey, inner_sect_sz: usize, sect_ct: usize
     ) {
         let mut stream = SecretStream::new(key)
-            .compose(SectoredCursor::new(vec![0u8; inner_sect_sz * sect_ct], inner_sect_sz));
+            .compose(SectoredCursor::new(vec![0u8; inner_sect_sz * sect_ct], inner_sect_sz))
+            .expect("compose failed");
         let sector_sz = stream.sector_sz();
         for k in 0..sect_ct {
             let sector = vec![k as u8; sector_sz];
@@ -2177,7 +2177,8 @@ mod tests {
         rando: Randomizer, key: SymKey, inner_sect_sz: usize, sect_ct: usize
     ) {
         let mut stream = SecretStream::new(key)
-            .compose(SectoredCursor::new(vec![0u8; inner_sect_sz * sect_ct], inner_sect_sz));
+            .compose(SectoredCursor::new(vec![0u8; inner_sect_sz * sect_ct], inner_sect_sz))
+            .expect("compose failed");
         let sect_sz = stream.sector_sz();
         let indices: Vec<usize> = rando.take(sect_ct).map(|e| e % sect_ct).collect();
         for index in indices.iter().map(|e| *e) {
@@ -2224,7 +2225,7 @@ mod tests {
             vec![0u8; num_sectors * SECTOR_SZ_DEFAULT],
             SECTOR_SZ_DEFAULT
         );
-        SecretStream::new(key).compose(inner)
+        SecretStream::new(key).compose(inner).expect("compose failed")
     }
 
     #[test]
@@ -2243,7 +2244,7 @@ mod tests {
                     vec![0u8; num_sectors * SECTOR_SZ_DEFAULT],
                     SECTOR_SZ_DEFAULT
                 );
-                SecretStream::new(key).compose(inner)
+                SecretStream::new(key).compose(inner).expect("compose failed")
             }
         }
 
@@ -2364,7 +2365,8 @@ mod tests {
     fn merkle_stream_sequential_test_case(sect_sz: usize, sect_count: usize) {
         let mut stream = MerkleStream
             ::new(MerkleTree::<Sha2_256>::empty(sect_sz))
-            .compose(Cursor::new(vec![0u8; sect_count * sect_sz]));
+            .compose(Cursor::new(vec![0u8; sect_count * sect_sz]))
+            .expect("compose failed");
         for k in 1..(sect_count + 1) {
             let sector = vec![k as u8; sect_sz];
             stream.write(&sector).expect("write failed");
@@ -2390,7 +2392,8 @@ mod tests {
     fn merkle_stream_random_test_case(rando: Randomizer, sect_sz: usize, sect_ct: usize) {
         let mut stream = MerkleStream
             ::new(MerkleTree::<Sha2_256>::empty(sect_sz))
-            .compose(Cursor::new(vec![0u8; sect_sz * sect_ct]));
+            .compose(Cursor::new(vec![0u8; sect_sz * sect_ct]))
+            .expect("compose failed");
         let indices: Vec<usize> = rando.take(sect_ct).map(|e| e % sect_ct).collect();
         for index in indices.iter().map(|e| *e) {
             let offset = sect_sz * index;
@@ -2420,6 +2423,48 @@ mod tests {
         merkle_stream_random_test_case(Randomizer::new(SEED), 8192, 63);
     }
 
+    #[test]
+    fn compose_merkle_and_secret_streams() {
+        const SECT_SZ: usize = 4096;
+        const SECT_CT: usize = 16;
+        let memory = Cursor::new([0u8; SECT_SZ * SECT_CT]);
+        let merkle = MerkleStream::new(MerkleTree::<Sha2_256>::empty(SECT_SZ))
+            .compose(memory)
+            .expect("compose for merkle failed");
+        let key = SymKey::generate(SymKeyKind::Aes256Cbc).expect("key generation failed");
+        let mut secret = SecretStream::new(key)
+            .compose(merkle)
+            .expect("compose for secret failed");
+        let secret_sect_sz = secret.sector_sz();
+        write_fill(&mut secret, secret_sect_sz, SECT_CT);
+        secret.seek(SeekFrom::Start(0)).expect("seek failed");
+        read_check(&mut secret, secret_sect_sz, SECT_CT);
+    }
+
+    #[test]
+    fn compress_then_encrypt() {
+        unimplemented!();
+        use brotli::{CompressorWriter, Decompressor};
+        const SECT_SZ: usize = 4096;
+        const SECT_CT: usize = 16;
+        let params = BrotliParams::new(SECT_SZ, 8, 20);
+        let key = SymKey::generate(SymKeyKind::Aes256Cbc).expect("key generation failed");
+        let mut inner = SecretStream::new(key)
+            .compose(SectoredCursor::new([0u8; SECT_SZ * SECT_CT], SECT_SZ))
+            .expect("compose for inner failed");
+        {
+            let write: CompressorWriter<_> = params.clone().compose(&mut inner)
+                .expect("compose for write failed");
+            write_fill(write, SECT_SZ, SECT_CT);
+        } 
+        inner.seek(SeekFrom::Start(0)).expect("seek failed");
+        {
+            let read: Decompressor<_> = params.compose(&mut inner)
+                .expect("compose for read failed");
+            read_check(read, SECT_SZ, SECT_CT);
+        } 
+    }
+
     /// Tests that validate the dependencies of this module.
     mod dependency_tests {
         use super::*;

+ 453 - 6
crates/btnode/src/main.rs

@@ -11,6 +11,7 @@ mod serde_tests;
 #[macro_use]
 extern crate static_assertions;
 
+use brotli::{CompressorWriter, Decompressor};
 use serde_block_tree::{self, read_from, write_to};
 use harness::Message;
 mod crypto;
@@ -29,18 +30,48 @@ use serde::{Serialize, Deserialize};
 use serde_big_array::BigArray;
 use log::{info, error};
 
+#[derive(Debug)]
 enum Error {
     Io(std::io::Error),
     Serde(serde_block_tree::Error),
     Crypto(crypto::Error),
+    IncorrectSize { expected: usize, actual: usize },
+    Custom(Box<dyn std::fmt::Debug + Send + Sync>),
 }
 
+impl Error {
+    fn custom<E: std::fmt::Debug + Send + Sync + 'static>(err: E) -> Error {
+        Error::Custom(Box::new(err))
+    }
+}
+
+impl Display for Error {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        match self {
+            Error::Io(err) => err.fmt(f),
+            Error::Serde(err) => err.fmt(f),
+            Error::Crypto(err) => err.fmt(f),
+            Error::IncorrectSize { expected, actual }
+                => write!(f, "incorrect size {}, expected {}", actual, expected),
+            Error::Custom(err) => err.fmt(f),
+        }
+    }
+}
+
+impl std::error::Error for Error {}
+
 impl From<std::io::Error> for Error {
     fn from(err: std::io::Error) -> Self {
         Error::Io(err)
     }
 }
 
+impl From<Error> for std::io::Error {
+    fn from(err: Error) -> Self {
+        io::Error::new(io::ErrorKind::Other, err)
+    }
+}
+
 impl From<serde_block_tree::Error> for Error {
     fn from(err: serde_block_tree::Error) -> Self {
         Error::Serde(err)
@@ -53,6 +84,12 @@ impl From<crypto::Error> for Error {
     }
 }
 
+impl From<std::num::TryFromIntError> for Error {
+    fn from(err: std::num::TryFromIntError) -> Self {
+        Error::custom(err)
+    }
+}
+
 type Result<T> = std::result::Result<T, Error>;
 
 /// A Block tagged with its version number. When a block of a previous version is received over
@@ -69,6 +106,16 @@ const SECTOR_SZ_DEFAULT: usize = 4096;
 trait Sectored {
     // Returns the size of the sector for this stream.
     fn sector_sz(&self) -> usize;
+
+    fn assert_sector_sz(&self, actual: usize) -> Result<()> {
+        let expected = self.sector_sz();
+        if expected == actual {
+            Ok(())
+        }
+        else {
+            Err(Error::IncorrectSize { expected, actual })
+        }
+    }
 }
 
 #[derive(Serialize, Deserialize, Debug, PartialEq)]
@@ -86,12 +133,14 @@ struct NewBlock<T> {
 }
 
 impl<T> NewBlock<T> {
-    fn compose<U, V: Compose<T, U>>(self, new_body: V) -> NewBlock<U> {
-        NewBlock {
+    fn compose<E: Into<Error>, U: Decompose<T>, V: Compose<T, U, Error = E>>(
+        self, new_body: V
+    ) -> Result<NewBlock<U>> {
+        Ok(NewBlock {
             header: self.header,
             sig: self.sig,
-            body: new_body.compose(self.body),
-        }
+            body: new_body.compose(self.body).map_err(|err| err.into())?,
+        })
     }
 }
 
@@ -131,8 +180,13 @@ impl<T: Seek> Seek for NewBlock<T> {
     }
 }
 
-trait Compose<T, U> {
-    fn compose(self, inner: T) -> U;
+trait Decompose<T> {
+    fn into_inner(self) -> T;
+}
+
+trait Compose<T, U: Decompose<T>> {
+    type Error;
+    fn compose(self, inner: T) -> std::result::Result<U, Self::Error>;
 }
 
 struct FileBody {
@@ -179,6 +233,279 @@ impl Seek for FileBody {
     }
 }
 
+/// Extensions to the `Read` trait.
+trait ReadExt: Read {
+    /// Reads repeatedly until one of the following occur:
+    ///  1. The given buffer is full.
+    ///  2. A call to `read` returns 0.
+    ///  3. A call to `read` returns an error.
+    fn fill_buf(&mut self, mut dest: &mut [u8]) -> io::Result<()> {
+        while !dest.is_empty() {
+            let byte_ct = self.read(dest)?;
+            if 0 == byte_ct {
+                break;
+            }
+            dest = &mut dest[byte_ct..];
+        }
+        Ok(())
+    }
+}
+
+impl<T: Read> ReadExt for T {}
+
+impl<T: Write> Decompose<T> for CompressorWriter<T> {
+    fn into_inner(self) -> T {
+        self.into_inner()
+    }
+}
+
+impl<T: Read> Decompose<T> for Decompressor<T> {
+    fn into_inner(self) -> T {
+        self.into_inner()
+    }
+}
+
+#[derive(Clone)]
+struct BrotliParams {
+    buf_sz: usize,
+    quality: u32,
+    window_sz: u32,
+}
+
+impl BrotliParams {
+    fn new(buf_sz: usize, quality: u32, window_sz: u32) -> BrotliParams {
+        BrotliParams { buf_sz, quality, window_sz }
+    }
+}
+
+impl<T: Write> Compose<T, CompressorWriter<T>> for BrotliParams {
+    type Error = Error;
+    fn compose(self, inner: T) -> Result<CompressorWriter<T>> {
+        Ok(CompressorWriter::new(inner, self.buf_sz, self.quality, self.window_sz))
+    }
+}
+
+impl<T: Read> Compose<T, Decompressor<T>> for BrotliParams {
+    type Error = Error;
+    fn compose(self, inner: T) -> Result<Decompressor<T>> {
+        Ok(Decompressor::new(inner, self.buf_sz))
+    }
+}
+
+struct BufSectored<T> {
+    inner: T,
+    buf: Vec<u8>,
+    dirty: bool,
+    len: usize,
+    pos: usize,
+}
+
+impl<T> BufSectored<T> {
+    fn buf_pos(&self) -> usize {
+        self.pos % self.sector_sz()
+    }
+
+    fn curr_sector_start(&self) -> usize {
+        if self.pos == 0 {
+            return 0
+        }
+        let sect_sz = self.sector_sz();
+        let index = (self.pos - 1) / sect_sz;
+        sect_sz * index
+    }
+
+    fn buf_end(&self) -> usize {
+        let sect_start = self.curr_sector_start();
+        let limit = self.len.min(sect_start + self.sector_sz());
+        limit - sect_start
+    }
+}
+
+impl BufSectored<()> {
+    fn new() -> BufSectored<()> {
+        BufSectored {
+            inner: (),
+            buf: Vec::new(),
+            dirty: false,
+            len: 0,
+            pos: 0,
+        }
+    }
+}
+
+impl<T: Read> BufSectored<T> {
+    fn fill_internal_buf(&mut self) -> io::Result<()> {
+        self.inner.fill_buf(&mut self.buf)?;
+        Ok(())
+    }
+}
+
+impl<T> Decompose<T> for BufSectored<T> {
+    fn into_inner(self) -> T {
+        self.inner
+    }
+}
+
+impl<T: Sectored + Read + Seek> Compose<T, BufSectored<T>> for BufSectored<()> {
+    type Error = Error;
+    fn compose(self, inner: T) -> Result<BufSectored<T>> {
+        let sect_sz = inner.sector_sz();
+        let mut sectored = BufSectored {
+            inner,
+            buf: self.buf,
+            dirty: false,
+            len: 0,
+            pos: 0,
+        };
+        sectored.inner.seek(SeekFrom::Start(0))?;
+        sectored.buf.resize(sect_sz, 0);
+        if sectored.fill_internal_buf().is_ok() {
+            let mut slice = sectored.buf.as_slice();
+            if let Ok(len) = read_from::<u64, _>(&mut slice) {
+                sectored.len = len.try_into()?;
+                sectored.pos = std::mem::size_of::<u64>();
+            }
+        }
+        Ok(sectored)
+    }
+}
+
+impl<T> Sectored for BufSectored<T> {
+    fn sector_sz(&self) -> usize {
+        self.buf.len()
+    }
+}
+
+impl<T: Seek + Read + Write> Write for BufSectored<T> {
+    fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
+        let src_len_start = src.len();
+        let mut dest = {
+            let buf_pos = self.buf_pos();
+            &mut self.buf[buf_pos..]
+        };
+        while !src.is_empty() {
+            if dest.is_empty() {
+                self.flush()?;
+                dest = {
+                    let buf_pos = self.buf_pos();
+                    &mut self.buf[buf_pos..]
+                };
+            }
+            let sz = src.len().min(dest.len());
+            (&mut dest[..sz]).copy_from_slice(&src[..sz]);
+            dest = &mut dest[sz..];
+            src = &src[sz..];
+            self.dirty = sz > 0;
+            self.pos += sz;
+        }
+        Ok(src_len_start)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        fn err_conv<T, E: std::error::Error + Send + Sync + 'static>(
+            result: std::result::Result<T, E>
+        ) -> std::result::Result<T, io::Error> {
+            result.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
+        }
+
+        if !self.dirty {
+            return Ok(())
+        }
+
+        // Write out the contents of the buffer.
+        let inner_pos = self.inner.stream_position()?;
+        let inner_pos_usize: usize = err_conv(inner_pos.try_into())?;
+        if self.pos <= inner_pos_usize {
+            // The contents of the buffer were previously read from inner, so we write the updated
+            // contents to the same offset.
+            let sect_start: u64 = err_conv(self.curr_sector_start().try_into())?;
+            self.inner.seek(SeekFrom::Start(sect_start))?;
+        }
+        self.inner.write_all(&self.buf)?;
+
+        // Update the stored length.
+        self.len = self.len.max(self.pos);
+        self.inner.seek(SeekFrom::Start(0))?;
+        self.fill_internal_buf()?;
+        {
+            let len: u64 = err_conv(self.len.try_into())?;
+            let mut slice = self.buf.as_mut_slice();
+            err_conv(write_to(&len, &mut slice))?;
+        }
+        self.inner.seek(SeekFrom::Start(0))?;
+        self.inner.write_all(&self.buf)?;
+
+        // Seek back to the previous position.
+        self.inner.seek(SeekFrom::Start(inner_pos))?;
+        self.fill_internal_buf()?;
+        self.dirty = false;
+
+        Ok(())
+    }
+}
+
+impl<T: Read> Read for BufSectored<T> {
+    fn read(&mut self, mut dest: &mut [u8]) -> io::Result<usize> {
+        if self.pos == self.len {
+            return Ok(0)
+        }
+
+        let dest_len_start = dest.len();
+        let mut src = {
+            let start = self.buf_pos();
+            let end = self.buf_end();
+            &self.buf[start..end]
+        };
+        while !dest.is_empty() {
+            if src.is_empty() {
+                self.fill_internal_buf()?;
+                src = &self.buf[..];
+            }
+            let sz = src.len().min(dest.len());
+            (&mut dest[..sz]).copy_from_slice(&src[..sz]);
+            dest = &mut dest[sz..];
+            src = &src[sz..];
+            self.pos += sz;
+        }
+        Ok(dest_len_start)
+    }
+}
+
+impl<T: Seek + Read + Write> Seek for BufSectored<T> {
+    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
+        let stream_pos = self.inner.stream_position()?;
+        let rel_start = match pos {
+            SeekFrom::Start(rel_start) => rel_start,
+            SeekFrom::Current(rel_curr) => {
+                if rel_curr > 0 {
+                    stream_pos + rel_curr as u64
+                }
+                else {
+                    stream_pos - rel_curr as u64
+                }
+            },
+            SeekFrom::End(_) => return Err(io::Error::new(
+                io::ErrorKind::Unsupported,
+                "seeking relative to the end of the stream is not supported"
+            ))
+        };
+        let sect_sz = self.sector_sz();
+        let sect_index = stream_pos as usize / sect_sz;
+        let sect_index_new = rel_start as usize / sect_sz;
+        let stream_pos = if sect_index != sect_index_new {
+            self.flush()?;
+            let stream_pos = self.inner.seek(SeekFrom::Start((sect_index_new * sect_sz) as u64))?;
+            self.fill_internal_buf()?;
+            stream_pos
+        }
+        else {
+            stream_pos
+        };
+        self.pos = stream_pos as usize;
+        Ok(rel_start)
+    }
+}
+
 /// A container which binds together ciphertext along with the metadata needed to identify,
 /// verify and decrypt it.
 #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
@@ -527,6 +854,8 @@ fn main() {
 
 #[cfg(test)]
 mod tests {
+    use std::{io::Cursor};
+
     use super::*;
     use test_helpers::*;
 
@@ -631,4 +960,122 @@ mod tests {
         second.owner = Principal(Hash::Sha2_256(PRINCIPAL2));
         assert!(!first.contains(&second));
     }
+
+    #[test]
+    fn brotli_compress_decompress() {
+        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
+        const SECT_CT: usize = 16;
+        let params = BrotliParams::new(SECT_SZ, 8, 20);
+        let mut memory = Cursor::new([0u8; SECT_SZ * SECT_CT]);
+        {
+            let write: CompressorWriter<_> = params.clone().compose(&mut memory)
+                .expect("compose for write failed");
+            write_fill(write, SECT_SZ, SECT_CT);
+        }
+        memory.seek(SeekFrom::Start(0)).expect("seek failed");
+        {
+            let read: Decompressor<_> = params.compose(&mut memory)
+                .expect("compose for read failed");
+            read_check(read, SECT_SZ, SECT_CT);
+        }
+    }
+
+    fn make_buf_sectored(
+        sect_sz: usize, sect_ct: usize
+    ) -> BufSectored<SectoredCursor<Vec<u8>>> {
+        BufSectored::new()
+            .compose(SectoredCursor::new(vec![0u8; sect_sz * sect_ct], sect_sz))
+            .expect("compose for sectored buffer failed")
+    }
+
+    #[test]
+    fn buf_sectored_write_to_secret_stream() {
+        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
+        const SECT_CT: usize = 16;
+        let mut sectored = make_buf_sectored(SECT_SZ, SECT_CT);
+        let sect_sz = sectored.sector_sz();
+        assert_eq!(0, sect_sz % 16);
+        let chunk_sz = sect_sz / 16; 
+        let chunk_ct = SECT_CT * 16;
+        write_fill(&mut sectored, chunk_sz, chunk_ct);
+    }
+
+    #[test]
+    fn buf_sectored_write_read_sequential() {
+        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
+        const SECT_CT: usize = 16;
+        let mut sectored = make_buf_sectored(SECT_SZ, SECT_CT);
+        let sect_sz = sectored.sector_sz();
+        assert_eq!(0, sect_sz % 16);
+        let chunk_sz = sect_sz / 16; 
+        let chunk_ct = SECT_CT * 16 - 1;
+        write_fill(&mut sectored, chunk_sz, chunk_ct);
+        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
+        let len: u64 = read_from(&mut sectored).expect("read_from failed");
+        assert_eq!(sectored.len, len as usize);
+        read_check(&mut sectored, chunk_sz, chunk_ct);
+    }
+
+    #[test]
+    fn buf_sectored_len_preserved() {
+        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
+        const SECT_CT: usize = 16;
+        let mut sectored = make_buf_sectored(SECT_SZ, SECT_CT);
+        let expected = vec![42u8; 12];
+        // We need to ensure that writing expected will not fill up the buffer in sectored.
+        assert!(expected.len() < sectored.sector_sz() - std::mem::size_of::<usize>());
+
+        sectored.write_all(&expected).expect("write failed");
+        sectored.flush().expect("flush failed");
+        let inner = sectored.into_inner();
+        let mut sectored = BufSectored::new()
+            .compose(inner)
+            .expect("failed to compose sectored buffer");
+        let mut actual = vec![0u8; expected.len()];
+        sectored.fill_buf(actual.as_mut_slice()).expect("failed to fill actual");
+
+        assert_eq!(expected, actual);
+    }
+
+    #[test]
+    fn buf_sectored_write_read_random() {
+        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
+        const SECT_CT: usize = 16;
+        const CAP: usize = SECT_SZ * SECT_CT - std::mem::size_of::<usize>();
+        let mut rando = Randomizer::new([3u8; Randomizer::HASH.len()]);
+        let source = {
+            let mut expected = Vec::with_capacity(CAP);
+            expected.extend((&mut rando).take(expected.capacity()).map(|e| (e % 256) as u8));
+            expected
+        };
+        let rando2 = Randomizer::new([5u8; Randomizer::HASH.len()]);
+        let indices: Vec<(usize, usize)> = rando
+            .zip(rando2)
+            .take(2)
+            .map(|(mut first, mut second)| {
+                first %= source.len();
+                second &= source.len();
+                let low = first.min(second);
+                let high = first.max(second);
+                (low, high)
+            })
+            .collect();
+
+        let mut sectored = make_buf_sectored(SECT_SZ, SECT_CT);
+        sectored.write_all(&[0u8; CAP]).expect("failed to fill sectored");
+        sectored.flush().expect("flush failed");
+        for (low, high) in indices.iter() {
+            sectored.seek(SeekFrom::Start(*low as u64)).expect("seek failed");
+            sectored.write_all(&source[*low..*high]).expect("write failed");
+            sectored.flush().expect("flush failed");
+        }
+        let mut buf = vec![0u8; CAP];
+        for (_k, (low, high)) in indices.iter().enumerate() {
+            sectored.seek(SeekFrom::Start(*low as u64)).expect("seek failed");
+            let actual = &mut buf[*low..*high];
+            sectored.fill_buf(actual).expect("read failed");
+            let expected = &source[*low..*high];
+            assert_eq!(expected, actual);
+        }
+    }
 }

+ 40 - 0
crates/btnode/src/test_helpers.rs

@@ -559,6 +559,33 @@ impl Iterator for Randomizer {
     }
 }
 
+pub fn make_sector(mut buf: &mut [u8], sect_index: usize) {
+    let data = (sect_index + 1).to_ne_bytes();
+    for chunk in std::iter::repeat(data).take(buf.len() / data.len()) {
+        (&mut buf[..chunk.len()]).copy_from_slice(chunk.as_slice());
+        buf = &mut buf[chunk.len()..];
+    }
+}
+
+pub fn write_fill<W: Write>(mut write: W, sect_sz: usize, sect_ct: usize) {
+    let mut buf = vec![0u8; sect_sz];
+    for sect_index in 0..sect_ct {
+        make_sector(&mut buf, sect_index);
+        write.write_all(&mut buf).expect("write failed");
+    }
+    write.flush().expect("flush failed");
+}
+
+pub fn read_check<R: Read>(mut read: R, sect_sz: usize, sect_ct: usize) {
+    let mut actual = vec![0u8; sect_sz];
+    let mut expected = vec![0u8; sect_sz];
+    for sect_index in 0..sect_ct {
+        make_sector(&mut expected, sect_index);
+        read.read_exact(&mut actual).expect("read failed");
+        assert_eq!(expected, actual);
+    }
+}
+
 pub struct SectoredCursor<T> {
     cursor: Cursor<T>,
     sect_sz: usize,
@@ -578,6 +605,18 @@ impl<T> Sectored for SectoredCursor<T> {
 
 impl Write for SectoredCursor<Vec<u8>> {
     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.assert_sector_sz(buf.len())?;
+        self.cursor.write(buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.cursor.flush()
+    }
+}
+
+impl<const N: usize> Write for SectoredCursor<[u8; N]> {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.assert_sector_sz(buf.len())?;
         self.cursor.write(buf)
     }
 
@@ -588,6 +627,7 @@ impl Write for SectoredCursor<Vec<u8>> {
 
 impl<T: AsRef<[u8]>> Read for SectoredCursor<T> {
     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        self.assert_sector_sz(buf.len())?;
         self.cursor.read(buf)
     }
 }