// The dead code warnings create too much noise during wire-framing. // TODO: Delete this prior to release. #![allow(dead_code)] mod crypto; #[cfg(test)] mod test_helpers; #[macro_use] extern crate static_assertions; #[cfg(test)] #[macro_use] extern crate lazy_static; use brotli::{CompressorWriter, Decompressor}; use btserde::{self, read_from, write_to}; use crypto::{ AsymKeyPub, Ciphertext, Creds, Decrypter, Encrypter, EncrypterExt, Hash, HashKind, MerkleStream, SecretStream, Sign, Signature, Signer, SymKey, SymKeyKind, }; use log::{error, warn}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_big_array::BigArray; use std::{ collections::HashMap, convert::{Infallible, TryFrom}, fmt::{self, Display, Formatter}, hash::Hash as Hashable, io::{self, Read, Seek, SeekFrom, Write}, marker::PhantomData, ops::{Add, Sub}, sync::PoisonError, time::{Duration, SystemTime}, }; #[derive(Debug)] pub enum Error { MissingWritecap, Io(std::io::Error), Serde(btserde::Error), Crypto(crypto::Error), IncorrectSize { expected: usize, actual: usize }, Custom(Box), } impl Error { fn custom(err: E) -> Error { Error::Custom(Box::new(err)) } } impl Display for Error { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { Error::MissingWritecap => write!(f, "missing writecap"), Error::Io(err) => err.fmt(f), Error::Serde(err) => err.fmt(f), Error::Crypto(err) => err.fmt(f), Error::IncorrectSize { expected, actual } => { write!(f, "incorrect size {}, expected {}", actual, expected) } Error::Custom(err) => err.fmt(f), } } } impl std::error::Error for Error {} impl From for Error { fn from(err: std::io::Error) -> Self { Error::Io(err) } } impl From for std::io::Error { fn from(err: Error) -> Self { io::Error::new(io::ErrorKind::Other, err) } } impl From for Error { fn from(err: btserde::Error) -> Self { Error::Serde(err) } } impl From for Error { fn from(err: crypto::Error) -> Self { Error::Crypto(err) } } impl From for Error { fn from(err: 
std::num::TryFromIntError) -> Self { Error::custom(err) } } impl From> for Error { fn from(err: PoisonError) -> Self { Error::custom(err.to_string()) } } type Result = std::result::Result; /// TODO: Remove this once the error_chain crate is integrated. trait BoxInIoErr { fn box_err(self) -> std::result::Result; } impl BoxInIoErr for std::result::Result { fn box_err(self) -> std::result::Result { self.map_err(|err| io::Error::new(io::ErrorKind::Other, err)) } } /// TODO: Remove this once the error_chain crate is integrated. trait StrInIoErr { fn str_err(self) -> std::result::Result; } impl StrInIoErr for std::result::Result { fn str_err(self) -> std::result::Result { self.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string())) } } /// The default sector size to use for new blocks. const SECTOR_SZ_DEFAULT: usize = 4096; /// Trait for types which provide read and write access to blocks. pub trait Block: Read + Write + Seek + MetaAccess {} // A trait for streams which only allow reads and writes in fixed sized units called sectors. trait Sectored { // Returns the size of the sector for this stream. fn sector_sz(&self) -> usize; fn assert_sector_sz(&self, actual: usize) -> Result<()> { let expected = self.sector_sz(); if expected == actual { Ok(()) } else { Err(Error::IncorrectSize { expected, actual }) } } } /// A version of the `Write` trait, which allows integrity information to be supplied when flushing. trait WriteInteg: Write { fn flush_integ(&mut self, integrity: &[u8]) -> io::Result<()>; } trait Decompose { fn into_inner(self) -> T; } trait TryCompose> { type Error; fn try_compose(self, inner: T) -> std::result::Result; } trait Compose { fn compose(self, inner: T) -> U; } impl, S: TryCompose> Compose for S { fn compose(self, inner: T) -> U { let result = self.try_compose(inner); // Safety: Infallible has no values, so `result` must be `Ok`. 
unsafe { result.unwrap_unchecked() } } } pub trait MetaAccess { fn block_key(&self) -> Result; fn add_readcap_for(&mut self, owner: Principal, key: &dyn Encrypter) -> Result<()>; /// Returns the integrity value used to protect the contents of the block. fn integrity(&self) -> Option<&[u8]>; fn set_path(&mut self, path: Path); } /// Extensions to the `Read` trait. trait ReadExt: Read { /// Reads repeatedly until one of the following occur: /// 1. The given buffer is full. /// 2. A call to `read` returns 0. /// 3. A call to `read` returns an error. /// The number of bytes read is returned. If an error is returned, then no bytes were read. fn fill_buf(&mut self, mut dest: &mut [u8]) -> io::Result { let dest_len_start = dest.len(); while !dest.is_empty() { let byte_ct = match self.read(dest) { Ok(byte_ct) => byte_ct, Err(err) => { if dest_len_start == dest.len() { return Err(err); } else { // We're not allowed to return an error if we've already read from self. error!("an error occurred in fill_buf: {}", err); break; } } }; if 0 == byte_ct { break; } dest = &mut dest[byte_ct..]; } Ok(dest_len_start - dest.len()) } } impl ReadExt for T {} /// This struct contains all of the metadata fields associated with a block, except for its /// signature. Since this struct implements `Serialize`, this allows for convenient signature /// calculations. #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] pub struct BlockMetaBody { path: Path, readcaps: HashMap>, writecap: Option, /// A hash which provides integrity for the contents of the block body. integrity: Option, /// The public key that corresponds to the private key used to sign this header. signing_key: AsymKeyPub, } impl BlockMetaBody { fn new(creds: &C) -> BlockMetaBody { BlockMetaBody { path: Path::default(), readcaps: HashMap::new(), writecap: creds.writecap().map(|e| e.to_owned()), integrity: None, signing_key: creds.public_sign().to_owned(), } } } /// Signed metadata associated with a block. 
#[derive(Serialize, Deserialize)] struct BlockMeta { body: BlockMetaBody, sig: Signature, } impl BlockMeta { fn new(creds: &C) -> BlockMeta { let body = BlockMetaBody::new(creds); let sig = Signature::empty(body.signing_key.scheme()); BlockMeta { body, sig } } } struct BlockStream { trailered: Trailered, meta: BlockMeta, meta_body_buf: Vec, creds: C, } impl BlockStream { fn new(inner: T, creds: C) -> Result> { let (trailered, trailer) = Trailered::<_, BlockMeta>::new(inner)?; let trailer = match trailer { Some(trailer) => { crypto::verify_header(&trailer.body, &trailer.sig)?; trailer } None => { let mut meta = BlockMeta::new(&creds); let block_key = SymKey::generate(SymKeyKind::default())?; meta.body .readcaps .insert(creds.principal(), creds.ser_encrypt(&block_key)?); meta.body.writecap = creds.writecap().map(|e| e.to_owned()); meta } }; Ok(BlockStream { trailered, meta: trailer, meta_body_buf: Vec::new(), creds, }) } } impl Write for BlockStream { fn write(&mut self, buf: &[u8]) -> io::Result { self.trailered.write(buf) } fn flush(&mut self) -> io::Result<()> { Err(io::Error::new( io::ErrorKind::Unsupported, "flush is not supported, use flush_integ instead", )) } } impl WriteInteg for BlockStream { fn flush_integ(&mut self, integrity: &[u8]) -> io::Result<()> { let meta_body = &mut self.meta.body; let integ = meta_body.integrity.get_or_insert_with(Hash::default); integ.as_mut().copy_from_slice(integrity); self.meta_body_buf.clear(); write_to(&meta_body, &mut self.meta_body_buf).box_err()?; self.meta.sig = self .creds .sign(std::iter::once(self.meta_body_buf.as_slice()))?; self.trailered.flush(&self.meta)?; Ok(()) } } impl Read for BlockStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.trailered.read(buf) } } impl Seek for BlockStream { /// Seeks to the given position in the stream. If a position beyond the end of the stream is /// specified, the the seek will be to the end of the stream. 
fn seek(&mut self, pos: SeekFrom) -> io::Result { self.trailered.seek(pos) } } impl MetaAccess for BlockStream { fn block_key(&self) -> Result { let readcap = self .meta .body .readcaps .get(&self.creds.principal()) .ok_or(Error::Crypto(crypto::Error::NoReadCap))?; Ok(crypto::decrypt(readcap, &self.creds)?) } fn add_readcap_for(&mut self, owner: Principal, key: &dyn Encrypter) -> Result<()> { let block_key = self.block_key()?; let readcap = key.ser_encrypt(&block_key)?; self.meta.body.readcaps.insert(owner, readcap); Ok(()) } fn integrity(&self) -> Option<&[u8]> { self.meta.body.integrity.as_ref().map(|hash| hash.as_ref()) } fn set_path(&mut self, path: Path) { self.meta.body.path = path; } } struct BlockOpenOptions { inner: T, creds: C, encrypt: bool, compress: bool, } impl BlockOpenOptions<(), ()> { fn new() -> BlockOpenOptions<(), ()> { BlockOpenOptions { inner: (), creds: (), encrypt: true, compress: true, } } } impl BlockOpenOptions { fn with_inner(self, inner: U) -> BlockOpenOptions { BlockOpenOptions { inner, creds: self.creds, encrypt: self.encrypt, compress: self.compress, } } fn with_creds(self, creds: D) -> BlockOpenOptions { BlockOpenOptions { inner: self.inner, creds, encrypt: self.encrypt, compress: self.compress, } } fn with_encrypt(mut self, encrypt: bool) -> Self { self.encrypt = encrypt; self } fn with_compress(mut self, compress: bool) -> Self { self.compress = compress; self } } impl BlockOpenOptions { fn open(self) -> Result> { let stream = BlockStream::new(self.inner, self.creds)?; let block_key = stream.block_key()?; let mut stream = MerkleStream::new(stream)?; stream.assert_root_integrity()?; if self.encrypt { let stream = SecretStream::new(block_key).try_compose(stream)?; let stream = SectoredBuf::new().try_compose(stream)?; Ok(Box::new(stream)) } else { let stream = SectoredBuf::new().try_compose(stream)?; Ok(Box::new(stream)) } } } /// A struct which wraps a stream and which writes a trailing data structure to when flushed. 
struct Trailered { inner: T, body_len: u64, phantom: PhantomData, } impl Trailered { fn empty(inner: T) -> Trailered { Trailered { inner, body_len: 0, phantom: PhantomData, } } /// Creates a new `Trailered` containing the given `T`. This method requires that the given /// stream is either empty, or contains a valid serialization of `D` and a the offset at which /// `D` is stored. fn new(mut inner: T) -> Result<(Trailered, Option)> { let pos = inner.stream_position()?; let end = inner.seek(SeekFrom::End(0))?; if 0 == end { return Ok((Self::empty(inner), None)); } inner.seek(SeekFrom::End(-8))?; let offset: i64 = read_from(&mut inner)?; let body_len = inner.seek(SeekFrom::End(offset))?; let trailer: D = read_from(&mut inner)?; inner.seek(SeekFrom::Start(pos))?; Ok(( Trailered { inner, body_len, phantom: PhantomData, }, Some(trailer), )) } } impl Trailered { fn post_write(&mut self, written: usize) -> io::Result { if 0 == written { return Ok(0); } // I cannot return an error at this point because bytes have already been written to inner. // So if I can't track the body len due to a failure, a panic is the only option. let pos = self .inner .stream_position() .expect("failed to get stream position"); self.body_len = self.body_len.max(pos); Ok(written) } } impl Read for Trailered { fn read(&mut self, buf: &mut [u8]) -> io::Result { let pos = self.inner.stream_position()?; let available_u64 = self.body_len - pos; let available: usize = available_u64.try_into().box_err()?; let limit = buf.len().min(available); self.inner.read(&mut buf[..limit]) } } impl Trailered { fn write(&mut self, buf: &[u8]) -> io::Result { let written = self.inner.write(buf)?; self.post_write(written) } fn write_trailer(&mut self, trailer: &D) -> io::Result { let pos = self.inner.stream_position()?; self.inner.seek(SeekFrom::Start(self.body_len))?; write_to(trailer, &mut self.inner).box_err()?; let offset_u64 = 8 + self.inner.stream_position()? 
- self.body_len; let offset = -(offset_u64 as i64); write_to(&offset, &mut self.inner).box_err()?; Ok(pos) } fn flush(&mut self, trailer: &D) -> io::Result<()> { let prev_pos = self.write_trailer(trailer)?; self.inner.flush()?; self.inner.seek(SeekFrom::Start(prev_pos))?; Ok(()) } } impl Trailered { fn flush_integ(&mut self, trailer: &D, integrity: &[u8]) -> io::Result<()> { let prev_pos = self.write_trailer(trailer)?; self.inner.flush_integ(integrity)?; self.inner.seek(SeekFrom::Start(prev_pos))?; Ok(()) } } impl Seek for Trailered { fn seek(&mut self, pos: SeekFrom) -> io::Result { /// Adds a signed integer to an unsigned integer and returns the result. fn add_signed(unsigned: u64, signed: i64) -> u64 { if signed >= 0 { unsigned + signed as u64 } else { unsigned - (-signed as u64) } } let from_start = match pos { SeekFrom::Start(from_start) => from_start, SeekFrom::Current(from_curr) => add_signed(self.inner.stream_position()?, from_curr), SeekFrom::End(from_end) => add_signed(self.body_len, from_end), }; let from_start = from_start.min(self.body_len); self.inner.seek(SeekFrom::Start(from_start)) } } impl Decompose for CompressorWriter { fn into_inner(self) -> T { self.into_inner() } } impl Decompose for Decompressor { fn into_inner(self) -> T { self.into_inner() } } #[derive(Clone)] struct BrotliParams { buf_sz: usize, quality: u32, window_sz: u32, } impl BrotliParams { fn new(buf_sz: usize, quality: u32, window_sz: u32) -> BrotliParams { BrotliParams { buf_sz, quality, window_sz, } } } impl TryCompose> for BrotliParams { type Error = Error; fn try_compose(self, inner: T) -> Result> { Ok(CompressorWriter::new( inner, self.buf_sz, self.quality, self.window_sz, )) } } impl TryCompose> for BrotliParams { type Error = Error; fn try_compose(self, inner: T) -> Result> { Ok(Decompressor::new(inner, self.buf_sz)) } } /// A stream which buffers writes and read such that the inner stream only sees reads and writes /// of sector length buffers. 
struct SectoredBuf { inner: T, buf: Vec, /// The offset into the inner stream which the zero offset byte in `buf` corresponds to. buf_start: usize, /// Indicates if the contents of `buf` have been written to, and so whether `buf` needs to be /// written back to `inner` before it is refilled. dirty: bool, /// The total number of bytes that have been written to the inner stream, including the reserved /// bytes at the beginning. len: usize, /// The current position of this stream, expressed as an offset into the inner stream. pos: usize, } impl SectoredBuf<()> { fn new() -> SectoredBuf<()> { SectoredBuf { inner: (), buf: Vec::new(), buf_start: 0, dirty: false, len: 0, pos: 0, } } } impl SectoredBuf { /// The number of bytes at the beginning of the inner stream which are reserved to store the /// length of data written. All offsets into the stored data must be shifted by this amount to /// be translated to an offset in the inner stream. const RESERVED: usize = std::mem::size_of::(); /// Returns the position in the inner stream which the given position in this stream corresponds /// to. fn inner_pos(self_pos: u64) -> u64 { let offset: u64 = Self::RESERVED.try_into().unwrap(); self_pos + offset } /// Returns the position in this stream which the given position in the inner stream corresponds /// to. fn self_pos(inner_pos: u64) -> u64 { let offset: u64 = Self::RESERVED.try_into().unwrap(); inner_pos - offset } /// Returns the offset into the internal buffer that corresponds to the current position. fn buf_pos(&self) -> usize { self.pos - self.buf_start } /// Returns one more than the last index in the internal buffer which can be read. fn buf_end(&self) -> usize { let limit = self.len.min(self.buf_start + self.sector_sz()); limit - self.buf_start } /// Returns the index of the sector which is currently loaded into the buffer. 
fn buf_sector_index(&self) -> usize { self.pos / self.sector_sz() } } impl SectoredBuf { /// Fills the internal buffer by reading from the inner stream at the current position /// and updates `self.buf_start` with the position read from. fn fill_internal_buf(&mut self) -> Result { self.buf_start = self.inner.stream_position()?.try_into().box_err()?; let read_bytes = if self.buf_start < self.len { let read_bytes = self.inner.fill_buf(&mut self.buf)?; if read_bytes < self.buf.len() { return Err(Error::IncorrectSize { expected: self.buf.len(), actual: read_bytes, }); } read_bytes } else { 0 }; Ok(read_bytes) } } impl Decompose for SectoredBuf { fn into_inner(self) -> T { self.inner } } impl TryCompose> for SectoredBuf<()> { type Error = Error; fn try_compose(self, inner: T) -> Result> { let sect_sz = inner.sector_sz(); if sect_sz < Self::RESERVED { return Err(Error::custom(format!( "a sector size of at least {} is required. Got {}", Self::RESERVED, sect_sz, ))); } let mut sectored = SectoredBuf { inner, buf: self.buf, buf_start: 0, dirty: false, len: Self::RESERVED, pos: Self::RESERVED, }; sectored.inner.seek(SeekFrom::Start(0))?; sectored.buf.resize(sect_sz, 0); let len_stored = match sectored.fill_internal_buf() { Ok(bytes_read) => bytes_read >= Self::RESERVED, Err(Error::IncorrectSize { actual, expected }) => { if actual > 0 { return Err(Error::IncorrectSize { expected, actual }); } // When the actual size was 0 that just means the inner stream was empty, which is // not an error. 
false } Err(err) => return Err(err), }; if len_stored { if let Ok(len) = read_from::(&mut sectored.buf.as_slice()) { sectored.len = len.try_into()?; } } else { write_to(&Self::RESERVED, &mut sectored.buf.as_mut_slice())?; sectored.dirty = true; } Ok(sectored) } } impl Sectored for SectoredBuf { fn sector_sz(&self) -> usize { self.buf.len() } } impl Write for SectoredBuf { fn write(&mut self, mut src: &[u8]) -> io::Result { let src_len_start = src.len(); let mut dest = { let buf_pos = self.buf_pos(); &mut self.buf[buf_pos..] }; while !src.is_empty() { if dest.is_empty() { if let Err(err) = self.flush() { error!("A call to SectoredBuf::flush returned an error: {}", err); break; } dest = &mut self.buf[..]; } let sz = src.len().min(dest.len()); (&mut dest[..sz]).copy_from_slice(&src[..sz]); dest = &mut dest[sz..]; src = &src[sz..]; self.dirty = sz > 0; self.pos += sz; } Ok(src_len_start - src.len()) } fn flush(&mut self) -> io::Result<()> { if !self.dirty { return Ok(()); } // Write out the contents of the buffer. let sect_sz: u64 = self.sector_sz().try_into().box_err()?; let inner_pos = self.inner.stream_position()?; let inner_pos_usize: usize = inner_pos.try_into().box_err()?; let is_new_sector = self.pos > inner_pos_usize; let is_full = (self.buf.len() - self.buf_pos()) == 0; let seek_to = if is_new_sector { if is_full { inner_pos + sect_sz } else { inner_pos } } else { // The contents of the buffer were previously read from inner, so we write the // updated contents to the same offset. let sect_start: u64 = self.buf_start.try_into().box_err()?; self.inner.seek(SeekFrom::Start(sect_start))?; if is_full { inner_pos } else { inner_pos - sect_sz } }; self.inner.write_all(&self.buf)?; // Update the stored length. 
self.len = self.len.max(self.pos); self.inner.seek(SeekFrom::Start(0))?; self.fill_internal_buf()?; let len: u64 = self.len.try_into().box_err()?; write_to(&len, &mut self.buf.as_mut_slice()).box_err()?; self.inner.seek(SeekFrom::Start(0))?; self.inner.write_all(&self.buf)?; self.inner.flush()?; // Seek to the next position. self.inner.seek(SeekFrom::Start(seek_to))?; self.fill_internal_buf()?; self.dirty = false; Ok(()) } } impl Read for SectoredBuf { fn read(&mut self, mut dest: &mut [u8]) -> io::Result { if self.pos == self.len { return Ok(0); } let dest_len_start = dest.len(); let mut src = { let start = self.buf_pos(); let end = self.buf_end(); &self.buf[start..end] }; while !dest.is_empty() { if src.is_empty() { if self.pos >= self.len { break; } let byte_ct = match self.fill_internal_buf() { Ok(byte_ct) => byte_ct, Err(err) => { warn!("SectoredBuf::full_internal_buf returned an error: {}", err); break; } }; if 0 == byte_ct { break; } src = &self.buf[..byte_ct]; } let sz = src.len().min(dest.len()); (&mut dest[..sz]).copy_from_slice(&src[..sz]); dest = &mut dest[sz..]; src = &src[sz..]; self.pos += sz; } Ok(dest_len_start - dest.len()) } } impl Seek for SectoredBuf { fn seek(&mut self, pos: SeekFrom) -> io::Result { let inner_pos = self.inner.stream_position()?; let inner_pos_new = match pos { SeekFrom::Start(rel_start) => Self::inner_pos(rel_start), SeekFrom::Current(rel_curr) => { if rel_curr > 0 { inner_pos + rel_curr as u64 } else { inner_pos - rel_curr as u64 } } SeekFrom::End(_) => { return Err(io::Error::new( io::ErrorKind::Unsupported, "seeking relative to the end of the stream is not supported", )) } }; let sect_sz = self.sector_sz(); let sect_index = self.buf_sector_index(); let sect_index_new = TryInto::::try_into(inner_pos_new).box_err()? 
/ sect_sz; let pos: u64 = self.pos.try_into().box_err()?; if sect_index != sect_index_new || pos == inner_pos { self.flush()?; let seek_to: u64 = (sect_index_new * sect_sz).try_into().box_err()?; self.inner.seek(SeekFrom::Start(seek_to))?; self.fill_internal_buf()?; } self.pos = inner_pos_new.try_into().box_err()?; Ok(Self::self_pos(inner_pos_new)) } } impl MetaAccess for SectoredBuf { fn block_key(&self) -> Result { self.inner.block_key() } fn add_readcap_for(&mut self, owner: Principal, key: &dyn Encrypter) -> Result<()> { self.inner.add_readcap_for(owner, key) } fn integrity(&self) -> Option<&[u8]> { self.inner.integrity() } fn set_path(&mut self, path: Path) { self.inner.set_path(path) } } impl Block for SectoredBuf {} /// An envelopment of a key, which is tagged with the principal who the key is meant for. #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] struct Readcap { /// The principal this `Readcap` was issued to. issued_to: Principal, /// An encipherment of a block key using the public key of the principal. key: Ciphertext, } impl Readcap { fn new(issued_to: Hash, key: Ciphertext) -> Readcap { Readcap { issued_to: Principal(issued_to), key, } } } /// Verifies that a principal is authorized to write blocks in a tree. #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] struct Writecap { /// The principal this `Writecap` was issued to. issued_to: Principal, /// The path where this write caps's validity begins. path: Path, /// The point in time after which this write cap is no longer valid. expires: Epoch, /// The public key used to sign this write cap. signing_key: AsymKeyPub, /// A digital signature which covers all of the fields in the write cap except for next. signature: Signature, /// The next write cap in the chain leading back to the root. next: Option>, } /// Fragments are created from blocks using Erasure Encoding and stored with other nodes in the /// network to provide availability and redundancy of data. 
#[derive(Debug, PartialEq, Serialize, Deserialize)] struct Fragment { /// The path to the block this fragment is from. path: Path, /// The serial number of this fragment. serial: FragmentSerial, /// The actual data. body: Vec, } impl Fragment { /// Create a new fragment with the given fields. If `path_str` cannot be parsed then a failed /// `Result` is returned containing a `PathError`. fn new( path_str: &str, serial_num: u32, body: Vec, ) -> std::result::Result { let result = Path::try_from(path_str); Ok(Fragment { path: result?, serial: FragmentSerial(serial_num), body, }) } } /// The body of every non-leaf node in a tree contains this data structure. #[derive(Debug, PartialEq, Serialize, Deserialize)] struct Directory { /// The nodes that are attached to the tree at this block. nodes: Vec, /// This block's descendants. children: HashMap>, } /// Keeps track of which principal is storing a fragment. #[derive(Debug, PartialEq, Serialize, Deserialize)] struct FragmentRecord { /// The fragment serial number this record is for. serial: FragmentSerial, /// The principal who is storing this fragment. stored_by: Principal, } impl FragmentRecord { /// Creates a new `FragmentRecord` whose `serial` and `stored_by` fields are set to /// the given values. fn new(serial: u32, stored_by: Hash) -> FragmentRecord { FragmentRecord { serial: FragmentSerial(serial), stored_by: Principal(stored_by), } } } /// An identifier for a security principal, which is any entity that can be authenticated. #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Hashable, Clone, Default)] pub struct Principal(Hash); impl Principal { fn kind(&self) -> HashKind { HashKind::from(&self.0) } } /// Trait for types which are owned by a `Principal`. trait Principaled { /// Returns the `Principal` that owns `self`, using the given hash algorithm. fn principal_of_kind(&self, kind: HashKind) -> Principal; /// Returns the `Principal` that owns `self`, using the default hash algorithm. 
fn principal(&self) -> Principal { self.principal_of_kind(HashKind::default()) } } /// An identifier for a block in a tree. #[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)] pub struct Path { root: Principal, components: Vec, } impl Path { /// The character that is used to separate path components. const SEP: char = '/'; /// The limit, in bytes, of a `Path`'s length. const BYTE_LIMIT: usize = 4096; /// Returns a result which, when successful, contains the index after the last character in the /// current path component. fn component_end>( start: usize, first: char, pairs: &mut I, ) -> std::result::Result { if first == Path::SEP { return Err(PathError::EmptyComponent); } let end; let mut last = start; loop { match pairs.next() { Some((index, Path::SEP)) => { end = index; break; } Some((index, _)) => last = index, None => { end = last + 1; break; } } } if end == start { Err(PathError::EmptyComponent) } else { Ok(end) } } /// Asserts that the number of bytes in the given string is no more than `Path::BYTE_LIMIT`. fn assert_not_too_long(string: &str) -> std::result::Result<(), PathError> { let len = string.len(); if len > Path::BYTE_LIMIT { return Err(PathError::PathTooLong(len)); } Ok(()) } /// Returns true if `other` is a subpath of this `Path`. fn contains(&self, other: &Path) -> bool { if self.root != other.root { return false; }; // This path must be no longer than the other path. if self.components.len() > other.components.len() { return false; } // Skip the component containing the owner. 
let self_iter = self.components.iter().skip(1); let other_iter = other.components.iter().skip(1); for pair in self_iter.zip(other_iter) { if pair.0 != pair.1 { return false; } } true } } impl<'s> TryFrom<&'s str> for Path { type Error = PathError; fn try_from(string: &'s str) -> std::result::Result { Path::assert_not_too_long(string)?; let mut pairs = string.char_indices(); let mut components = Vec::new(); let mut last_end = 0; while let Some((start, c)) = pairs.next() { let end = Path::component_end(start, c, &mut pairs)?; last_end = end; let slice = &string[start..end]; components.push(slice.to_string()); } // An empty component is added to the end to indicate if there was a trailing slash. if string.len() - 1 == last_end { components.push("".to_string()); } let leading = components .get(0) .ok_or(PathError::InvalidLeadingComponent)?; let hash = Hash::try_from(leading.as_str()).map_err(|_| PathError::InvalidLeadingComponent)?; Ok(Path { root: Principal(hash), components, }) } } impl Display for Path { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { if self.components.is_empty() { return write!(f, ""); }; let mut iter = self.components.iter(); let first = iter.next().unwrap(); let mut output = write!(f, "{}", first); for component in iter { output = write!(f, "{}{}", Path::SEP, component) } output } } /// Errors which can occur when converting a string to a `Path`. #[derive(Debug, PartialEq)] pub enum PathError { /// Occurs when the number of bytes in a string is greater than `Path::BYTE_LIMIT`. PathTooLong(usize), /// Indicates that a path string was empty. Empty, /// Occurs when a component in a path string was empty. EmptyComponent, /// Occurs when the leading component of a path is not in the correct format. 
InvalidLeadingComponent, } impl Display for PathError { fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result { match self { PathError::PathTooLong(length) => formatter.write_fmt(format_args!( "path contained {} bytes, which is over the {} byte limit", length, Path::BYTE_LIMIT )), PathError::Empty => formatter.write_str("path was empty"), PathError::EmptyComponent => formatter.write_str("component of path was empty"), PathError::InvalidLeadingComponent => { formatter.write_str("invalid leading path component") } } } } /// An instant in time represented by the number of seconds since January 1st 1970, 00:00:00 UTC. #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Default)] struct Epoch(u64); impl Epoch { /// Returns the current epoch time. fn now() -> Epoch { let now = SystemTime::now(); // If the system clock is before the unix epoch, just panic. let epoch = now.duration_since(SystemTime::UNIX_EPOCH).unwrap(); Epoch(epoch.as_secs()) } } impl Copy for Epoch {} impl Add for Epoch { type Output = Self; fn add(self, other: Duration) -> Self { Epoch(self.0 + other.as_secs()) } } impl Sub for Epoch { type Output = Self; fn sub(self, other: Duration) -> Self { Epoch(self.0 - other.as_secs()) } } /// The serial number of a block fragment. 
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Hashable)]
struct FragmentSerial(u32);

