// The dead code warnings create too much noise during wire-framing. // TODO: Delete this prior to release. #![allow(dead_code)] #[cfg(test)] mod test_helpers; #[cfg(test)] mod serde_tests; #[macro_use] extern crate static_assertions; use brotli::{CompressorWriter, Decompressor}; use btserde::{self, read_from, write_to}; mod crypto; use crypto::{AsymKeyPub, Cryptotext, Hash, HashKind, Sign, Signature, SymKey}; use log::error; use serde::{Deserialize, Serialize}; use serde_big_array::BigArray; use std::{ collections::HashMap, convert::{Infallible, TryFrom}, fmt::{self, Display, Formatter}, fs::{File, OpenOptions}, hash::Hash as Hashable, io::{self, Read, Seek, SeekFrom, Write}, ops::{Add, Sub}, time::{Duration, SystemTime}, }; #[derive(Debug)] enum Error { Io(std::io::Error), Serde(btserde::Error), Crypto(crypto::Error), IncorrectSize { expected: usize, actual: usize }, Custom(Box), } impl Error { fn custom(err: E) -> Error { Error::Custom(Box::new(err)) } } impl Display for Error { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { Error::Io(err) => err.fmt(f), Error::Serde(err) => err.fmt(f), Error::Crypto(err) => err.fmt(f), Error::IncorrectSize { expected, actual } => { write!(f, "incorrect size {}, expected {}", actual, expected) } Error::Custom(err) => err.fmt(f), } } } impl std::error::Error for Error {} impl From for Error { fn from(err: std::io::Error) -> Self { Error::Io(err) } } impl From for std::io::Error { fn from(err: Error) -> Self { io::Error::new(io::ErrorKind::Other, err) } } impl From for Error { fn from(err: btserde::Error) -> Self { Error::Serde(err) } } impl From for Error { fn from(err: crypto::Error) -> Self { Error::Crypto(err) } } impl From for Error { fn from(err: std::num::TryFromIntError) -> Self { Error::custom(err) } } type Result = std::result::Result; /// A Block tagged with its version number. When a block of a previous version is received over /// the network or read from the filesystem, it is upgraded to the current version before being /// processed. #[derive(Debug, PartialEq, Serialize, Deserialize)] enum VersionedBlock { V0(Block), } const SECTOR_SZ_DEFAULT: usize = 4096; // A trait for streams which only allow reads and writes in fixed sized units called sectors. trait Sectored { // Returns the size of the sector for this stream. fn sector_sz(&self) -> usize; fn assert_sector_sz(&self, actual: usize) -> Result<()> { let expected = self.sector_sz(); if expected == actual { Ok(()) } else { Err(Error::IncorrectSize { expected, actual }) } } } #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] pub struct Header { path: Path, readcaps: HashMap>, writecap: Writecap, merkle_root: Hash, } /// A container which binds together ciphertext along with the metadata needed to identify, /// verify and decrypt it. #[derive(Debug, PartialEq, Serialize, Deserialize)] struct Block { header: Header, sig: Signature, body: T, } impl Block { fn try_compose_body, U: Decompose, V: TryCompose>( self, new_body: V, ) -> Result> { Ok(Block { header: self.header, sig: self.sig, body: new_body.try_compose(self.body).map_err(|err| err.into())?, }) } fn compose_body, V: Compose>(self, new_body: V) -> Block { Block { header: self.header, sig: self.sig, body: new_body.compose(self.body), } } } impl Block { fn new>(path: P) -> Result> { let mut file = OpenOptions::new().read(true).write(true).open(path)?; let header: Header = read_from(&mut file)?; let sig: Signature = read_from(&mut file)?; crypto::verify_header(&header, &sig)?; Ok(Block { header, sig, body: file, }) } } impl Write for Block { fn write(&mut self, buf: &[u8]) -> io::Result { self.body.write(buf) } fn flush(&mut self) -> io::Result<()> { self.body.flush() } } impl Read for Block { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.body.read(buf) } } impl Seek for Block { fn seek(&mut self, pos: io::SeekFrom) -> io::Result { self.body.seek(pos) } } trait Decompose { fn into_inner(self) -> T; } trait TryCompose> { type Error; fn try_compose(self, inner: T) -> std::result::Result; } trait Compose { fn compose(self, inner: T) -> U; } impl, S: TryCompose> Compose for S { fn compose(self, inner: T) -> U { let result = self.try_compose(inner); // Safety: Infallible has no values, so `result` must be `Ok`. unsafe { result.unwrap_unchecked() } } } /// Extensions to the `Read` trait. trait ReadExt: Read { /// Reads repeatedly until one of the following occur: /// 1. The given buffer is full. /// 2. A call to `read` returns 0. /// 3. A call to `read` returns an error. /// The number of bytes read is returned. If an error is returned, then no bytes were read. fn fill_buf(&mut self, mut dest: &mut [u8]) -> io::Result { let dest_len_start = dest.len(); while !dest.is_empty() { let byte_ct = match self.read(dest) { Ok(byte_ct) => byte_ct, Err(err) => { if dest_len_start == dest.len() { return Err(err); } else { // We're not allowed to return an error if we've already read from self. error!("an error occurred in fill_buf: {}", err); break; } } }; if 0 == byte_ct { break; } dest = &mut dest[byte_ct..]; } Ok(dest_len_start - dest.len()) } } impl ReadExt for T {} impl Decompose for CompressorWriter { fn into_inner(self) -> T { self.into_inner() } } impl Decompose for Decompressor { fn into_inner(self) -> T { self.into_inner() } } #[derive(Clone)] struct BrotliParams { buf_sz: usize, quality: u32, window_sz: u32, } impl BrotliParams { fn new(buf_sz: usize, quality: u32, window_sz: u32) -> BrotliParams { BrotliParams { buf_sz, quality, window_sz, } } } impl TryCompose> for BrotliParams { type Error = Error; fn try_compose(self, inner: T) -> Result> { Ok(CompressorWriter::new( inner, self.buf_sz, self.quality, self.window_sz, )) } } impl TryCompose> for BrotliParams { type Error = Error; fn try_compose(self, inner: T) -> Result> { Ok(Decompressor::new(inner, self.buf_sz)) } } /// TODO: Remove this once the error_chain crate is integrated. fn err_conv( result: std::result::Result, ) -> std::result::Result { result.map_err(|err| io::Error::new(io::ErrorKind::Other, err)) } /// A stream which buffers writes and read such that the inner stream only sees reads and writes /// of sector length buffers. struct SectoredBuf { inner: T, buf: Vec, /// The offset into the inner stream which the zero offset byte in `buf` corresponds to. buf_start: usize, /// Indicates if the contents of `buf` have been written to, and so whether `buf` needs to be /// written back to `inner` before it is refilled. dirty: bool, /// The total number of bytes that have been written to the inner stream, including the reserved /// bytes at the beginning. len: usize, /// The current position of this stream, expressed as an offset into the inner stream. pos: usize, } impl SectoredBuf<()> { fn new() -> SectoredBuf<()> { SectoredBuf { inner: (), buf: Vec::new(), buf_start: 0, dirty: false, len: 0, pos: 0, } } } impl SectoredBuf { /// The number of bytes at the beginning of the inner stream which are reserved to store the /// length of data written. All offsets into the stored data must be shifted by this amount to /// be translated to an offset in the inner stream. const RESERVED: usize = std::mem::size_of::(); /// Returns the position in the inner stream which the given position in this stream corresponds /// to. fn inner_pos(self_pos: u64) -> u64 { let offset: u64 = Self::RESERVED.try_into().unwrap(); self_pos + offset } /// Returns the position in this stream which the given position in the inner stream corresponds /// to. fn self_pos(inner_pos: u64) -> u64 { let offset: u64 = Self::RESERVED.try_into().unwrap(); inner_pos - offset } /// Returns the offset into the internal buffer that corresponds to the current position. fn buf_pos(&self) -> usize { self.pos - self.buf_start } /// Returns one more than the last index in the internal buffer which can be read. fn buf_end(&self) -> usize { let limit = self.len.min(self.buf_start + self.sector_sz()); limit - self.buf_start } /// Returns the index of the sector which is currently loaded into the buffer. fn buf_sector_index(&self) -> usize { self.pos / self.sector_sz() } } impl SectoredBuf { /// Fills the internal buffer by reading from the inner stream at the current position /// and updates `self.buf_start` with the position read from. fn fill_internal_buf(&mut self) -> io::Result { self.buf_start = err_conv(self.inner.stream_position()?.try_into())?; let read_bytes = if self.buf_start < self.len { let read_bytes = self.inner.fill_buf(&mut self.buf)?; if read_bytes < self.buf.len() { return Err(io::Error::new( io::ErrorKind::Other, format!( "Failed to fill SectoredBuf.buf. Expected {} bytes, got {}.", self.buf.len(), read_bytes ), )); } read_bytes } else { 0 }; Ok(read_bytes) } } impl Decompose for SectoredBuf { fn into_inner(self) -> T { self.inner } } impl TryCompose> for SectoredBuf<()> { type Error = Error; fn try_compose(self, inner: T) -> Result> { let sect_sz = inner.sector_sz(); if sect_sz < Self::RESERVED { return Err(Error::custom(format!( "a sector size of at least {} is required. Got {}", Self::RESERVED, sect_sz, ))); } let mut sectored = SectoredBuf { inner, buf: self.buf, buf_start: 0, dirty: false, len: Self::RESERVED, pos: Self::RESERVED, }; sectored.inner.seek(SeekFrom::Start(0))?; sectored.buf.resize(sect_sz, 0); let len_stored = match sectored.fill_internal_buf() { Ok(bytes_read) => bytes_read >= Self::RESERVED, Err(err) => { error!("SectoredBuf::fill_internal_buf returned an error: {}", err); false } }; if len_stored { if let Ok(len) = read_from::(&mut sectored.buf.as_slice()) { sectored.len = len.try_into()?; } } else { write_to(&Self::RESERVED, &mut sectored.buf.as_mut_slice())?; sectored.dirty = true; } Ok(sectored) } } impl Sectored for SectoredBuf { fn sector_sz(&self) -> usize { self.buf.len() } } impl Write for SectoredBuf { fn write(&mut self, mut src: &[u8]) -> io::Result { let src_len_start = src.len(); let mut dest = { let buf_pos = self.buf_pos(); &mut self.buf[buf_pos..] }; while !src.is_empty() { if dest.is_empty() { if let Err(err) = self.flush() { error!("A call to SectoredBuf::flush returned an error: {}", err); break; } dest = &mut self.buf[..]; } let sz = src.len().min(dest.len()); (&mut dest[..sz]).copy_from_slice(&src[..sz]); dest = &mut dest[sz..]; src = &src[sz..]; self.dirty = sz > 0; self.pos += sz; } Ok(src_len_start - src.len()) } fn flush(&mut self) -> io::Result<()> { if !self.dirty { return Ok(()); } // Write out the contents of the buffer. let sect_sz: u64 = err_conv(self.sector_sz().try_into())?; let inner_pos = self.inner.stream_position()?; let inner_pos_usize: usize = err_conv(inner_pos.try_into())?; let is_new_sector = self.pos > inner_pos_usize; let is_full = (self.buf.len() - self.buf_pos()) == 0; let seek_to = if is_new_sector { if is_full { inner_pos + sect_sz } else { inner_pos } } else { // The contents of the buffer were previously read from inner, so we write the // updated contents to the same offset. let sect_start: u64 = err_conv(self.buf_start.try_into())?; self.inner.seek(SeekFrom::Start(sect_start))?; if is_full { inner_pos } else { inner_pos - sect_sz } }; self.inner.write_all(&self.buf)?; // Update the stored length. self.len = self.len.max(self.pos); self.inner.seek(SeekFrom::Start(0))?; self.fill_internal_buf()?; let len: u64 = err_conv(self.len.try_into())?; err_conv(write_to(&len, &mut self.buf.as_mut_slice()))?; self.inner.seek(SeekFrom::Start(0))?; self.inner.write_all(&self.buf)?; // Seek to the next position. self.inner.seek(SeekFrom::Start(seek_to))?; self.fill_internal_buf()?; self.dirty = false; Ok(()) } } impl Read for SectoredBuf { fn read(&mut self, mut dest: &mut [u8]) -> io::Result { if self.pos == self.len { return Ok(0); } let dest_len_start = dest.len(); let mut src = { let start = self.buf_pos(); let end = self.buf_end(); &self.buf[start..end] }; while !dest.is_empty() { if src.is_empty() { if self.pos >= self.len { break; } let byte_ct = match self.fill_internal_buf() { Ok(byte_ct) => byte_ct, Err(err) => { error!("SectoredBuf::full_internal_buf returned an error: {}", err); break; } }; if 0 == byte_ct { break; } src = &self.buf[..byte_ct]; } let sz = src.len().min(dest.len()); (&mut dest[..sz]).copy_from_slice(&src[..sz]); dest = &mut dest[sz..]; src = &src[sz..]; self.pos += sz; } Ok(dest_len_start - dest.len()) } } impl Seek for SectoredBuf { fn seek(&mut self, pos: SeekFrom) -> io::Result { let inner_pos = self.inner.stream_position()?; let inner_pos_new = match pos { SeekFrom::Start(rel_start) => Self::inner_pos(rel_start), SeekFrom::Current(rel_curr) => { if rel_curr > 0 { inner_pos + rel_curr as u64 } else { inner_pos - rel_curr as u64 } } SeekFrom::End(_) => { return Err(io::Error::new( io::ErrorKind::Unsupported, "seeking relative to the end of the stream is not supported", )) } }; let sect_sz = self.sector_sz(); let sect_index = self.buf_sector_index(); let sect_index_new = err_conv(TryInto::::try_into(inner_pos_new))? / sect_sz; let pos: u64 = err_conv(self.pos.try_into())?; if sect_index != sect_index_new || pos == inner_pos { self.flush()?; let seek_to: u64 = err_conv((sect_index_new * sect_sz).try_into())?; self.inner.seek(SeekFrom::Start(seek_to))?; self.fill_internal_buf()?; } self.pos = err_conv(inner_pos_new.try_into())?; Ok(Self::self_pos(inner_pos_new)) } } /// An envelopment of a key, which is tagged with the principal who the key is meant for. #[derive(Debug, PartialEq, Serialize, Deserialize)] struct Readcap { /// The principal this `Readcap` was issued to. issued_to: Principal, /// An encipherment of a block key using the public key of the principal. key: Cryptotext, } impl Readcap { fn new(issued_to: Hash, key: Cryptotext) -> Readcap { Readcap { issued_to: Principal(issued_to), key, } } } /// Verifies that a principal is authorized to write blocks in a tree. #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] struct Writecap { /// The principal this `Writecap` was issued to. issued_to: Principal, /// The path where this write caps's validity begins. path: Path, /// The point in time after which this write cap is no longer valid. expires: Epoch, /// The public key used to sign this write cap. signing_key: AsymKeyPub, /// A digital signature which covers all of the fields in the write cap except for next. signature: Signature, /// The next write cap in the chain leading back to the root. next: Option>, } /// Fragments are created from blocks using Erasure Encoding and stored with other nodes in the /// network to provide availability and redundancy of data. #[derive(Debug, PartialEq, Serialize, Deserialize)] struct Fragment { /// The path to the block this fragment is from. path: Path, /// The serial number of this fragment. serial: FragmentSerial, /// The actual data. body: Vec, } impl Fragment { /// Create a new fragment with the given fields. If `path_str` cannot be parsed then a failed /// `Result` is returned containing a `PathError`. fn new( path_str: &str, serial_num: u32, body: Vec, ) -> std::result::Result { let result = Path::try_from(path_str); Ok(Fragment { path: result?, serial: FragmentSerial(serial_num), body, }) } } /// The body of every non-leaf node in a tree contains this data structure. #[derive(Debug, PartialEq, Serialize, Deserialize)] struct Directory { /// The nodes that are attached to the tree at this block. nodes: Vec, /// This block's descendants. children: HashMap>, } /// Keeps track of which principal is storing a fragment. #[derive(Debug, PartialEq, Serialize, Deserialize)] struct FragmentRecord { /// The fragment serial number this record is for. serial: FragmentSerial, /// The principal who is storing this fragment. stored_by: Principal, } impl FragmentRecord { /// Creates a new `FragmentRecord` whose `serial` and `stored_by` fields are set to /// the given values. fn new(serial: u32, stored_by: Hash) -> FragmentRecord { FragmentRecord { serial: FragmentSerial(serial), stored_by: Principal(stored_by), } } } /// An identifier for a security principal, which is any entity that can be authenticated. #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Hashable, Clone)] struct Principal(Hash); impl Principal { fn kind(&self) -> HashKind { HashKind::from(&self.0) } } /// Trait for types which are owned by a `Principal`. trait Owned { /// Returns the `Principal` that owns `self`, using the given hash algorithm. fn owner_of_kind(&self, kind: HashKind) -> Principal; /// Returns the `Principal` that owns `self`, using the default hash algorithm. fn owner(&self) -> Principal { self.owner_of_kind(HashKind::default()) } } /// An identifier for a block in a tree. #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] struct Path { owner: Principal, components: Vec, } impl Path { /// The character that is used to separate path components. const SEP: char = '/'; /// The limit, in bytes, of a `Path`'s length. const BYTE_LIMIT: usize = 4096; /// Returns a result which, when successful, contains the index after the last character in the /// current path component. fn component_end>( start: usize, first: char, pairs: &mut I, ) -> std::result::Result { if first == Path::SEP { return Err(PathError::EmptyComponent); } let end; let mut last = start; loop { match pairs.next() { Some((index, Path::SEP)) => { end = index; break; } Some((index, _)) => last = index, None => { end = last + 1; break; } } } if end == start { Err(PathError::EmptyComponent) } else { Ok(end) } } /// Asserts that the number of bytes in the given string is no more than `Path::BYTE_LIMIT`. fn assert_not_too_long(string: &str) -> std::result::Result<(), PathError> { let len = string.len(); if len > Path::BYTE_LIMIT { return Err(PathError::PathTooLong(len)); } Ok(()) } /// Returns true if `other` is a subpath of this `Path`. fn contains(&self, other: &Path) -> bool { if self.owner != other.owner { return false; }; // This path must be no longer than the other path. if self.components.len() > other.components.len() { return false; } // Skip the component containing the owner. let self_iter = self.components.iter().skip(1); let other_iter = other.components.iter().skip(1); for pair in self_iter.zip(other_iter) { if pair.0 != pair.1 { return false; } } true } } impl<'s> TryFrom<&'s str> for Path { type Error = PathError; fn try_from(string: &'s str) -> std::result::Result { Path::assert_not_too_long(string)?; let mut pairs = string.char_indices(); let mut components = Vec::new(); let mut last_end = 0; while let Some((start, c)) = pairs.next() { let end = Path::component_end(start, c, &mut pairs)?; last_end = end; let slice = &string[start..end]; components.push(slice.to_string()); } // An empty component is added to the end to indicate if there was a trailing slash. if string.len() - 1 == last_end { components.push("".to_string()); } let leading = components .get(0) .ok_or(PathError::InvalidLeadingComponent)?; let hash = Hash::try_from(leading.as_str()).map_err(|_| PathError::InvalidLeadingComponent)?; Ok(Path { owner: Principal(hash), components, }) } } impl Display for Path { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { if self.components.is_empty() { return write!(f, ""); }; let mut iter = self.components.iter(); let first = iter.next().unwrap(); let mut output = write!(f, "{}", first); for component in iter { output = write!(f, "{}{}", Path::SEP, component) } output } } /// Errors which can occur when converting a string to a `Path`. #[derive(Debug, PartialEq)] enum PathError { /// Occurs when the number of bytes in a string is greater than `Path::BYTE_LIMIT`. PathTooLong(usize), /// Indicates that a path string was empty. Empty, /// Occurs when a component in a path string was empty. EmptyComponent, /// Occurs when the leading component of a path is not in the correct format. InvalidLeadingComponent, } impl Display for PathError { fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { match self { PathError::PathTooLong(length) => formatter.write_fmt(format_args!( "path contained {} bytes, which is over the {} byte limit", length, Path::BYTE_LIMIT )), PathError::Empty => formatter.write_str("path was empty"), PathError::EmptyComponent => formatter.write_str("component of path was empty"), PathError::InvalidLeadingComponent => { formatter.write_str("invalid leading path component") } } } } /// An instant in time represented by the number of seconds since January 1st 1970, 00:00:00 UTC. #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord)] struct Epoch(u64); impl Epoch { /// Returns the current epoch time. fn now() -> Epoch { let now = SystemTime::now(); // If the system clock is before the unix epoch, just panic. let epoch = now.duration_since(SystemTime::UNIX_EPOCH).unwrap(); Epoch(epoch.as_secs()) } } impl Copy for Epoch {} impl Add for Epoch { type Output = Self; fn add(self, other: Duration) -> Self { Epoch(self.0 + other.as_secs()) } } impl Sub for Epoch { type Output = Self; fn sub(self, other: Duration) -> Self { Epoch(self.0 - other.as_secs()) } } /// The serial number of a block fragment. #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Hashable)] struct FragmentSerial(u32); #[cfg(test)] mod tests { use std::io::Cursor; use super::*; use test_helpers::*; fn path_from_str_test_case( expected: std::result::Result, input: &str, ) -> std::result::Result<(), PathError> { let result = Path::try_from(input); assert_eq!(expected, result); Ok(()) } #[test] fn path_from_str_multiple_components_ok() -> std::result::Result<(), PathError> { let expected = make_path(vec!["red", "green", "blue"]); let input = format!("{}/red/green/blue", expected.owner.0); path_from_str_test_case(Ok(expected), input.as_str())?; Ok(()) } #[test] fn path_from_str_one_component_ok() -> std::result::Result<(), PathError> { let expected = make_path(vec![]); let input = expected.owner.0.to_string(); path_from_str_test_case(Ok(expected), input.as_str())?; Ok(()) } #[test] fn path_from_str_trailing_slash_ok() -> std::result::Result<(), PathError> { // Notice the empty component at the end of this path due to the trailing slash. let expected = make_path(vec!["orange", "banana", "shotgun", ""]); let input = format!("{}/orange/banana/shotgun/", expected.owner.0); path_from_str_test_case(Ok(expected), input.as_str())?; Ok(()) } #[test] fn path_from_str_path_too_long_fail() -> std::result::Result<(), PathError> { let principal = make_principal(); let input = format!("{}/{}", principal.0, "*".repeat(4097)); let expected = Err(PathError::PathTooLong(input.len())); path_from_str_test_case(expected, input.as_str())?; Ok(()) } #[test] fn path_from_str_multiple_slashes_fail() -> std::result::Result<(), PathError> { let expected = Err(PathError::EmptyComponent); let input = format!("{}//orange", make_principal().0); path_from_str_test_case(expected, input.as_str())?; Ok(()) } #[test] fn path_from_str_leading_slash_fail() -> std::result::Result<(), PathError> { let expected = Err(PathError::EmptyComponent); let input = format!("/{}/orange/banana/shotgun", make_principal().0); path_from_str_test_case(expected, input.as_str())?; Ok(()) } #[test] fn path_round_trip() -> std::result::Result<(), PathError> { let expected = make_path(vec!["interstitial", "inter-related", "intersections"]); let actual = Path::try_from(expected.to_string().as_str())?; assert_eq!(expected, actual); Ok(()) } #[test] fn path_contains_true() { let larger = make_path(vec!["apps"]); let smaller = make_path(vec!["apps", "bohdi"]); assert!(larger.contains(&smaller)); } #[test] fn path_contains_true_only_owner() { let larger = make_path(vec![]); let smaller = make_path(vec![]); assert!(larger.contains(&smaller)); } #[test] fn path_contains_false_self_is_longer() { let first = make_path(vec!["apps", "bohdi"]); let second = make_path(vec!["apps"]); assert!(!first.contains(&second)); } #[test] fn path_contains_false_same_owners() { let first = make_path(vec!["apps"]); let second = make_path(vec!["nodes"]); assert!(!first.contains(&second)); } #[test] fn path_contains_false_different_owners() { let first = make_path(vec!["apps"]); let mut second = make_path(vec!["apps"]); second.owner = Principal(Hash::Sha2_256(PRINCIPAL2)); assert!(!first.contains(&second)); } #[test] fn brotli_compress_decompress() { const SECT_SZ: usize = SECTOR_SZ_DEFAULT; const SECT_CT: usize = 16; let params = BrotliParams::new(SECT_SZ, 8, 20); let mut memory = Cursor::new([0u8; SECT_SZ * SECT_CT]); { let write: CompressorWriter<_> = params .clone() .try_compose(&mut memory) .expect("compose for write failed"); write_fill(write, SECT_SZ, SECT_CT); } memory.seek(SeekFrom::Start(0)).expect("seek failed"); { let read: Decompressor<_> = params .try_compose(&mut memory) .expect("compose for read failed"); read_check(read, SECT_SZ, SECT_CT); } } fn make_sectored_buf(sect_sz: usize, sect_ct: usize) -> SectoredBuf>> { SectoredBuf::new() .try_compose(SectoredCursor::new(vec![0u8; sect_sz * sect_ct], sect_sz)) .expect("compose for sectored buffer failed") } #[test] fn sectored_buf_fill_inner() { const SECT_SZ: usize = SECTOR_SZ_DEFAULT; const SECT_CT: usize = 16; let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT); let sect_sz = sectored.sector_sz(); assert_eq!(0, sect_sz % 16); let chunk_sz = sect_sz / 16; let chunk_ct = SECT_CT * 16; write_fill(&mut sectored, chunk_sz, chunk_ct); } #[test] fn sectored_buf_write_read_sequential() { const SECT_SZ: usize = SECTOR_SZ_DEFAULT; const SECT_CT: usize = 16; let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT); let sect_sz = sectored.sector_sz(); assert_eq!(0, sect_sz % 16); let chunk_sz = sect_sz / 16; // We subtract one here so that the underlying buffer is not completely filled. This // exercises the length limiting capability of the sectored buffer. let chunk_ct = SECT_CT * 16 - 1; write_fill(&mut sectored, chunk_sz, chunk_ct); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); read_check(&mut sectored, chunk_sz, chunk_ct); } #[test] fn sectored_buf_sect_sz_too_small_is_error() { const MIN: usize = SectoredBuf::<()>::RESERVED; let result = SectoredBuf::new().try_compose(SectoredCursor::new([0u8; MIN], MIN - 1)); assert!(result.is_err()); } #[test] fn sectored_buf_len_preserved() { const SECT_SZ: usize = SECTOR_SZ_DEFAULT; const SECT_CT: usize = 16; let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT); let expected = vec![42u8; 12]; // We need to ensure that writing expected will not fill up the buffer in sectored. assert!(expected.len() < sectored.sector_sz() - SectoredBuf::<()>::RESERVED); sectored.write_all(&expected).expect("write failed"); sectored.flush().expect("flush failed"); let inner = sectored.into_inner(); let mut sectored = SectoredBuf::new() .try_compose(inner) .expect("failed to compose sectored buffer"); let mut actual = vec![0u8; expected.len()]; sectored .fill_buf(actual.as_mut_slice()) .expect("failed to fill actual"); assert_eq!(expected, actual); } #[test] fn sectored_buf_seek() { let sect_sz = 16usize; let sect_ct = 16usize; let cap = sect_sz * sect_ct - std::mem::size_of::(); let source = { let mut source = Vec::with_capacity(cap); source.extend( std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None }) .take(cap), ); source }; let mut sectored = make_sectored_buf(sect_sz, sect_ct); sectored.write(&source).expect("write failed"); let mut buf = [0u8; 1]; let end = cap.try_into().expect("cap cannot fit into a u8"); for pos in (0..end).rev() { sectored .seek(SeekFrom::Start(pos as u64)) .expect("seek failed"); sectored.read(&mut buf).expect("read failed"); assert_eq!(pos, buf[0]); } } #[test] fn sectored_buf_write_read_random() { const SECT_SZ: usize = 16; const SECT_CT: usize = 16; const CAP: usize = SECT_SZ * SECT_CT - std::mem::size_of::(); let source = { let mut expected = Vec::with_capacity(CAP); expected.extend( std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None }) .take(CAP), ); expected }; let indices: Vec<(usize, usize)> = { let rando = Randomizer::new([3u8; Randomizer::HASH.len()]); let rando2 = Randomizer::new([5u8; Randomizer::HASH.len()]); rando .zip(rando2) .take(SECT_CT) .map(|(mut first, mut second)| { first %= source.len(); second &= source.len(); let low = first.min(second); let high = first.max(second); (low, high) }) .collect() }; let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT); sectored .write_all(&[0u8; CAP]) .expect("failed to fill sectored"); sectored.flush().expect("flush failed"); for (_k, (low, high)) in indices.iter().enumerate() { sectored .seek(SeekFrom::Start(*low as u64)) .expect("seek failed"); let src = &source[*low..*high]; sectored.write(src).expect("write failed"); } sectored.flush().expect("flush failed"); let mut buf = vec![0u8; CAP]; for (_k, (low, high)) in indices.iter().enumerate() { sectored .seek(SeekFrom::Start(*low as u64)) .expect("seek failed"); let actual = &mut buf[*low..*high]; sectored.fill_buf(actual).expect("read failed"); let expected = &source[*low..*high]; assert_eq!(expected, actual); } } #[test] fn sectored_buf_read_past_end() { const LEN: usize = 32; let mut sectored = SectoredBuf::new() .try_compose(SectoredCursor::new([0u8; LEN], LEN)) .expect("compose failed"); const BUF_LEN: usize = LEN - SectoredBuf::<()>::RESERVED + 1; sectored.write(&[1u8; BUF_LEN - 1]).expect("write failed"); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); let mut buf = [0u8; BUF_LEN]; // Note that buf is one byte longer than the available capacity in the cursor. sectored.read(&mut buf).expect("read failed"); assert_eq!(&[1u8; BUF_LEN - 1], &buf[..(BUF_LEN - 1)]); assert_eq!(0u8, buf[BUF_LEN - 1]); } /// Tests that the data written in try_compose is actually written back to the underlying stream. #[test] fn sectored_buf_write_back() { let mut sectored = SectoredBuf::new() .try_compose(SectoredCursor::new(vec![0u8; 24], 16)) .expect("compose failed"); let expected = [1u8; 8]; sectored.write(&expected).expect("first write failed"); sectored.write(&[2u8; 8]).expect("second write failed"); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); let mut actual = [0u8; 8]; sectored.read(&mut actual).expect("read failed"); assert_eq!(expected, actual); } #[test] fn sectored_buf_write_past_end() { const LEN: usize = 8; let mut sectored = SectoredBuf::new() .try_compose(SectoredCursor::new(vec![0u8; 0], LEN)) .expect("compos failed"); let expected = [1u8; LEN + 1]; sectored.write(&expected).expect("write failed"); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); let mut actual = [0u8; LEN + 1]; sectored.read(&mut actual).expect("read failed"); assert_eq!(expected, actual); } }