Переглянути джерело

Finished implementing the sectored buffer.

Matthew Carr 2 роки тому
батько
коміт
5bf7c9590c
2 змінених файлів з 367 додано та 158 видалено
  1. 36 24
      crates/btnode/src/crypto/mod.rs
  2. 331 134
      crates/btnode/src/main.rs

+ 36 - 24
crates/btnode/src/crypto/mod.rs

@@ -24,7 +24,7 @@ use super::{
     Write,
     Seek,
     Header,
-    Compose,
+    TryCompose,
     Sectored,
 };
 
@@ -45,7 +45,7 @@ use serde::{
 };
 use zeroize::ZeroizeOnDrop;
 use std::{
-    str::FromStr, num::TryFromIntError, marker::PhantomData, io::{ErrorKind, SeekFrom},
+    str::FromStr, num::TryFromIntError, marker::PhantomData, io::{ErrorKind, SeekFrom}, convert::Infallible,
 };
 use strum_macros::{EnumString, EnumDiscriminants, Display};
 use foreign_types::ForeignType;
@@ -1573,9 +1573,9 @@ impl<T, H> Sectored for MerkleStream<T, H> {
     }
 }
 
-impl<T, H> Compose<T, MerkleStream<T, H>> for MerkleStream<(), H> {
-    type Error = Error;
-    fn compose(self, inner: T) -> Result<MerkleStream<T, H>> {
+impl<T, H> TryCompose<T, MerkleStream<T, H>> for MerkleStream<(), H> {
+    type Error = Infallible;
+    fn try_compose(self, inner: T) -> std::result::Result<MerkleStream<T, H>, Infallible> {
         Ok(MerkleStream {
             inner,
             tree: self.tree,
@@ -1673,15 +1673,15 @@ impl<T> Decompose<T> for SecretStream<T> {
     }
 }
 
-impl<T, U: Sectored> Compose<U, SecretStream<U>> for SecretStream<T> {
+impl<T, U: Sectored> TryCompose<U, SecretStream<U>> for SecretStream<T> {
     type Error = Error;
-    fn compose(mut self, inner: U) -> Result<SecretStream<U>> {
+    fn try_compose(mut self, inner: U) -> Result<SecretStream<U>> {
         let inner_sect_sz = inner.sector_sz();
         let expansion_sz = self.key.expansion_sz();
         let sect_sz = inner_sect_sz - expansion_sz;
         let block_sz = self.key.block_size();
         if 0 != sect_sz % block_sz {
-            panic!("{}", Error::IndivisibleSize { divisor: block_sz, actual: sect_sz })
+            return Err(Error::IndivisibleSize { divisor: block_sz, actual: sect_sz });
         }
         self.pt_buf.resize(inner_sect_sz, 0);
         self.pt_buf.resize(inner_sect_sz + block_sz, 0);
@@ -1725,7 +1725,17 @@ impl<T: Read> Read for SecretStream<T> {
         self.assert_sector_sz(buf.len())?;
 
         self.ct_buf.resize(self.inner_sect_sz, 0);
-        self.inner.read_exact(&mut self.ct_buf)?;
+        match self.inner.read_exact(&mut self.ct_buf) {
+            Ok(_) => (),
+            Err(err) => {
+                if err.kind() == io::ErrorKind::UnexpectedEof {
+                    return Ok(0)
+                }
+                else {
+                    return Err(err)
+                }
+            }
+        }
 
         self.pt_buf.resize(self.inner_sect_sz + self.key.block_size(), 0);
         let mut decrypter = self.key.to_decrypter()?;
@@ -1959,6 +1969,7 @@ mod tests {
         SECTOR_SZ_DEFAULT,
         test_helpers::*,
         BrotliParams,
+        SectoredBuf,
     };
     use super::*;
     use std::{
@@ -2142,7 +2153,7 @@ mod tests {
         key: SymKey, inner_sect_sz: usize, sect_ct: usize
     ) {
         let mut stream = SecretStream::new(key)
-            .compose(SectoredCursor::new(vec![0u8; inner_sect_sz * sect_ct], inner_sect_sz))
+            .try_compose(SectoredCursor::new(vec![0u8; inner_sect_sz * sect_ct], inner_sect_sz))
             .expect("compose failed");
         let sector_sz = stream.sector_sz();
         for k in 0..sect_ct {
@@ -2177,7 +2188,7 @@ mod tests {
         rando: Randomizer, key: SymKey, inner_sect_sz: usize, sect_ct: usize
     ) {
         let mut stream = SecretStream::new(key)
-            .compose(SectoredCursor::new(vec![0u8; inner_sect_sz * sect_ct], inner_sect_sz))
+            .try_compose(SectoredCursor::new(vec![0u8; inner_sect_sz * sect_ct], inner_sect_sz))
             .expect("compose failed");
         let sect_sz = stream.sector_sz();
         let indices: Vec<usize> = rando.take(sect_ct).map(|e| e % sect_ct).collect();
@@ -2225,7 +2236,7 @@ mod tests {
             vec![0u8; num_sectors * SECTOR_SZ_DEFAULT],
             SECTOR_SZ_DEFAULT
         );
-        SecretStream::new(key).compose(inner).expect("compose failed")
+        SecretStream::new(key).try_compose(inner).expect("compose failed")
     }
 
     #[test]
@@ -2244,7 +2255,7 @@ mod tests {
                     vec![0u8; num_sectors * SECTOR_SZ_DEFAULT],
                     SECTOR_SZ_DEFAULT
                 );
-                SecretStream::new(key).compose(inner).expect("compose failed")
+                SecretStream::new(key).try_compose(inner).expect("compose failed")
             }
         }
 
@@ -2365,7 +2376,7 @@ mod tests {
     fn merkle_stream_sequential_test_case(sect_sz: usize, sect_count: usize) {
         let mut stream = MerkleStream
             ::new(MerkleTree::<Sha2_256>::empty(sect_sz))
-            .compose(Cursor::new(vec![0u8; sect_count * sect_sz]))
+            .try_compose(Cursor::new(vec![0u8; sect_count * sect_sz]))
             .expect("compose failed");
         for k in 1..(sect_count + 1) {
             let sector = vec![k as u8; sect_sz];
@@ -2392,7 +2403,7 @@ mod tests {
     fn merkle_stream_random_test_case(rando: Randomizer, sect_sz: usize, sect_ct: usize) {
         let mut stream = MerkleStream
             ::new(MerkleTree::<Sha2_256>::empty(sect_sz))
-            .compose(Cursor::new(vec![0u8; sect_sz * sect_ct]))
+            .try_compose(Cursor::new(vec![0u8; sect_sz * sect_ct]))
             .expect("compose failed");
         let indices: Vec<usize> = rando.take(sect_ct).map(|e| e % sect_ct).collect();
         for index in indices.iter().map(|e| *e) {
@@ -2429,11 +2440,11 @@ mod tests {
         const SECT_CT: usize = 16;
         let memory = Cursor::new([0u8; SECT_SZ * SECT_CT]);
         let merkle = MerkleStream::new(MerkleTree::<Sha2_256>::empty(SECT_SZ))
-            .compose(memory)
+            .try_compose(memory)
             .expect("compose for merkle failed");
         let key = SymKey::generate(SymKeyKind::Aes256Cbc).expect("key generation failed");
         let mut secret = SecretStream::new(key)
-            .compose(merkle)
+            .try_compose(merkle)
             .expect("compose for secret failed");
         let secret_sect_sz = secret.sector_sz();
         write_fill(&mut secret, secret_sect_sz, SECT_CT);
@@ -2443,26 +2454,27 @@ mod tests {
 
     #[test]
     fn compress_then_encrypt() {
-        unimplemented!();
         use brotli::{CompressorWriter, Decompressor};
         const SECT_SZ: usize = 4096;
         const SECT_CT: usize = 16;
         let params = BrotliParams::new(SECT_SZ, 8, 20);
         let key = SymKey::generate(SymKeyKind::Aes256Cbc).expect("key generation failed");
-        let mut inner = SecretStream::new(key)
-            .compose(SectoredCursor::new([0u8; SECT_SZ * SECT_CT], SECT_SZ))
-            .expect("compose for inner failed");
+        let mut inner = SectoredBuf::new()
+            .try_compose(SecretStream::new(key)
+                .try_compose(SectoredCursor::new([0u8; SECT_SZ * SECT_CT], SECT_SZ))
+                .expect("compose for inner failed")
+            ).expect("compose with sectored buffer failed");
         {
-            let write: CompressorWriter<_> = params.clone().compose(&mut inner)
+            let write: CompressorWriter<_> = params.clone().try_compose(&mut inner)
                 .expect("compose for write failed");
             write_fill(write, SECT_SZ, SECT_CT);
         } 
         inner.seek(SeekFrom::Start(0)).expect("seek failed");
         {
-            let read: Decompressor<_> = params.compose(&mut inner)
+            let read: Decompressor<_> = params.try_compose(&mut inner)
                 .expect("compose for read failed");
             read_check(read, SECT_SZ, SECT_CT);
-        } 
+        }
     }
 
     /// Tests that validate the dependencies of this module.

+ 331 - 134
crates/btnode/src/main.rs

@@ -19,7 +19,7 @@ use crypto::{Hash, HashKind, Signature, SymKey, Cryptotext, Sign, AsymKeyPub};
 
 use std::{
     collections::HashMap,
-    convert::TryFrom,
+    convert::{TryFrom, Infallible},
     hash::Hash as Hashable,
     fmt::{self, Display, Formatter},
     time::{Duration, SystemTime},
@@ -133,13 +133,13 @@ struct NewBlock<T> {
 }
 
 impl<T> NewBlock<T> {
-    fn compose<E: Into<Error>, U: Decompose<T>, V: Compose<T, U, Error = E>>(
+    fn compose<E: Into<Error>, U: Decompose<T>, V: TryCompose<T, U, Error = E>>(
         self, new_body: V
     ) -> Result<NewBlock<U>> {
         Ok(NewBlock {
             header: self.header,
             sig: self.sig,
-            body: new_body.compose(self.body).map_err(|err| err.into())?,
+            body: new_body.try_compose(self.body).map_err(|err| err.into())?,
         })
     }
 }
@@ -184,9 +184,21 @@ trait Decompose<T> {
     fn into_inner(self) -> T;
 }
 
-trait Compose<T, U: Decompose<T>> {
+trait TryCompose<T, U: Decompose<T>> {
     type Error;
-    fn compose(self, inner: T) -> std::result::Result<U, Self::Error>;
+    fn try_compose(self, inner: T) -> std::result::Result<U, Self::Error>;
+}
+
+trait Compose<T, U> {
+    fn compose(self, inner: T) -> U;
+}
+
+impl<T, U: Decompose<T>, S: TryCompose<T, U, Error = Infallible>> Compose<T, U> for S {
+    fn compose(self, inner: T) -> U {
+        let result = self.try_compose(inner);
+        // Safety: Infallible has no values, so `result` must be `Ok`.
+        unsafe { result.unwrap_unchecked() }
+    }
 }
 
 struct FileBody {
@@ -239,15 +251,29 @@ trait ReadExt: Read {
     ///  1. The given buffer is full.
     ///  2. A call to `read` returns 0.
     ///  3. A call to `read` returns an error.
-    fn fill_buf(&mut self, mut dest: &mut [u8]) -> io::Result<()> {
+    /// The number of bytes read is returned. If an error is returned, then no bytes were read.
+    fn fill_buf(&mut self, mut dest: &mut [u8]) -> io::Result<usize> {
+        let dest_len_start = dest.len();
         while !dest.is_empty() {
-            let byte_ct = self.read(dest)?;
+            let byte_ct = match self.read(dest) {
+                Ok(byte_ct) => byte_ct,
+                Err(err) => {
+                    if dest_len_start == dest.len() {
+                        return Err(err)
+                    }
+                    else {
+                        // We're not allowed to return an error if we've already read from self.
+                        error!("an error occurred in fill_buf: {}", err);
+                        break;
+                    }
+                }
+            };
             if 0 == byte_ct {
                 break;
             }
             dest = &mut dest[byte_ct..];
         }
-        Ok(())
+        Ok(dest_len_start - dest.len())
     }
 }
 
@@ -278,105 +304,174 @@ impl BrotliParams {
     }
 }
 
-impl<T: Write> Compose<T, CompressorWriter<T>> for BrotliParams {
+impl<T: Write> TryCompose<T, CompressorWriter<T>> for BrotliParams {
     type Error = Error;
-    fn compose(self, inner: T) -> Result<CompressorWriter<T>> {
+    fn try_compose(self, inner: T) -> Result<CompressorWriter<T>> {
         Ok(CompressorWriter::new(inner, self.buf_sz, self.quality, self.window_sz))
     }
 }
 
-impl<T: Read> Compose<T, Decompressor<T>> for BrotliParams {
+impl<T: Read> TryCompose<T, Decompressor<T>> for BrotliParams {
     type Error = Error;
-    fn compose(self, inner: T) -> Result<Decompressor<T>> {
+    fn try_compose(self, inner: T) -> Result<Decompressor<T>> {
         Ok(Decompressor::new(inner, self.buf_sz))
     }
 }
 
-struct BufSectored<T> {
+/// TODO: Remove this once the error_chain crate is integrated.
+fn err_conv<T, E: std::error::Error + Send + Sync + 'static>(
+    result: std::result::Result<T, E>
+) -> std::result::Result<T, io::Error> {
+    result.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
+}
+
+/// A stream which buffers writes and read such that the inner stream only sees reads and writes
+/// of sector length buffers.
+struct SectoredBuf<T> {
     inner: T,
     buf: Vec<u8>,
+    /// The offset into the inner stream which the zero offset byte in `buf` corresponds to.
+    buf_start: usize,
+    /// Indicates if the contents of `buf` have been written to, and so whether `buf` needs to be
+    /// written back to `inner` before it is refilled.
     dirty: bool,
+    /// The total number of bytes that have been written to the inner stream, including the reserved
+    /// bytes at the beginning.
     len: usize,
+    /// The current position of this stream, expressed as an offset into the inner stream.
     pos: usize,
 }
 
-impl<T> BufSectored<T> {
-    fn buf_pos(&self) -> usize {
-        self.pos % self.sector_sz()
+impl SectoredBuf<()> {
+    fn new() -> SectoredBuf<()> {
+        SectoredBuf {
+            inner: (),
+            buf: Vec::new(),
+            buf_start: 0,
+            dirty: false,
+            len: 0,
+            pos: 0,
+        }
     }
+}
 
-    fn curr_sector_start(&self) -> usize {
-        if self.pos == 0 {
-            return 0
-        }
-        let sect_sz = self.sector_sz();
-        let index = (self.pos - 1) / sect_sz;
-        sect_sz * index
+impl<T> SectoredBuf<T> {
+    /// The number of bytes at the beginning of the inner stream which are reserved to store the
+    /// length of data written. All offsets into the stored data must be shifted by this amount to
+    /// be translated to an offset in the inner stream.
+    const RESERVED: usize = std::mem::size_of::<usize>();
+
+    /// Returns the position in the inner stream which the given position in this stream corresponds
+    /// to.
+    fn inner_pos(self_pos: u64) -> u64 {
+        let offset: u64 = Self::RESERVED.try_into().unwrap(); 
+        self_pos + offset
+    }
+
+    /// Returns the position in this stream which the given position in the inner stream corresponds
+    /// to.
+    fn self_pos(inner_pos: u64) -> u64{
+        let offset: u64 = Self::RESERVED.try_into().unwrap(); 
+        inner_pos - offset
     }
 
+    /// Returns the offset into the internal buffer that corresponds to the current position.
+    fn buf_pos(&self) -> usize {
+        self.pos - self.buf_start
+    }
+
+    /// Returns one more than the last index in the internal buffer which can be read.
     fn buf_end(&self) -> usize {
-        let sect_start = self.curr_sector_start();
-        let limit = self.len.min(sect_start + self.sector_sz());
-        limit - sect_start
+        let limit = self.len.min(self.buf_start + self.sector_sz());
+        limit - self.buf_start
     }
-}
 
-impl BufSectored<()> {
-    fn new() -> BufSectored<()> {
-        BufSectored {
-            inner: (),
-            buf: Vec::new(),
-            dirty: false,
-            len: 0,
-            pos: 0,
-        }
+    /// Returns the index of the sector which is currently loaded into the buffer.
+    fn buf_sector_index(&self) -> usize {
+        self.pos / self.sector_sz()
     }
 }
 
-impl<T: Read> BufSectored<T> {
-    fn fill_internal_buf(&mut self) -> io::Result<()> {
-        self.inner.fill_buf(&mut self.buf)?;
-        Ok(())
+impl<T: Read + Seek> SectoredBuf<T> {
+    /// Fills the internal buffer by reading from the inner stream at the current position
+    /// and updates `self.buf_start` with the position read from.
+    fn fill_internal_buf(&mut self) -> io::Result<usize> {
+        self.buf_start = err_conv(self.inner.stream_position()?.try_into())?;
+        let read_bytes = if self.buf_start < self.len {
+            let read_bytes = self.inner.fill_buf(&mut self.buf)?;
+            if read_bytes < self.buf.len() {
+                return Err(io::Error::new(
+                    io::ErrorKind::Other,
+                    format!(
+                        "Failed to fill SectoredBuf.buf. Expected {} bytes, got {}.",
+                        self.buf.len(),
+                        read_bytes
+                    )
+                ));
+            }
+            read_bytes
+        }
+        else {
+            0
+        };
+        Ok(read_bytes)
     }
 }
 
-impl<T> Decompose<T> for BufSectored<T> {
+impl<T> Decompose<T> for SectoredBuf<T> {
     fn into_inner(self) -> T {
         self.inner
     }
 }
 
-impl<T: Sectored + Read + Seek> Compose<T, BufSectored<T>> for BufSectored<()> {
+impl<T: Sectored + Read + Seek> TryCompose<T, SectoredBuf<T>> for SectoredBuf<()> {
     type Error = Error;
-    fn compose(self, inner: T) -> Result<BufSectored<T>> {
+    fn try_compose(self, inner: T) -> Result<SectoredBuf<T>> {
         let sect_sz = inner.sector_sz();
-        let mut sectored = BufSectored {
+        if sect_sz < Self::RESERVED {
+            return Err(Error::custom(format!(
+                "a sector size of at least {} is required. Got {}",
+                Self::RESERVED,
+                sect_sz,
+            )));
+        }
+        let mut sectored = SectoredBuf {
             inner,
             buf: self.buf,
+            buf_start: 0,
             dirty: false,
-            len: 0,
-            pos: 0,
+            len: Self::RESERVED,
+            pos: Self::RESERVED,
         };
         sectored.inner.seek(SeekFrom::Start(0))?;
         sectored.buf.resize(sect_sz, 0);
-        if sectored.fill_internal_buf().is_ok() {
-            let mut slice = sectored.buf.as_slice();
-            if let Ok(len) = read_from::<u64, _>(&mut slice) {
+        let len_stored = match sectored.fill_internal_buf() {
+            Ok(bytes_read) => bytes_read >= Self::RESERVED,
+            Err(err) => {
+                error!("SectoredBuf::fill_internal_buf returned an error: {}", err);
+                false
+            },
+        };
+        if len_stored {
+            if let Ok(len) = read_from::<u64, _>(&mut sectored.buf.as_slice()) {
                 sectored.len = len.try_into()?;
-                sectored.pos = std::mem::size_of::<u64>();
             }
         }
+        else {
+            write_to(&Self::RESERVED, &mut sectored.buf.as_mut_slice())?;
+            sectored.dirty = true;
+        }
         Ok(sectored)
     }
 }
 
-impl<T> Sectored for BufSectored<T> {
+impl<T> Sectored for SectoredBuf<T> {
     fn sector_sz(&self) -> usize {
         self.buf.len()
     }
 }
 
-impl<T: Seek + Read + Write> Write for BufSectored<T> {
+impl<T: Seek + Read + Write> Write for SectoredBuf<T> {
     fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
         let src_len_start = src.len();
         let mut dest = {
@@ -385,11 +480,11 @@ impl<T: Seek + Read + Write> Write for BufSectored<T> {
         };
         while !src.is_empty() {
             if dest.is_empty() {
-                self.flush()?;
-                dest = {
-                    let buf_pos = self.buf_pos();
-                    &mut self.buf[buf_pos..]
-                };
+                if let Err(err) = self.flush() {
+                    error!("A call to SectoredBuf::flush returned an error: {}", err);
+                    break;
+                }
+                dest = &mut self.buf[..];
             }
             let sz = src.len().min(dest.len());
             (&mut dest[..sz]).copy_from_slice(&src[..sz]);
@@ -398,45 +493,53 @@ impl<T: Seek + Read + Write> Write for BufSectored<T> {
             self.dirty = sz > 0;
             self.pos += sz;
         }
-        Ok(src_len_start)
+        Ok(src_len_start - src.len())
     }
 
     fn flush(&mut self) -> io::Result<()> {
-        fn err_conv<T, E: std::error::Error + Send + Sync + 'static>(
-            result: std::result::Result<T, E>
-        ) -> std::result::Result<T, io::Error> {
-            result.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
-        }
-
         if !self.dirty {
             return Ok(())
         }
 
         // Write out the contents of the buffer.
+        let sect_sz: u64 = err_conv(self.sector_sz().try_into())?;
         let inner_pos = self.inner.stream_position()?;
         let inner_pos_usize: usize = err_conv(inner_pos.try_into())?;
-        if self.pos <= inner_pos_usize {
-            // The contents of the buffer were previously read from inner, so we write the updated
-            // contents to the same offset.
-            let sect_start: u64 = err_conv(self.curr_sector_start().try_into())?;
-            self.inner.seek(SeekFrom::Start(sect_start))?;
+        let is_new_sector = self.pos > inner_pos_usize;
+        let is_full = (self.buf.len() - self.buf_pos()) == 0;
+        let seek_to = if is_new_sector {
+            if is_full {
+                inner_pos + sect_sz
+            }
+            else {
+                inner_pos
+            }
         }
+        else {
+            // The contents of the buffer were previously read from inner, so we write the
+            // updated contents to the same offset.
+            let sect_start: u64 = err_conv(self.buf_start.try_into())?;
+            self.inner.seek(SeekFrom::Start(sect_start))?;
+            if is_full {
+                inner_pos
+            }
+            else {
+                inner_pos - sect_sz
+            }
+        };
         self.inner.write_all(&self.buf)?;
 
         // Update the stored length.
         self.len = self.len.max(self.pos);
         self.inner.seek(SeekFrom::Start(0))?;
         self.fill_internal_buf()?;
-        {
-            let len: u64 = err_conv(self.len.try_into())?;
-            let mut slice = self.buf.as_mut_slice();
-            err_conv(write_to(&len, &mut slice))?;
-        }
+        let len: u64 = err_conv(self.len.try_into())?;
+        err_conv(write_to(&len, &mut self.buf.as_mut_slice()))?;
         self.inner.seek(SeekFrom::Start(0))?;
         self.inner.write_all(&self.buf)?;
 
-        // Seek back to the previous position.
-        self.inner.seek(SeekFrom::Start(inner_pos))?;
+        // Seek to the next position.
+        self.inner.seek(SeekFrom::Start(seek_to))?;
         self.fill_internal_buf()?;
         self.dirty = false;
 
@@ -444,7 +547,7 @@ impl<T: Seek + Read + Write> Write for BufSectored<T> {
     }
 }
 
-impl<T: Read> Read for BufSectored<T> {
+impl<T: Read + Seek> Read for SectoredBuf<T> {
     fn read(&mut self, mut dest: &mut [u8]) -> io::Result<usize> {
         if self.pos == self.len {
             return Ok(0)
@@ -458,8 +561,20 @@ impl<T: Read> Read for BufSectored<T> {
         };
         while !dest.is_empty() {
             if src.is_empty() {
-                self.fill_internal_buf()?;
-                src = &self.buf[..];
+                if self.pos >= self.len {
+                    break;
+                }
+                let byte_ct = match self.fill_internal_buf() {
+                    Ok(byte_ct) => byte_ct,
+                    Err(err) => {
+                        error!("SectoredBuf::full_internal_buf returned an error: {}", err);
+                        break;
+                    }
+                };
+                if 0 == byte_ct {
+                    break;
+                }
+                src = &self.buf[..byte_ct];
             }
             let sz = src.len().min(dest.len());
             (&mut dest[..sz]).copy_from_slice(&src[..sz]);
@@ -467,21 +582,21 @@ impl<T: Read> Read for BufSectored<T> {
             src = &src[sz..];
             self.pos += sz;
         }
-        Ok(dest_len_start)
+        Ok(dest_len_start - dest.len())
     }
 }
 
-impl<T: Seek + Read + Write> Seek for BufSectored<T> {
+impl<T: Seek + Read + Write> Seek for SectoredBuf<T> {
     fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
-        let stream_pos = self.inner.stream_position()?;
-        let rel_start = match pos {
-            SeekFrom::Start(rel_start) => rel_start,
+        let inner_pos = self.inner.stream_position()?;
+        let inner_pos_new = match pos {
+            SeekFrom::Start(rel_start) => Self::inner_pos(rel_start),
             SeekFrom::Current(rel_curr) => {
                 if rel_curr > 0 {
-                    stream_pos + rel_curr as u64
+                    inner_pos + rel_curr as u64
                 }
                 else {
-                    stream_pos - rel_curr as u64
+                    inner_pos - rel_curr as u64
                 }
             },
             SeekFrom::End(_) => return Err(io::Error::new(
@@ -490,19 +605,17 @@ impl<T: Seek + Read + Write> Seek for BufSectored<T> {
             ))
         };
         let sect_sz = self.sector_sz();
-        let sect_index = stream_pos as usize / sect_sz;
-        let sect_index_new = rel_start as usize / sect_sz;
-        let stream_pos = if sect_index != sect_index_new {
+        let sect_index = self.buf_sector_index();
+        let sect_index_new = err_conv(TryInto::<usize>::try_into(inner_pos_new))? / sect_sz;
+        let pos: u64 = err_conv(self.pos.try_into())?;
+        if sect_index != sect_index_new || pos == inner_pos {
             self.flush()?;
-            let stream_pos = self.inner.seek(SeekFrom::Start((sect_index_new * sect_sz) as u64))?;
+            let seek_to: u64 = err_conv((sect_index_new * sect_sz).try_into())?;
+            self.inner.seek(SeekFrom::Start(seek_to))?;
             self.fill_internal_buf()?;
-            stream_pos
         }
-        else {
-            stream_pos
-        };
-        self.pos = stream_pos as usize;
-        Ok(rel_start)
+        self.pos = err_conv(inner_pos_new.try_into())?;
+        Ok(Self::self_pos(inner_pos_new))
     }
 }
 
@@ -968,31 +1081,31 @@ mod tests {
         let params = BrotliParams::new(SECT_SZ, 8, 20);
         let mut memory = Cursor::new([0u8; SECT_SZ * SECT_CT]);
         {
-            let write: CompressorWriter<_> = params.clone().compose(&mut memory)
+            let write: CompressorWriter<_> = params.clone().try_compose(&mut memory)
                 .expect("compose for write failed");
             write_fill(write, SECT_SZ, SECT_CT);
         }
         memory.seek(SeekFrom::Start(0)).expect("seek failed");
         {
-            let read: Decompressor<_> = params.compose(&mut memory)
+            let read: Decompressor<_> = params.try_compose(&mut memory)
                 .expect("compose for read failed");
             read_check(read, SECT_SZ, SECT_CT);
         }
     }
 
-    fn make_buf_sectored(
+    fn make_sectored_buf(
         sect_sz: usize, sect_ct: usize
-    ) -> BufSectored<SectoredCursor<Vec<u8>>> {
-        BufSectored::new()
-            .compose(SectoredCursor::new(vec![0u8; sect_sz * sect_ct], sect_sz))
+    ) -> SectoredBuf<SectoredCursor<Vec<u8>>> {
+        SectoredBuf::new()
+            .try_compose(SectoredCursor::new(vec![0u8; sect_sz * sect_ct], sect_sz))
             .expect("compose for sectored buffer failed")
     }
 
     #[test]
-    fn buf_sectored_write_to_secret_stream() {
+    fn sectored_buf_fill_inner() {
         const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
         const SECT_CT: usize = 16;
-        let mut sectored = make_buf_sectored(SECT_SZ, SECT_CT);
+        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
         let sect_sz = sectored.sector_sz();
         assert_eq!(0, sect_sz % 16);
         let chunk_sz = sect_sz / 16; 
@@ -1001,35 +1114,43 @@ mod tests {
     }
 
     #[test]
-    fn buf_sectored_write_read_sequential() {
+    fn sectored_buf_write_read_sequential() {
         const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
         const SECT_CT: usize = 16;
-        let mut sectored = make_buf_sectored(SECT_SZ, SECT_CT);
+        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
         let sect_sz = sectored.sector_sz();
         assert_eq!(0, sect_sz % 16);
         let chunk_sz = sect_sz / 16; 
+        // We subtract one here so that the underlying buffer is not completely filled. This
+        // exercises the length limiting capability of the sectored buffer.
         let chunk_ct = SECT_CT * 16 - 1;
         write_fill(&mut sectored, chunk_sz, chunk_ct);
         sectored.seek(SeekFrom::Start(0)).expect("seek failed");
-        let len: u64 = read_from(&mut sectored).expect("read_from failed");
-        assert_eq!(sectored.len, len as usize);
         read_check(&mut sectored, chunk_sz, chunk_ct);
     }
 
     #[test]
-    fn buf_sectored_len_preserved() {
+    fn sectored_buf_sect_sz_too_small_is_error() {
+        const MIN: usize = SectoredBuf::<()>::RESERVED;
+        let result = SectoredBuf::new()
+            .try_compose(SectoredCursor::new([0u8; MIN], MIN - 1));
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn sectored_buf_len_preserved() {
         const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
         const SECT_CT: usize = 16;
-        let mut sectored = make_buf_sectored(SECT_SZ, SECT_CT);
+        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
         let expected = vec![42u8; 12];
         // We need to ensure that writing expected will not fill up the buffer in sectored.
-        assert!(expected.len() < sectored.sector_sz() - std::mem::size_of::<usize>());
+        assert!(expected.len() < sectored.sector_sz() - SectoredBuf::<()>::RESERVED);
 
         sectored.write_all(&expected).expect("write failed");
         sectored.flush().expect("flush failed");
         let inner = sectored.into_inner();
-        let mut sectored = BufSectored::new()
-            .compose(inner)
+        let mut sectored = SectoredBuf::new()
+            .try_compose(inner)
             .expect("failed to compose sectored buffer");
         let mut actual = vec![0u8; expected.len()];
         sectored.fill_buf(actual.as_mut_slice()).expect("failed to fill actual");
@@ -1038,37 +1159,67 @@ mod tests {
     }
 
     #[test]
-    fn buf_sectored_write_read_random() {
-        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
+    fn sectored_buf_seek() {
+        let sect_sz = 16usize;
+        let sect_ct = 16usize;
+        let cap = sect_sz * sect_ct - std::mem::size_of::<usize>(); 
+        let source = {
+            let mut source = Vec::with_capacity(cap);
+            source.extend(
+                std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None })
+                    .take(cap)
+            );
+            source
+        };
+        let mut sectored = make_sectored_buf(sect_sz, sect_ct);
+        sectored.write(&source).expect("write failed");
+        let mut buf = [0u8; 1];
+        let end = cap.try_into().expect("cap cannot fit into a u8");
+        for pos in (0..end).rev() {
+            sectored.seek(SeekFrom::Start(pos as u64)).expect("seek failed");
+            sectored.read(&mut buf).expect("read failed");
+            assert_eq!(pos, buf[0]);
+        }
+    }
+
+    #[test]
+    fn sectored_buf_write_read_random() {
+        const SECT_SZ: usize = 16;
         const SECT_CT: usize = 16;
         const CAP: usize = SECT_SZ * SECT_CT - std::mem::size_of::<usize>();
-        let mut rando = Randomizer::new([3u8; Randomizer::HASH.len()]);
         let source = {
             let mut expected = Vec::with_capacity(CAP);
-            expected.extend((&mut rando).take(expected.capacity()).map(|e| (e % 256) as u8));
+            expected.extend(
+                std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None })
+                    .take(CAP)
+            );
             expected
         };
-        let rando2 = Randomizer::new([5u8; Randomizer::HASH.len()]);
-        let indices: Vec<(usize, usize)> = rando
-            .zip(rando2)
-            .take(2)
-            .map(|(mut first, mut second)| {
-                first %= source.len();
-                second &= source.len();
-                let low = first.min(second);
-                let high = first.max(second);
-                (low, high)
-            })
-            .collect();
-
-        let mut sectored = make_buf_sectored(SECT_SZ, SECT_CT);
+        let indices: Vec<(usize, usize)> = {
+            let rando = Randomizer::new([3u8; Randomizer::HASH.len()]);
+            let rando2 = Randomizer::new([5u8; Randomizer::HASH.len()]);
+            rando
+                .zip(rando2)
+                .take(SECT_CT)
+                .map(|(mut first, mut second)| {
+                    first %= source.len();
+                    second &= source.len();
+                    let low = first.min(second);
+                    let high = first.max(second);
+                    (low, high)
+                })
+                .collect()
+        };
+
+        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
         sectored.write_all(&[0u8; CAP]).expect("failed to fill sectored");
         sectored.flush().expect("flush failed");
-        for (low, high) in indices.iter() {
+        for (_k, (low, high)) in indices.iter().enumerate() {
             sectored.seek(SeekFrom::Start(*low as u64)).expect("seek failed");
-            sectored.write_all(&source[*low..*high]).expect("write failed");
-            sectored.flush().expect("flush failed");
+            let src = &source[*low..*high];
+            sectored.write(src).expect("write failed");
         }
+        sectored.flush().expect("flush failed");
         let mut buf = vec![0u8; CAP];
         for (_k, (low, high)) in indices.iter().enumerate() {
             sectored.seek(SeekFrom::Start(*low as u64)).expect("seek failed");
@@ -1078,4 +1229,50 @@ mod tests {
             assert_eq!(expected, actual);
         }
     }
+
+    #[test]
+    fn sectored_buf_read_past_end() {
+        const LEN: usize = 32;
+        let mut sectored = SectoredBuf::new()
+            .try_compose(SectoredCursor::new([0u8; LEN], LEN))
+            .expect("compose failed");
+        const BUF_LEN: usize = LEN - SectoredBuf::<()>::RESERVED + 1;
+        sectored.write(&[1u8; BUF_LEN - 1]).expect("write failed");
+        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
+        let mut buf = [0u8; BUF_LEN];
+        // Note that buf is one byte longer than the available capacity in the cursor.
+        sectored.read(&mut buf).expect("read failed");
+        assert_eq!(&[1u8; BUF_LEN - 1], &buf[..(BUF_LEN - 1)]);
+        assert_eq!(0u8, buf[BUF_LEN - 1]);
+    }
+
+    /// Tests that the data written in try_compose is actually written back to the underlying stream.
+    #[test]
+    fn sectored_buf_write_back() {
+        let mut sectored = SectoredBuf::new()
+            .try_compose(SectoredCursor::new(vec![0u8; 24], 16))
+            .expect("compose failed");
+        let expected = [1u8; 8];
+        sectored.write(&expected).expect("first write failed");
+        sectored.write(&[2u8; 8]).expect("second write failed");
+        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
+        let mut actual = [0u8; 8];
+        sectored.read(&mut actual).expect("read failed");
+        assert_eq!(expected, actual);
+    }
+
+    #[test]
+    fn sectored_buf_write_past_end() {
+        const LEN: usize = 8;
+        let mut sectored = SectoredBuf::new()
+            .try_compose(SectoredCursor::new(vec![0u8; 0], LEN))
+            .expect("compos failed");
+        let expected = [1u8; LEN + 1];
+        sectored.write(&expected).expect("write failed");
+        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
+        let mut actual = [0u8; LEN + 1];
+        sectored.read(&mut actual).expect("read failed");
+        assert_eq!(expected, actual);
+
+    }
 }