#[cfg(test)]
mod tests {
    use std::{fs::OpenOptions, io::Cursor};

    use crate::crypto::{tpm::TpmCredStore, CredStore, CredsPriv};

    use super::*;
    use tempdir::TempDir;
    use test_helpers::*;

    /// Shared driver: parses `input` as a `Path` and asserts the result matches `expected`.
    // NOTE(review): the `<Path, PathError>` parameters on the Result types were
    // restored after markup stripping; `Path::try_from(&str)` fixes both.
    fn path_from_str_test_case(
        expected: std::result::Result<Path, PathError>,
        input: &str,
    ) -> std::result::Result<(), PathError> {
        let result = Path::try_from(input);
        assert_eq!(expected, result);
        Ok(())
    }

    #[test]
    fn path_from_str_multiple_components_ok() -> std::result::Result<(), PathError> {
        let expected = make_path(vec!["red", "green", "blue"]);
        let input = format!("{}/red/green/blue", expected.root.0);
        path_from_str_test_case(Ok(expected), input.as_str())?;
        Ok(())
    }

    #[test]
    fn path_from_str_one_component_ok() -> std::result::Result<(), PathError> {
        let expected = make_path(vec![]);
        let input = expected.root.0.to_string();
        path_from_str_test_case(Ok(expected), input.as_str())?;
        Ok(())
    }

