@@ -18,10 +18,13 @@ extern crate lazy_static;
use brotli::{CompressorWriter, Decompressor};
use btserde::{self, read_from, write_to};
mod crypto;
-use crypto::{AsymKeyPub, Ciphertext, Hash, HashKind, Sign, Signature, Signer, SymKey};
+use crypto::{
+    AsymKeyPub, Ciphertext, CredsPriv, Decrypter, Encrypter, EncrypterExt, Hash, HashKind,
+    MerkleStream, SecretStream, Sign, Signature, Signer, SymKey,
+};

use log::error;
-use serde::{Deserialize, Serialize};
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_big_array::BigArray;
use std::{
    collections::HashMap,
@@ -30,13 +33,15 @@ use std::{
    fs::{File, OpenOptions},
    hash::Hash as Hashable,
    io::{self, Read, Seek, SeekFrom, Write},
+    marker::PhantomData,
    ops::{Add, Sub},
-    sync::{Arc, PoisonError, RwLock},
+    sync::PoisonError,
    time::{Duration, SystemTime},
};

#[derive(Debug)]
enum Error {
+    MissingWritecap,
    Io(std::io::Error),
    Serde(btserde::Error),
    Crypto(crypto::Error),
@@ -53,6 +58,7 @@ impl Error {
impl Display for Error {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
+            Error::MissingWritecap => write!(f, "missing writecap"),
            Error::Io(err) => err.fmt(f),
            Error::Serde(err) => err.fmt(f),
            Error::Crypto(err) => err.fmt(f),
@@ -126,16 +132,10 @@ impl<T, E: Display> StrInIoErr<T> for std::result::Result<T, E> {
    }
}

-/// A Block tagged with its version number. When a block of a previous version is received over
-/// the network or read from the filesystem, it is upgraded to the current version before being
-/// processed.
-#[derive(Debug)]
-enum VersionedBlock<T, C> {
-    V0(Block<T, C>),
-}
-
const SECTOR_SZ_DEFAULT: usize = 4096;

+trait Block: Read + Write + Seek + HeaderAccess {}
+
// A trait for streams which only allow reads and writes in fixed sized units called sectors.
trait Sectored {
    // Returns the size of the sector for this stream.
@@ -151,97 +151,141 @@ trait Sectored {
    }
}

-/// A version of the `Write` trait, which allows integrity information to be supplied when writing.
-trait IntegrityWrite {
-    fn integrity_write(&mut self, buf: &[u8], integrity: &[u8]) -> io::Result<usize>;
+/// A version of the `Write` trait, which allows integrity information to be supplied when flushing.
+trait WriteInteg: Write {
+    fn flush_integ(&mut self, integrity: &[u8]) -> io::Result<()>;
+}
+
+trait Decompose<T> {
+    fn into_inner(self) -> T;
+}
+
+trait TryCompose<T, U: Decompose<T>> {
+    type Error;
+    fn try_compose(self, inner: T) -> std::result::Result<U, Self::Error>;
+}
+
+trait Compose<T, U> {
+    fn compose(self, inner: T) -> U;
+}
+
+impl<T, U: Decompose<T>, S: TryCompose<T, U, Error = Infallible>> Compose<T, U> for S {
+    fn compose(self, inner: T) -> U {
+        let result = self.try_compose(inner);
+        // Safety: Infallible has no values, so `result` must be `Ok`.
+        unsafe { result.unwrap_unchecked() }
+    }
+}
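These three traits express the layered-stream idiom used throughout the file: a wrapper can be composed over an inner stream and later decomposed back into it, and a composition that cannot fail gets `compose` for free from the blanket impl above. A minimal sketch with a hypothetical layer (not part of this change):

// Illustrative sketch only: `Prefixed` and `PrefixedFactory` are hypothetical types
// used to show how `Decompose`, `TryCompose`, and the blanket `Compose` impl interact.
struct Prefixed<T> {
    prefix: u8,
    inner: T,
}

impl<T> Decompose<T> for Prefixed<T> {
    // Gives back the wrapped stream, undoing the composition.
    fn into_inner(self) -> T {
        self.inner
    }
}

struct PrefixedFactory {
    prefix: u8,
}

impl<T> TryCompose<T, Prefixed<T>> for PrefixedFactory {
    // Composition cannot fail here, so the blanket impl supplies `Compose` as well.
    type Error = Infallible;
    fn try_compose(self, inner: T) -> std::result::Result<Prefixed<T>, Infallible> {
        Ok(Prefixed {
            prefix: self.prefix,
            inner,
        })
    }
}

// Usage: `let layered = PrefixedFactory { prefix: 0x42 }.compose(vec![1u8, 2, 3]);`
// and `let inner = layered.into_inner();` recovers the original value.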
+
+trait HeaderAccess {
+    fn block_key(&self) -> Result<SymKey>;
+    fn add_readcap_for(&mut self, owner: Principal, key: &dyn Encrypter) -> Result<()>;
+}
+
+/// Extensions to the `Read` trait.
+trait ReadExt: Read {
+    /// Reads repeatedly until one of the following occurs:
+    /// 1. The given buffer is full.
+    /// 2. A call to `read` returns 0.
+    /// 3. A call to `read` returns an error.
+    /// The number of bytes read is returned. If an error is returned, then no bytes were read.
+    fn fill_buf(&mut self, mut dest: &mut [u8]) -> io::Result<usize> {
+        let dest_len_start = dest.len();
+        while !dest.is_empty() {
+            let byte_ct = match self.read(dest) {
+                Ok(byte_ct) => byte_ct,
+                Err(err) => {
+                    if dest_len_start == dest.len() {
+                        return Err(err);
+                    } else {
+                        // We're not allowed to return an error if we've already read from self.
+                        error!("an error occurred in fill_buf: {}", err);
+                        break;
+                    }
+                }
+            };
+            if 0 == byte_ct {
+                break;
+            }
+            dest = &mut dest[byte_ct..];
+        }
+        Ok(dest_len_start - dest.len())
+    }
}
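A quick usage sketch of `fill_buf` (not part of this change, using `std::io::Cursor`): unlike a single `read` call, which may return fewer bytes than are available, `fill_buf` keeps reading until the buffer is full or the source is exhausted.

// Illustrative example only: filling an 8-byte buffer from a 5-byte source returns 5
// and leaves the remaining bytes of the buffer untouched.
fn fill_buf_example() -> io::Result<()> {
    let mut source = std::io::Cursor::new(vec![1u8, 2, 3, 4, 5]);
    let mut dest = [0u8; 8];
    let byte_ct = source.fill_buf(&mut dest)?;
    assert_eq!(5, byte_ct);
    assert_eq!([1u8, 2, 3, 4, 5, 0, 0, 0], dest);
    Ok(())
}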

-#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
+impl<T: Read> ReadExt for T {}
+
+#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Default)]
pub struct Header {
    path: Path,
    readcaps: HashMap<Principal, Ciphertext<SymKey>>,
-    writecap: Writecap,
+    writecap: Option<Writecap>,
    /// A hash which provides integrity for the contents of the block body.
    integrity: Hash,
}

-#[derive(Debug, PartialEq, Serialize, Deserialize)]
-struct BlockShared<C> {
+#[derive(Serialize, Deserialize, Default)]
+struct BlockTrailer {
    header: Header,
    sig: Signature,
-    #[serde(skip)]
-    creds: C,
}

struct BlockStream<T, C> {
-    shared: Arc<RwLock<BlockShared<C>>>,
-    body_len: u64,
+    trailered: Trailered<T, BlockTrailer>,
+    trailer: BlockTrailer,
    header_buf: Vec<u8>,
-    inner: T,
+    creds: C,
}

-impl<T, C> BlockStream<T, C> {
-    fn new(shared: BlockShared<C>, inner: T, body_len: u64) -> BlockStream<T, C> {
-        BlockStream {
-            shared: Arc::new(RwLock::new(shared)),
-            inner,
+impl<T: Read + Seek, C> BlockStream<T, C> {
+    fn new(inner: T, creds: C) -> Result<BlockStream<T, C>> {
+        let (trailered, trailer) = Trailered::new(inner)?;
+        Ok(BlockStream {
+            trailered,
+            trailer: trailer.unwrap_or_default(),
            header_buf: Vec::new(),
-            body_len,
-        }
+            creds,
+        })
    }
}

-impl<T: Seek + Write, C: std::fmt::Debug + Signer> BlockStream<T, C> {
-    fn write_trailer(&mut self, integrity: &[u8]) -> Result<()> {
-        let pos = self.inner.stream_position()?;
-        self.body_len = self.body_len.max(pos);
-        self.inner.seek(SeekFrom::Start(self.body_len))?;
-        {
-            let mut shared = self.shared.write()?;
-            shared.header.integrity.as_mut().copy_from_slice(integrity);
-            self.header_buf.clear();
-            write_to(&shared.header, &mut self.header_buf)?;
-            shared.sig = shared
-                .creds
-                .sign(std::iter::once(self.header_buf.as_slice()))?;
-
-            self.inner.write_all(&self.header_buf)?;
-            write_to(&shared.sig, &mut self.inner)?;
-        }
-        let end: i64 = (self.inner.stream_position()? + 8).try_into()?;
-        let body_len: i64 = self.body_len.try_into()?;
-        let offset = end - body_len;
-        write_to(&offset, &mut self.inner)?;
-        self.inner.seek(SeekFrom::Start(pos))?;
-        Ok(())
+impl<C> BlockStream<File, C> {
+    fn open<P: AsRef<std::path::Path>>(path: P, creds: C) -> Result<BlockStream<File, C>> {
+        let inner = OpenOptions::new().read(true).write(true).open(path)?;
+        BlockStream::new(inner, creds)
    }
}

-impl<T: Write + Seek, C: std::fmt::Debug + Signer> IntegrityWrite for BlockStream<T, C> {
-    fn integrity_write(&mut self, buf: &[u8], integrity: &[u8]) -> io::Result<usize> {
-        let written = self.inner.write(buf)?;
-        if written > 0 {
-            let result = self.write_trailer(integrity);
-            if let Err(err) = result {
-                error!("error occurred while writing block trailer: {}", err);
-            }
-        }
-        Ok(written)
+impl<T: Write + Seek, C: Signer> Write for BlockStream<T, C> {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.trailered.write(buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        Err(io::Error::new(
+            io::ErrorKind::Unsupported,
+            "flush is not supported, use flush_integ instead",
+        ))
    }
}

-impl<T: Read, C> Read for BlockStream<T, C> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        self.inner.read(buf)
+impl<T: Write + Seek, C: Signer> WriteInteg for BlockStream<T, C> {
+    fn flush_integ(&mut self, integrity: &[u8]) -> io::Result<()> {
+        let header = &mut self.trailer.header;
+        header.integrity.as_mut().copy_from_slice(integrity);
+        self.header_buf.clear();
+        write_to(&header, &mut self.header_buf).box_err()?;
+        self.trailer.sig = self
+            .creds
+            .sign(std::iter::once(self.header_buf.as_slice()))?;
+        self.trailered.flush(&self.trailer)?;
+        Ok(())
    }
}

-/// Adds a signed integer to an unsigned integer and returns the result.
-fn add_signed(unsigned: u64, signed: i64) -> u64 {
-    if signed >= 0 {
-        unsigned + signed as u64
-    } else {
-        unsigned - (-signed as u64)
+impl<T: Read + Seek, C> Read for BlockStream<T, C> {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        self.trailered.read(buf)
    }
}

@@ -249,152 +293,214 @@ impl<T: Seek, C> Seek for BlockStream<T, C> {
    /// Seeks to the given position in the stream. If a position beyond the end of the stream is
    /// specified, the seek will be to the end of the stream.
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
-        let from_start = match pos {
-            SeekFrom::Start(from_start) => from_start,
-            SeekFrom::Current(from_curr) => add_signed(self.inner.stream_position()?, from_curr),
-            SeekFrom::End(from_end) => add_signed(self.body_len, from_end),
-        };
-        self.inner
-            .seek(SeekFrom::Start(from_start.min(self.body_len)))
+        self.trailered.seek(pos)
    }
}

-/// A container which binds together ciphertext along with the metadata needed to identify,
-/// verify and decrypt it.
-#[derive(Debug)]
-struct Block<T, C> {
-    shared: Arc<RwLock<BlockShared<C>>>,
-    body: T,
-}
-
-impl<T, C> Block<T, C> {
-    fn try_compose_body<E: Into<Error>, U: Decompose<T>, V: TryCompose<T, U, Error = E>>(
-        self,
-        new_body: V,
-    ) -> Result<Block<U, C>> {
-        Ok(Block {
-            shared: self.shared,
-            body: new_body.try_compose(self.body).map_err(|err| err.into())?,
-        })
+impl<T, C: Decrypter + Owned> HeaderAccess for BlockStream<T, C> {
+    fn block_key(&self) -> Result<SymKey> {
+        let readcap = self
+            .trailer
+            .header
+            .readcaps
+            .get(&self.creds.owner())
+            .ok_or(Error::Crypto(crypto::Error::NoReadCap))?;
+        Ok(crypto::decrypt(readcap, &self.creds)?)
+    }
+
+    fn add_readcap_for(&mut self, owner: Principal, key: &dyn Encrypter) -> Result<()> {
+        let block_key = self.block_key()?;
+        let readcap = key.ser_encrypt(&block_key)?;
+        self.trailer.header.readcaps.insert(owner, readcap);
+        Ok(())
    }
+}
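Each entry in `readcaps` is the block's symmetric key encrypted for one principal: `block_key` decrypts the caller's copy, and `add_readcap_for` re-encrypts that key for another principal. A minimal sketch of granting access (not part of this change; the function and argument names are hypothetical):

// Illustrative sketch only: re-encrypt the block key under another principal's
// `Encrypter` (e.g. that principal's public key) so they can read the block.
fn grant_access<T, C: Decrypter + Owned>(
    stream: &mut BlockStream<T, C>,
    other_principal: Principal,
    other_key: &dyn Encrypter,
) -> Result<()> {
    stream.add_readcap_for(other_principal, other_key)
}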

-    fn compose_body<U: Decompose<T>, V: Compose<T, U>>(self, new_body: V) -> Block<U, C> {
-        Block {
-            shared: self.shared,
-            body: new_body.compose(self.body),
+struct BlockOpenOptions<T, C> {
+    inner: T,
+    creds: C,
+    encrypt: bool,
+    compress: bool,
+}
+
+impl BlockOpenOptions<(), ()> {
+    fn new() -> BlockOpenOptions<(), ()> {
+        BlockOpenOptions {
+            inner: (),
+            creds: (),
+            encrypt: true,
+            compress: true,
        }
    }
}

-impl<T: Read + Seek, C> Block<T, C> {
-    fn with_body(body: BlockStream<T, C>) -> Block<BlockStream<T, C>, C> {
-        Block {
-            shared: body.shared.clone(),
-            body,
+impl<T, C> BlockOpenOptions<T, C> {
+    fn with_inner<U>(self, inner: U) -> BlockOpenOptions<U, C> {
+        BlockOpenOptions {
+            inner,
+            creds: self.creds,
+            encrypt: self.encrypt,
+            compress: self.compress,
        }
    }

-    fn new(mut inner: T, creds: C) -> Result<Block<BlockStream<T, C>, C>> {
-        // TODO: What if the inner stream is empty?
-        inner.seek(SeekFrom::End(-8))?;
-        let offset: i64 = read_from(&mut inner)?;
-        let body_len = inner.seek(SeekFrom::Current(offset))?;
-        let header: Header = read_from(&mut inner)?;
-        let sig: Signature = read_from(&mut inner)?;
-        crypto::verify_header(&header, &sig)?;
-        inner.seek(SeekFrom::Start(0))?;
-        let shared = BlockShared { header, sig, creds };
-        let body = BlockStream::new(shared, inner, body_len);
-        Ok(Block::with_body(body))
-    }
-}
-
-impl<C> Block<File, C> {
-    fn from_path<P: AsRef<std::path::Path>>(
-        creds: C,
-        path: P,
-    ) -> Result<Block<BlockStream<File, C>, C>> {
-        let inner = OpenOptions::new().read(true).write(true).open(path)?;
-        Block::new(inner, creds)
+    fn with_creds<D>(self, creds: D) -> BlockOpenOptions<T, D> {
+        BlockOpenOptions {
+            inner: self.inner,
+            creds,
+            encrypt: self.encrypt,
+            compress: self.compress,
+        }
    }
-}

-impl<T: Write, C> Write for Block<T, C> {
-    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
-        self.body.write(buf)
+    fn with_encrypt(mut self, encrypt: bool) -> Self {
+        self.encrypt = encrypt;
+        self
    }

-    fn flush(&mut self) -> io::Result<()> {
-        self.body.flush()
+    fn with_compress(mut self, compress: bool) -> Self {
+        self.compress = compress;
+        self
    }
}

-impl<T: Read, C> Read for Block<T, C> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        self.body.read(buf)
+impl<T: Read + Write + Seek + 'static, C: CredsPriv + Owned + 'static> BlockOpenOptions<T, C> {
+    fn open(self) -> Result<Box<dyn Block>> {
+        let stream = BlockStream::new(self.inner, self.creds)?;
+        let block_key = stream.block_key()?;
+        let stream = MerkleStream::new(stream)?;
+        if self.encrypt {
+            let stream = SecretStream::new(block_key).try_compose(stream)?;
+            let stream = SectoredBuf::new().try_compose(stream)?;
+            Ok(Box::new(stream))
+        } else {
+            let stream = SectoredBuf::new().try_compose(stream)?;
+            Ok(Box::new(stream))
+        }
    }
}
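In practice the builder is meant to be chained; a minimal usage sketch (not part of this change, with `creds` standing in for a hypothetical credential value implementing `CredsPriv + Owned`):

// Illustrative sketch only: open an encrypted, sectored block over a file.
fn open_block_example<C: CredsPriv + Owned + 'static>(file: File, creds: C) -> Result<Box<dyn Block>> {
    BlockOpenOptions::new()
        .with_inner(file)
        .with_creds(creds)
        .with_encrypt(true)
        .with_compress(false)
        .open()
}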

-impl<T: Seek, C> Seek for Block<T, C> {
-    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
-        self.body.seek(pos)
+/// A struct which wraps a stream and writes a trailing data structure to it when flushed.
+struct Trailered<T, D> {
+    inner: T,
+    body_len: u64,
+    phantom: PhantomData<D>,
+}
+
+impl<T: Read + Seek, D: DeserializeOwned> Trailered<T, D> {
+    fn empty(inner: T) -> Trailered<T, D> {
+        Trailered {
+            inner,
+            body_len: 0,
+            phantom: PhantomData,
+        }
+    }
+
+    /// Creates a new `Trailered<T>` containing the given `T`. This method requires that the given
+    /// stream is either empty, or contains a valid serialization of `D` and the offset at which
+    /// `D` is stored.
+    fn new(mut inner: T) -> Result<(Trailered<T, D>, Option<D>)> {
+        let pos = inner.stream_position()?;
+        let end = inner.seek(SeekFrom::End(0))?;
+        if 0 == end {
+            return Ok((Self::empty(inner), None));
+        }
+        inner.seek(SeekFrom::End(-8))?;
+        let offset: i64 = read_from(&mut inner)?;
+        let body_len = inner.seek(SeekFrom::End(offset))?;
+        let trailer: D = read_from(&mut inner)?;
+        inner.seek(SeekFrom::Start(pos))?;
+        Ok((
+            Trailered {
+                inner,
+                body_len,
+                phantom: PhantomData,
+            },
+            Some(trailer),
+        ))
    }
}

-trait Decompose<T> {
-    fn into_inner(self) -> T;
+impl<T: Seek, D> Trailered<T, D> {
+    fn post_write(&mut self, written: usize) -> io::Result<usize> {
+        if 0 == written {
+            return Ok(0);
+        }
+        // I cannot return an error at this point because bytes have already been written to inner.
+        // So if I can't track the body len due to a failure, a panic is the only option.
+        let pos = self
+            .inner
+            .stream_position()
+            .expect("failed to get stream position");
+        self.body_len = self.body_len.max(pos);
+        Ok(written)
+    }
}

-trait TryCompose<T, U: Decompose<T>> {
-    type Error;
-    fn try_compose(self, inner: T) -> std::result::Result<U, Self::Error>;
+impl<T: Read + Seek, D> Read for Trailered<T, D> {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        let pos = self.inner.stream_position()?;
+        let available_u64 = self.body_len - pos;
+        let available: usize = available_u64.try_into().box_err()?;
+        let limit = buf.len().min(available);
+        self.inner.read(&mut buf[..limit])
+    }
}

-trait Compose<T, U> {
-    fn compose(self, inner: T) -> U;
+impl<T: Write + Seek, D: Serialize> Trailered<T, D> {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        let written = self.inner.write(buf)?;
+        self.post_write(written)
+    }
+
+    fn write_trailer(&mut self, trailer: &D) -> io::Result<u64> {
+        let pos = self.inner.stream_position()?;
+        self.inner.seek(SeekFrom::Start(self.body_len))?;
+        write_to(trailer, &mut self.inner).box_err()?;
+        let offset_u64 = 8 + self.inner.stream_position()? - self.body_len;
+        let offset = -(offset_u64 as i64);
+        write_to(&offset, &mut self.inner).box_err()?;
+        Ok(pos)
+    }
+
+    fn flush(&mut self, trailer: &D) -> io::Result<()> {
+        let prev_pos = self.write_trailer(trailer)?;
+        self.inner.flush()?;
+        self.inner.seek(SeekFrom::Start(prev_pos))?;
+        Ok(())
+    }
}
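Taken together, `write_trailer` and `Trailered::new` fix the on-disk layout: body bytes, then the serialized trailer, then a trailing `i64` offset which the `SeekFrom::End(-8)` seek assumes occupies 8 bytes. The arithmetic, with illustrative sizes (not part of this change):

// Layout written by `write_trailer`, where t is the serialized trailer length:
//
//   bytes 0 .. body_len                 | body
//   bytes body_len .. body_len + t      | serialized trailer
//   bytes body_len + t .. body_len+t+8  | offset: i64 = -(t + 8)
//
// On reopen, `Trailered::new` seeks to End(-8), reads the offset, and
// End(offset) resolves to (body_len + t + 8) + offset = body_len, the start of
// the trailer. For example, with body_len = 100 and a 23-byte trailer, the
// offset written is -(23 + 8) = -31, the file is 131 bytes long, and End(-31)
// lands on byte 100.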

-impl<T, U: Decompose<T>, S: TryCompose<T, U, Error = Infallible>> Compose<T, U> for S {
-    fn compose(self, inner: T) -> U {
-        let result = self.try_compose(inner);
-        // Safety: Infallible has no values, so `result` must be `Ok`.
-        unsafe { result.unwrap_unchecked() }
+impl<T: WriteInteg + Seek, D: Serialize> Trailered<T, D> {
+    fn flush_integ(&mut self, trailer: &D, integrity: &[u8]) -> io::Result<()> {
+        let prev_pos = self.write_trailer(trailer)?;
+        self.inner.flush_integ(integrity)?;
+        self.inner.seek(SeekFrom::Start(prev_pos))?;
+        Ok(())
    }
}

-/// Extensions to the `Read` trait.
-trait ReadExt: Read {
-    /// Reads repeatedly until one of the following occur:
-    /// 1. The given buffer is full.
-    /// 2. A call to `read` returns 0.
-    /// 3. A call to `read` returns an error.
-    /// The number of bytes read is returned. If an error is returned, then no bytes were read.
-    fn fill_buf(&mut self, mut dest: &mut [u8]) -> io::Result<usize> {
-        let dest_len_start = dest.len();
-        while !dest.is_empty() {
-            let byte_ct = match self.read(dest) {
-                Ok(byte_ct) => byte_ct,
-                Err(err) => {
-                    if dest_len_start == dest.len() {
-                        return Err(err);
-                    } else {
-                        // We're not allowed to return an error if we've already read from self.
-                        error!("an error occurred in fill_buf: {}", err);
-                        break;
-                    }
-                }
-            };
-            if 0 == byte_ct {
-                break;
+impl<T: Seek, D> Seek for Trailered<T, D> {
+    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
+        /// Adds a signed integer to an unsigned integer and returns the result.
+        fn add_signed(unsigned: u64, signed: i64) -> u64 {
+            if signed >= 0 {
+                unsigned + signed as u64
+            } else {
+                unsigned - (-signed as u64)
            }
-            dest = &mut dest[byte_ct..];
        }
-        Ok(dest_len_start - dest.len())
+
+        let from_start = match pos {
+            SeekFrom::Start(from_start) => from_start,
+            SeekFrom::Current(from_curr) => add_signed(self.inner.stream_position()?, from_curr),
+            SeekFrom::End(from_end) => add_signed(self.body_len, from_end),
+        };
+        let from_start = from_start.min(self.body_len);
+        self.inner.seek(SeekFrom::Start(from_start))
    }
}

-impl<T: Read> ReadExt for T {}
-
impl<T: Write> Decompose<T> for CompressorWriter<T> {
    fn into_inner(self) -> T {
        self.into_inner()
@@ -733,6 +839,18 @@ impl<T: Seek + Read + Write> Seek for SectoredBuf<T> {
    }
}

+impl<T: HeaderAccess> HeaderAccess for SectoredBuf<T> {
+    fn block_key(&self) -> Result<SymKey> {
+        self.inner.block_key()
+    }
+
+    fn add_readcap_for(&mut self, owner: Principal, key: &dyn Encrypter) -> Result<()> {
+        self.inner.add_readcap_for(owner, key)
+    }
+}
+
+impl<T: Read + Write + Seek + HeaderAccess> Block for SectoredBuf<T> {}
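Because the `SectoredBuf` at the top of the composed stack implements `Block`, callers can program against `Box<dyn Block>` or `&mut dyn Block` without knowing which encryption or integrity layers sit underneath. A minimal sketch (not part of this change):

// Illustrative sketch only: code that needs only the `Block` trait stays agnostic of
// the layering chosen by `BlockOpenOptions::open`.
fn write_greeting(block: &mut dyn Block) -> Result<()> {
    // Writes through whatever sectored/encrypted layers the block was opened with.
    block.write_all(b"hello")?;
    Ok(())
}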
+
/// An envelopment of a key, which is tagged with the principal who the key is meant for.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
struct Readcap {
@@ -827,7 +945,7 @@ impl FragmentRecord {
}

/// An identifier for a security principal, which is any entity that can be authenticated.
-#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Hashable, Clone)]
+#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Hashable, Clone, Default)]
struct Principal(Hash);

impl Principal {
@@ -848,7 +966,7 @@ trait Owned {
}

/// An identifier for a block in a tree.
-#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
+#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)]
struct Path {
    owner: Principal,
    components: Vec<String>,
@@ -998,7 +1116,7 @@ impl Display for PathError {
}

/// An instant in time represented by the number of seconds since January 1st 1970, 00:00:00 UTC.
-#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Default)]
struct Epoch(u64);

impl Epoch {
@@ -1351,4 +1469,138 @@ mod tests {
        sectored.read(&mut actual).expect("read failed");
        assert_eq!(expected, actual);
    }
+
+    /// Tests that a new `Trailered<T>` can be created from an empty stream.
+    #[test]
+    fn trailered_new_empty() {
+        let cursor = Cursor::new(Vec::new());
+
+        let (_, trailer): (_, Option<String>) =
+            Trailered::new(cursor).expect("Trailered::new failed");
+
+        assert_eq!(None, trailer);
+    }
+
+    /// Tests that an error is returned when an attempt is made to create a `Trailered<T>` from a
+    /// non-empty stream which is too short.
+    #[test]
+    fn trailered_new_inner_too_short_is_error() {
+        let cursor = Cursor::new([0u8; 5]);
+
+        let result = Trailered::<_, u128>::new(cursor);
+
+        assert!(result.is_err())
+    }
+
+    /// Checks that the trailer is persisted to the inner stream.
+    #[test]
+    fn trailered_trailer_persisted() {
+        const EXPECTED: &str = "Everyone deserves to be remembered,";
+        let cursor = {
+            let cursor = Cursor::new(Vec::new());
+            let (mut trailered, trailer) =
+                Trailered::<_, String>::new(cursor).expect("Trailered::new failed");
+            assert!(trailer.is_none());
+            trailered
+                .flush(&EXPECTED.to_string())
+                .expect("flush failed");
+            trailered.inner
+        };
+
+        let (_, trailer) = Trailered::<_, String>::new(cursor).expect("Trailered::new failed");
+
+        assert_eq!(EXPECTED, trailer.unwrap());
+    }
+
+    #[test]
+    fn trailered_written_data_persisted() {
+        const EXPECTED: &[u8] = b"and every life has something to teach us.";
+        let mut cursor = {
+            let (mut trailered, _) = Trailered::<_, u8>::new(Cursor::new(Vec::new()))
+                .expect("failed to create first trailered");
+            trailered.write(EXPECTED).expect("write failed");
+            trailered.flush(&1).expect("flush failed");
+            trailered.inner
+        };
+        cursor.seek(SeekFrom::Start(0)).expect("seek failed");
+        let (mut trailered, _) =
+            Trailered::<_, u8>::new(cursor).expect("failed to create second trailered");
+        let mut actual = vec![0u8; EXPECTED.len()];
+
+        trailered.read(&mut actual).expect("read failed");
+
+        assert_eq!(EXPECTED, actual);
+    }
+
+    fn trailered_for_seek_test() -> Trailered<impl Read + Seek, u8> {
+        let (mut trailered, _) =
+            Trailered::new(Cursor::new(Vec::new())).expect("failed to create trailered");
+        trailered
+            .write(&[0, 1, 2, 3, 4, 5, 6, 7])
+            .expect("write failed");
+        trailered.seek(SeekFrom::Start(0)).expect("seek failed");
+        trailered
+    }
+
+    #[test]
+    fn trailered_seek_from_start() {
+        const EXPECTED: u8 = 2;
+        let mut trailered = trailered_for_seek_test();
+
+        trailered
+            .seek(SeekFrom::Start(EXPECTED as u64))
+            .expect("seek failed");
+
+        let mut actual = [0u8; 1];
+        trailered.read(&mut actual).expect("read failed");
+        assert_eq!(EXPECTED, actual[0]);
+    }
+
+    #[test]
+    fn trailered_seek_from_curr() {
+        const EXPECTED: u8 = 5;
+        let mut trailered = trailered_for_seek_test();
+        trailered
+            .seek(SeekFrom::Start(6))
+            .expect("seek from start failed");
+
+        trailered
+            .seek(SeekFrom::Current(-1))
+            .expect("seek from current failed");
+
+        let mut actual = [0u8; 1];
+        trailered.read(&mut actual).expect("read failed");
+        assert_eq!(EXPECTED, actual[0]);
+    }
+
+    #[test]
+    fn trailered_seek_from_end() {
+        const EXPECTED: u8 = 7;
+        let mut trailered = trailered_for_seek_test();
+
+        trailered.seek(SeekFrom::End(-1)).expect("seek failed");
+
+        let mut actual = [0u8; 1];
+        trailered.read(&mut actual).expect("read failed");
+        assert_eq!(EXPECTED, actual[0]);
+    }
+
+    /// Tests that a read past the end of the body in a `Trailered<T>` is not allowed.
+    #[test]
+    fn trailered_read_limited_to_body_len() {
+        let (mut trailered, trailer) =
+            Trailered::new(Cursor::new(Vec::new())).expect("failed to create Trailered");
+        assert!(trailer.is_none());
+        const EXPECTED: &[u8] = &[1, 1, 1, 1, 1, 0, 0, 0];
+        trailered.write(&[1u8; 5]).expect("write failed");
+        trailered.flush(&1u8).expect("flush failed");
+        trailered.seek(SeekFrom::Start(0)).expect("seek failed");
+        let mut actual = vec![0u8; EXPECTED.len()];
+
+        // If read goes past the end of the body then there will be a 1 in the sixth position of
+        // actual.
+        trailered.read(&mut actual).expect("read failed");
+
+        assert_eq!(EXPECTED, actual);
+    }
}