    #[test]
    fn path_from_str_trailing_slash_ok() -> std::result::Result<(), PathError> {
        // Notice the empty component at the end of this path due to the trailing slash.
        let expected = make_path(vec!["orange", "banana", "shotgun", ""]);
        let input = format!("{}/orange/banana/shotgun/", expected.root.0);
        path_from_str_test_case(Ok(expected), input.as_str())?;
        Ok(())
    }

    #[test]
    fn path_from_str_path_too_long_fail() -> std::result::Result<(), PathError> {
        let principal = make_principal();
        // 4097 filler bytes pushes the total length over Path::BYTE_LIMIT.
        let input = format!("{}/{}", principal.0, "*".repeat(4097));
        let expected = Err(PathError::PathTooLong(input.len()));
        path_from_str_test_case(expected, input.as_str())?;
        Ok(())
    }

    #[test]
    fn path_from_str_multiple_slashes_fail() -> std::result::Result<(), PathError> {
        let expected = Err(PathError::EmptyComponent);
        let input = format!("{}//orange", make_principal().0);
        path_from_str_test_case(expected, input.as_str())?;
        Ok(())
    }

    #[test]
    fn path_from_str_leading_slash_fail() -> std::result::Result<(), PathError> {
        let expected = Err(PathError::EmptyComponent);
        let input = format!("/{}/orange/banana/shotgun", make_principal().0);
        path_from_str_test_case(expected, input.as_str())?;
        Ok(())
    }

    /// Display followed by parse must reproduce the original path.
    #[test]
    fn path_round_trip() -> std::result::Result<(), PathError> {
        let expected = make_path(vec!["interstitial", "inter-related", "intersections"]);
        let actual = Path::try_from(expected.to_string().as_str())?;
        assert_eq!(expected, actual);
        Ok(())
    }

    #[test]
    fn path_contains_true() {
        let larger = make_path(vec!["apps"]);
        let smaller = make_path(vec!["apps", "bohdi"]);
        assert!(larger.contains(&smaller));
    }

    #[test]
    fn path_contains_true_only_owner() {
        let larger = make_path(vec![]);
        let smaller = make_path(vec![]);
        assert!(larger.contains(&smaller));
    }

    #[test]
    fn path_contains_false_self_is_longer() {
        let first = make_path(vec!["apps", "bohdi"]);
        let second = make_path(vec!["apps"]);
        assert!(!first.contains(&second));
    }

    #[test]
    fn path_contains_false_same_owners() {
        let first = make_path(vec!["apps"]);
        let second = make_path(vec!["nodes"]);
        assert!(!first.contains(&second));
    }

    #[test]
    fn path_contains_false_different_owners() {
        let first = make_path(vec!["apps"]);
        let mut second = make_path(vec!["apps"]);
        // Same components, but rooted under a different principal.
        second.root = Principal(Hash::Sha2_256(PRINCIPAL2));
        assert!(!first.contains(&second));
    }

    /// Round-trips data through the brotli compressor and decompressor composed
    /// over an in-memory cursor.
    #[test]
    fn brotli_compress_decompress() {
        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
        const SECT_CT: usize = 16;
        let params = BrotliParams::new(SECT_SZ, 8, 20);
        let mut memory = Cursor::new([0u8; SECT_SZ * SECT_CT]);
        {
            let write: CompressorWriter<_> = params
                .clone()
                .try_compose(&mut memory)
                .expect("compose for write failed");
            write_fill(write, SECT_SZ, SECT_CT);
        }
        memory.seek(SeekFrom::Start(0)).expect("seek failed");
        {
            let read: Decompressor<_> = params
                .try_compose(&mut memory)
                .expect("compose for read failed");
            read_check(read, SECT_SZ, SECT_CT);
        }
    }

    /// Builds a `SectoredBuf` over an in-memory sectored cursor of
    /// `sect_ct` zeroed sectors of `sect_sz` bytes each.
    // NOTE(review): return type restored from the stripped `SectoredBuf>>`;
    // the body composes over `SectoredCursor::new(vec![0u8; ..], ..)` — confirm
    // against the `SectoredBuf`/`SectoredCursor` definitions.
    fn make_sectored_buf(sect_sz: usize, sect_ct: usize) -> SectoredBuf<SectoredCursor<Vec<u8>>> {
        SectoredBuf::new()
            .try_compose(SectoredCursor::new(vec![0u8; sect_sz * sect_ct], sect_sz))
            .expect("compose for sectored buffer failed")
    }

    #[test]
    fn sectored_buf_fill_inner() {
        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
        const SECT_CT: usize = 16;
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        let sect_sz = sectored.sector_sz();
        // Writes are issued in 16 chunks per sector, so the sector size must divide evenly.
        assert_eq!(0, sect_sz % 16);
        let chunk_sz = sect_sz / 16;
        let chunk_ct = SECT_CT * 16;
        write_fill(&mut sectored, chunk_sz, chunk_ct);
    }

    #[test]
    fn sectored_buf_write_read_sequential() {
        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
        const SECT_CT: usize = 16;
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        let sect_sz = sectored.sector_sz();
        assert_eq!(0, sect_sz % 16);
        let chunk_sz = sect_sz / 16;
        // We subtract one here so that the underlying buffer is not completely filled. This
        // exercises the length limiting capability of the sectored buffer.
let chunk_ct = SECT_CT * 16 - 1; write_fill(&mut sectored, chunk_sz, chunk_ct); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); read_check(&mut sectored, chunk_sz, chunk_ct); } #[test] fn sectored_buf_sect_sz_too_small_is_error() { const MIN: usize = SectoredBuf::<()>::RESERVED; let result = SectoredBuf::new().try_compose(SectoredCursor::new([0u8; MIN], MIN - 1)); assert!(result.is_err()); } #[test] fn sectored_buf_len_preserved() { const SECT_SZ: usize = SECTOR_SZ_DEFAULT; const SECT_CT: usize = 16; let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT); let expected = vec![42u8; 12]; // We need to ensure that writing expected will not fill up the buffer in sectored. assert!(expected.len() < sectored.sector_sz() - SectoredBuf::<()>::RESERVED); sectored.write_all(&expected).expect("write failed"); sectored.flush().expect("flush failed"); let inner = sectored.into_inner(); let mut sectored = SectoredBuf::new() .try_compose(inner) .expect("failed to compose sectored buffer"); let mut actual = vec![0u8; expected.len()]; sectored .fill_buf(actual.as_mut_slice()) .expect("failed to fill actual"); assert_eq!(expected, actual); } #[test] fn sectored_buf_seek() { let sect_sz = 16usize; let sect_ct = 16usize; let cap = sect_sz * sect_ct - std::mem::size_of::(); let source = { let mut source = Vec::with_capacity(cap); source.extend( std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None }) .take(cap), ); source }; let mut sectored = make_sectored_buf(sect_sz, sect_ct); sectored.write(&source).expect("write failed"); let mut buf = [0u8; 1]; let end = cap.try_into().expect("cap cannot fit into a u8"); for pos in (0..end).rev() { sectored .seek(SeekFrom::Start(pos as u64)) .expect("seek failed"); sectored.read(&mut buf).expect("read failed"); assert_eq!(pos, buf[0]); } } #[test] fn sectored_buf_write_read_random() { const SECT_SZ: usize = 16; const SECT_CT: usize = 16; const CAP: usize = SECT_SZ * SECT_CT - std::mem::size_of::(); let source = { 
let mut expected = Vec::with_capacity(CAP); expected.extend( std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None }) .take(CAP), ); expected }; let indices: Vec<(usize, usize)> = { let rando = Randomizer::new([3u8; Randomizer::HASH.len()]); let rando2 = Randomizer::new([5u8; Randomizer::HASH.len()]); rando .zip(rando2) .take(SECT_CT) .map(|(mut first, mut second)| { first %= source.len(); second &= source.len(); let low = first.min(second); let high = first.max(second); (low, high) }) .collect() }; let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT); sectored .write_all(&[0u8; CAP]) .expect("failed to fill sectored"); sectored.flush().expect("flush failed"); for (_k, (low, high)) in indices.iter().enumerate() { sectored .seek(SeekFrom::Start(*low as u64)) .expect("seek failed"); let src = &source[*low..*high]; sectored.write(src).expect("write failed"); } sectored.flush().expect("flush failed"); let mut buf = vec![0u8; CAP]; for (_k, (low, high)) in indices.iter().enumerate() { sectored .seek(SeekFrom::Start(*low as u64)) .expect("seek failed"); let actual = &mut buf[*low..*high]; sectored.fill_buf(actual).expect("read failed"); let expected = &source[*low..*high]; assert_eq!(expected, actual); } } #[test] fn sectored_buf_read_past_end() { const LEN: usize = 32; let mut sectored = SectoredBuf::new() .try_compose(SectoredCursor::new([0u8; LEN], LEN)) .expect("compose failed"); const BUF_LEN: usize = LEN - SectoredBuf::<()>::RESERVED + 1; sectored.write(&[1u8; BUF_LEN - 1]).expect("write failed"); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); let mut buf = [0u8; BUF_LEN]; // Note that buf is one byte longer than the available capacity in the cursor. sectored.read(&mut buf).expect("read failed"); assert_eq!(&[1u8; BUF_LEN - 1], &buf[..(BUF_LEN - 1)]); assert_eq!(0u8, buf[BUF_LEN - 1]); } /// Tests that the data written in try_compose is actually written back to the underlying stream. 
#[test] fn sectored_buf_write_back() { let mut sectored = SectoredBuf::new() .try_compose(SectoredCursor::new(vec![0u8; 24], 16)) .expect("compose failed"); let expected = [1u8; 8]; sectored.write(&expected).expect("first write failed"); sectored.write(&[2u8; 8]).expect("second write failed"); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); let mut actual = [0u8; 8]; sectored.read(&mut actual).expect("read failed"); assert_eq!(expected, actual); } #[test] fn sectored_buf_write_past_end() { const LEN: usize = 8; let mut sectored = SectoredBuf::new() .try_compose(SectoredCursor::new(vec![0u8; 0], LEN)) .expect("compos failed"); let expected = [1u8; LEN + 1]; sectored.write(&expected).expect("write failed"); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); let mut actual = [0u8; LEN + 1]; sectored.read(&mut actual).expect("read failed"); assert_eq!(expected, actual); } /// Tests that a new `Trailered` can be created from an empty stream. #[test] fn trailered_new_empty() { let cursor = Cursor::new(Vec::new()); let (_, trailer): (_, Option) = Trailered::new(cursor).expect("Trailered::new failed"); assert_eq!(None, trailer); } /// Tests that an error is returned when an attempt is made to create a `Trailered` from a /// non-empty stream which is too short. #[test] fn trailered_new_inner_too_short_is_error() { let cursor = Cursor::new([0u8; 5]); let result = Trailered::<_, u128>::new(cursor); assert!(result.is_err()) } /// Checks that the trailer is persisted to the inner stream. 
#[test] fn trailered_trailer_persisted() { const EXPECTED: &str = "Everyone deserves to be remembered,"; let cursor = { let cursor = Cursor::new(Vec::new()); let (mut trailered, trailer) = Trailered::<_, String>::new(cursor).expect("Trailered::new failed"); assert!(trailer.is_none()); trailered .flush(&EXPECTED.to_string()) .expect("flush failed"); trailered.inner }; let (_, trailer) = Trailered::<_, String>::new(cursor).expect("Trailered::new failed"); assert_eq!(EXPECTED, trailer.unwrap()); } #[test] fn trailered_written_data_persisted() { const EXPECTED: &[u8] = b"and every life has something to teach us."; let mut cursor = { let (mut trailered, _) = Trailered::<_, u8>::new(Cursor::new(Vec::new())) .expect("failed to create first trailered"); trailered.write(EXPECTED).expect("write failed"); trailered.flush(&1).expect("flush failed"); trailered.inner }; cursor.seek(SeekFrom::Start(0)).expect("seek failed"); let (mut trailered, _) = Trailered::<_, u8>::new(cursor).expect("failed to created second trailered"); let mut actual = vec![0u8; EXPECTED.len()]; trailered.read(&mut actual).expect("read failed"); assert_eq!(EXPECTED, actual); } fn trailered_for_seek_test() -> Trailered { let (mut trailered, _) = Trailered::new(Cursor::new(Vec::new())).expect("failed to create trailered"); trailered .write(&[0, 1, 2, 3, 4, 5, 6, 7]) .expect("write failed"); trailered.seek(SeekFrom::Start(0)).expect("seek failed"); trailered } #[test] fn trailered_seek_from_start() { const EXPECTED: u8 = 2; let mut trailered = trailered_for_seek_test(); trailered .seek(SeekFrom::Start(EXPECTED as u64)) .expect("seek failed"); let mut actual = [0u8; 1]; trailered.read(&mut actual).expect("read failed"); assert_eq!(EXPECTED, actual[0]); } #[test] fn trailered_seek_from_curr() { const EXPECTED: u8 = 5; let mut trailered = trailered_for_seek_test(); trailered .seek(SeekFrom::Start(6)) .expect("seek from start failed"); trailered .seek(SeekFrom::Current(-1)) .expect("seek from current failed"); 
let mut actual = [0u8; 1]; trailered.read(&mut actual).expect("read failed"); assert_eq!(EXPECTED, actual[0]); } #[test] fn trailered_seek_from_end() { const EXPECTED: u8 = 7; let mut trailered = trailered_for_seek_test(); trailered.seek(SeekFrom::End(-1)).expect("seek failed"); let mut actual = [0u8; 1]; trailered.read(&mut actual).expect("read failed"); assert_eq!(EXPECTED, actual[0]); } /// Tests that a read past the end of the body in a `Trailered` is not allowed. #[test] fn trailered_read_limited_to_body_len() { let (mut trailered, trailer) = Trailered::new(Cursor::new(Vec::new())).expect("failed to create Trailered"); assert!(trailer.is_none()); const EXPECTED: &[u8] = &[1, 1, 1, 1, 1, 0, 0, 0]; trailered.write(&[1u8; 5]).expect("write failed"); trailered.flush(&1u8).expect("flush failed"); trailered.seek(SeekFrom::Start(0)).expect("seek failed"); let mut actual = vec![0u8; EXPECTED.len()]; // If read goes past the end of the body then there will be a 1 in the sixth position of // actual. 
trailered.read(&mut actual).expect("read failed"); assert_eq!(EXPECTED, actual); } #[test] fn block_can_create_empty() { let harness = SwtpmHarness::new().expect("failed to start swtpm"); let context = harness.context().expect("failed to retrieve context"); let cred_store = TpmCredStore::new(context, harness.state_path()) .expect("failed to create TpmCredStore"); let creds = cred_store.node_creds().expect("failed to get node creds"); BlockOpenOptions::new() .with_inner(Cursor::new(Vec::::new())) .with_creds(creds) .with_encrypt(true) .open() .expect("failed to open block"); } #[test] fn block_contents_persisted() { const EXPECTED: &[u8] = b"Silly sordid sulking sultans."; let temp_dir = TempDir::new("btlib").expect("failed to create temp dir"); let file_path = temp_dir.path().join("test.blk").to_owned(); let harness = SwtpmHarness::new().expect("failed to start swtpm"); let context = harness.context().expect("failed to retrieve context"); let cred_store = TpmCredStore::new(context, harness.state_path()) .expect("failed to create TpmCredStore"); let root_creds = cred_store .gen_root_creds("(1337Prestidigitation7331)") .expect("failed to get root creds"); let mut node_creds = cred_store.node_creds().expect("failed to get node creds"); let writecap = root_creds .issue_writecap( node_creds.principal(), vec!["nodes".to_string(), "phone".to_string()], Epoch::now() + Duration::from_secs(3600), ) .expect("failed to issue writecap"); let path = writecap.path.clone(); node_creds.set_writecap(writecap); { let file = OpenOptions::new() .create_new(true) .write(true) .read(true) .open(&file_path) .expect("failed to open file"); let mut block = BlockOpenOptions::new() .with_inner(file) .with_creds(node_creds.clone()) .with_encrypt(true) .open() .expect("failed to open block"); block.set_path(path); block.write(EXPECTED).expect("failed to write"); block.flush().expect("flush failed"); } let file = OpenOptions::new() .read(true) .open(&file_path) .expect("failed to reopen file"); 
let mut block = BlockOpenOptions::new() .with_inner(file) .with_creds(node_creds) .with_encrypt(true) .open() .expect("failed to reopen block"); let mut actual = [0u8; EXPECTED.len()]; block.read(&mut actual).expect("read failed"); assert_eq!(EXPECTED, actual); } }