|
- // SPDX-License-Identifier: AGPL-3.0-or-later
- pub use private::SignStream;
- mod private {
- use crate::{
- bterr,
- crypto::{Error, Result, Signature, Signer, Verifier},
- Block, BlockMeta, Decompose, MetaAccess, ReadExt, Sectored,
- };
- use anyhow::anyhow;
- use btserde::{read_from, to_vec, write_to};
- use std::io::{self, Read, Seek, SeekFrom, Write};
- /// The number of bytes in the `SignStream::index_bytes` array, which is the size of a `u64`.
- const INDEX_BYTES_LEN: usize = (u64::BITS / 8) as usize;
- /// A stream which signs each sector of data written and verifies each sector read.
- ///
- /// Note that the correct `BlockId` must be configured on the inner stream before the first
- /// call to `read` or `write`. On the first such call the `id_bytes` field is initialized by
- /// serializing the `BlockId` obtained from the inner stream through the [MetaAccess] trait;
- /// once set, `id_bytes` is reused as-is for every later sector.
- pub struct SignStream<T, C> {
- inner: T, // The wrapped stream. Each sector written to it is followed by its signature.
- creds: C, // The credentials used to sign sectors on write and to verify them on read.
- /// The `btserde` serialization of the `BlockId` of this stream, or `None` until the first
- /// `read` or `write`. This data is signed along with the index of each sector to ensure
- /// sectors cannot be reordered or moved between blocks.
- id_bytes: Option<Vec<u8>>,
- /// The 0-based index of the next sector to be read or written.
- index: u64,
- /// The `btserde` serialization of `index`, kept in sync with it by `set_index`.
- index_bytes: [u8; INDEX_BYTES_LEN],
- /// The sector size of this stream: the inner sector size less the serialized signature.
- /// `write` expects a buffer of exactly this size and `read` one at least this large.
- out_sz: usize,
- /// The length of the `data` field expected from [Signature] instances.
- sig_len: usize,
- }
- impl<T: Sectored, C: Signer> SignStream<T, C> {
- /// Creates a new stream which wraps `inner` and signs its sectors using `creds`.
- ///
- /// The sector size exposed by the new stream is the sector size of `inner` minus the
- /// serialized length of a signature, which is measured by signing an empty input.
- ///
- /// # Errors
- /// Returns an error if the probe signature cannot be produced or serialized, or if the
- /// sector size of `inner` leaves no room for data once the signature is accounted for.
- #[allow(dead_code)]
- pub fn new(inner: T, creds: C) -> Result<SignStream<T, C>> {
- // TODO: This is way too brittle. If creds that produce a different sized signature
- // are ever used in the future, then this will break.
- let (extra, sig_len) = {
- let sig = creds.sign(std::iter::empty())?;
- let vec = to_vec(&sig)?;
- (vec.len(), sig.data.len())
- };
- // Each inner sector must hold the signature and at least one byte of data. An `out_sz`
- // of zero would make `out_to_index` divide by zero, so it is rejected here.
- let out_sz = match inner.sector_sz().checked_sub(extra) {
- Some(out_sz) if out_sz > 0 => out_sz,
- _ => return Err(bterr!("sector size is too small")),
- };
- Ok(SignStream {
- inner,
- creds,
- id_bytes: None,
- index: 0,
- index_bytes: [0u8; INDEX_BYTES_LEN],
- out_sz,
- sig_len,
- })
- }
- }
- impl<T, C> SignStream<T, C> {
- /// Maps a byte offset in this stream to the index of the sector which contains it.
- fn out_to_index(&self, outer_offset: u64) -> u64 {
- let sect_sz = self.out_sz as u64;
- outer_offset / sect_sz
- }
- /// Checks that the `data` field of the given [Signature] has the length stored in
- /// `sig_len`, returning [Error::IncorrectSize] if it does not.
- fn assert_sig_len(&self, sig: &Signature) -> Result<()> {
- let expected = self.sig_len;
- let actual = sig.data.len();
- if expected == actual {
- return Ok(());
- }
- Err(bterr!(Error::IncorrectSize { expected, actual }))
- }
- /// Serializes `index` into `index_bytes` and then stores it in the `index` field. The
- /// `index` field is only updated when serialization succeeds.
- fn set_index(&mut self, index: u64) -> Result<()> {
- let mut dest = &mut self.index_bytes[..];
- write_to(&index, &mut dest)?;
- self.index = index;
- Ok(())
- }
- /// Advances the current sector index by one.
- fn incr_index(&mut self) -> Result<()> {
- let next = self.index + 1;
- self.set_index(next)
- }
- }
- impl<T: MetaAccess, C> SignStream<T, C> {
- /// Returns the byte slices which are signed for the sector in `buf`: the serialized block
- /// ID, then the serialized sector index, then the sector data.
- ///
- /// Fails if `id_bytes` has not been populated by `init_id_bytes`.
- fn sig_input<'b, 's: 'b>(
- &'s self,
- buf: &'b [u8],
- ) -> Result<impl Iterator<Item = &'b [u8]>> {
- let Some(id_bytes) = self.id_bytes.as_deref() else {
- return Err(bterr!("id_bytes has not been initialized"));
- };
- let parts: [&'b [u8]; 3] = [id_bytes, &self.index_bytes, buf];
- Ok(parts.into_iter())
- }
- /// Stores the serialized block ID taken from the inner stream's metadata in `id_bytes`.
- /// Does nothing when `id_bytes` is already set.
- fn init_id_bytes(&mut self) -> Result<()> {
- if self.id_bytes.is_some() {
- return Ok(());
- }
- let serialized = to_vec(self.inner.meta_body().block_id()?)?;
- self.id_bytes = Some(serialized);
- Ok(())
- }
- }
- impl<T: Sectored, C> SignStream<T, C> {
- /// Returns the offset in the inner stream reported by its `offset_at` method for the
- /// sector with the given index.
- fn index_to_in(&self, index: u64) -> u64 {
- let inner = &self.inner;
- inner.offset_at(index)
- }
- /// Returns the offset in this stream at which the sector with the given index starts.
- fn index_to_out(&self, index: u64) -> u64 {
- let sect_sz = self.out_sz as u64;
- sect_sz * index
- }
- }
- impl<T: Sectored + Seek, C> SignStream<T, C> {
- /// Seeks the inner stream back to the offset of the sector at the current index, returning
- /// the position reported by the inner stream.
- fn reset_inner_pos(&mut self) -> io::Result<u64> {
- let sector_start = self.index_to_in(self.index);
- self.inner.seek(SeekFrom::Start(sector_start))
- }
- }
- /// Reports the sector size available for data, which is the value stored in `out_sz`.
- impl<T, C> Sectored for SignStream<T, C> {
- fn sector_sz(&self) -> usize {
- let Self { out_sz, .. } = self;
- *out_sz
- }
- }
- impl<T: Write + MetaAccess, C: Signer> Write for SignStream<T, C> {
- /// Writes one sector of data, followed by its signature, to the inner stream.
- ///
- /// `buf` must be exactly `sector_sz()` bytes long. The signature covers the serialized
- /// block ID, the serialized sector index and `buf`. On success the sector index is
- /// advanced and the sector size is returned.
- ///
- /// # Errors
- /// Returns an error if `buf` is the wrong size, if the block ID cannot be read from the
- /// inner stream, if signing fails or if writing to the inner stream fails.
- fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
- self.assert_sector_sz(buf.len())?;
- // The signature is produced before anything is written so that a failure to sign
- // cannot leave a sector of unsigned data in the inner stream.
- self.init_id_bytes()?;
- let sig = self.creds.sign(self.sig_input(buf)?)?;
- self.assert_sig_len(&sig)?;
- self.inner.write_all(buf)?;
- write_to(&sig, &mut self.inner)?;
- self.incr_index()?;
- Ok(self.out_sz)
- }
- /// Flushes the inner stream.
- fn flush(&mut self) -> std::io::Result<()> {
- self.inner.flush()
- }
- }
- impl<T: Read + Seek + Sectored + MetaAccess, C: Verifier> Read for SignStream<T, C> {
- /// Reads one sector from the inner stream into `buf` and verifies its signature.
- ///
- /// `buf` must be at least `sector_sz()` bytes long; only the first `sector_sz()` bytes are
- /// used. Returns `Ok(0)` when the inner stream yields no data. On success the sector index
- /// is advanced and the number of bytes read is returned.
- ///
- /// # Errors
- /// Returns an error if `buf` is too small, if a partial sector is read, if the signature
- /// cannot be deserialized or if verification fails. Once data has been read from the inner
- /// stream, every error path first seeks the inner stream back to the start of the current
- /// sector so that the read can be retried.
- fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
- self.assert_at_least_sector_sz(buf.len())?;
- let buf = &mut buf[..self.sector_sz()];
- let read = self.inner.fill_buf(buf)?;
- if 0 == read {
- return Ok(0);
- }
- // A partial sector cannot be verified. Restore the position like the other error paths.
- if let Err(err) = self.assert_sector_sz(read) {
- self.reset_inner_pos()?;
- return Err(err.into());
- }
- let sig: Signature = match read_from(&mut self.inner) {
- Ok(sig) => sig,
- Err(err) => {
- self.reset_inner_pos()?;
- return Err(err.into());
- }
- };
- if let Err(err) = self.init_id_bytes() {
- self.reset_inner_pos()?;
- return Err(err.into());
- }
- let result = self.creds.verify(self.sig_input(buf)?, sig.as_slice());
- if let Err(err) = result {
- self.reset_inner_pos()?;
- return Err(bterr!(err).into());
- }
- if let Err(err) = self.incr_index() {
- self.reset_inner_pos()?;
- return Err(bterr!(err).into());
- }
- Ok(read)
- }
- }
- impl<T: Seek + Sectored, C> Seek for SignStream<T, C> {
- /// Seeks to the sector which contains the given offset in this stream.
- ///
- /// The inner stream is moved to the offset of the sector containing the target and the
- /// sector index is updated to match. The value returned is the requested offset itself,
- /// which differs from the start of the sector when the offset is not sector aligned.
- ///
- /// # Errors
- /// `SeekFrom::End` is not supported. A `SeekFrom::Current` offset which would land before
- /// the start of the stream or beyond `u64::MAX` fails with `ErrorKind::InvalidInput`.
- fn seek(&mut self, out_pos: std::io::SeekFrom) -> std::io::Result<u64> {
- let out_pos = match out_pos {
- SeekFrom::Start(from_start) => from_start,
- // Relative seeks are measured from the start of the current sector. The addition is
- // checked so an out of range target is reported and does not wrap around.
- SeekFrom::Current(from_curr) => self
- .index_to_out(self.index)
- .checked_add_signed(from_curr)
- .ok_or_else(|| {
- io::Error::new(
- io::ErrorKind::InvalidInput,
- "invalid seek to a negative or overflowing position",
- )
- })?,
- SeekFrom::End(_) => {
- return Err(crate::Error::new(
- anyhow!("seek from end is not supported")
- .context(io::ErrorKind::Unsupported),
- )
- .into());
- }
- };
- let index = self.out_to_index(out_pos);
- let in_pos = self.index_to_in(index);
- self.inner.seek(SeekFrom::Start(in_pos))?;
- self.set_index(index)?;
- Ok(out_pos)
- }
- }
- /// Gives shared access to the [BlockMeta] of the inner stream.
- impl<T, C> AsRef<BlockMeta> for SignStream<T, C>
- where
- T: AsRef<BlockMeta>,
- {
- fn as_ref(&self) -> &BlockMeta {
- <T as AsRef<BlockMeta>>::as_ref(&self.inner)
- }
- }
- /// Gives mutable access to the [BlockMeta] of the inner stream.
- impl<T, C> AsMut<BlockMeta> for SignStream<T, C>
- where
- T: AsMut<BlockMeta>,
- {
- fn as_mut(&mut self) -> &mut BlockMeta {
- <T as AsMut<BlockMeta>>::as_mut(&mut self.inner)
- }
- }
- /// Uses the methods [MetaAccess] provides, with no overrides.
- impl<T, C> MetaAccess for SignStream<T, C> where T: MetaAccess {}
- /// Forwards metadata flushing to the inner block.
- impl<T, C> Block for SignStream<T, C>
- where
- T: Block + Sectored,
- C: Signer + Verifier,
- {
- fn flush_meta(&mut self) -> crate::Result<()> {
- let inner = &mut self.inner;
- inner.flush_meta()
- }
- }
- /// Consumes the stream and hands back the inner stream it was wrapping.
- impl<T, C> Decompose<T> for SignStream<T, C> {
- fn into_inner(self) -> T {
- let SignStream { inner, .. } = self;
- inner
- }
- }
- }
- #[cfg(test)]
- mod tests {
- use std::io::{Read, Seek, SeekFrom, Write};
- use super::*;
- use crate::{
- crypto::ConcreteCreds,
- test_helpers::{node_creds, Randomizer, SectoredCursor},
- Decompose, Sectored, SECTOR_SZ_DEFAULT,
- };
- /// Builds a [SignStream] over an empty in-memory cursor with the given inner sector size,
- /// using the credentials returned by `node_creds`.
- fn sign_stream_with_block_id(
- sect_sz: usize,
- ) -> SignStream<SectoredCursor<Vec<u8>>, &'static ConcreteCreds> {
- let backing = Vec::new();
- let cursor = SectoredCursor::new(backing, sect_sz).require_sect_sz(false);
- let creds = node_creds();
- SignStream::new(cursor, creds).unwrap()
- }
- /// Builds a [SignStream] whose inner stream has the given sector size. This currently just
- /// forwards to `sign_stream_with_block_id`.
- fn sign_stream_with_sz(
- sect_sz: usize,
- ) -> SignStream<SectoredCursor<Vec<u8>>, &'static ConcreteCreds> {
- let inner_sect_sz = sect_sz;
- sign_stream_with_block_id(inner_sect_sz)
- }
- /// Builds a [SignStream] whose inner stream uses `SECTOR_SZ_DEFAULT` as its sector size.
- fn sign_stream() -> SignStream<SectoredCursor<Vec<u8>>, &'static ConcreteCreds> {
- let inner_sect_sz = SECTOR_SZ_DEFAULT;
- sign_stream_with_sz(inner_sect_sz)
- }
- /// Constructing a stream over an empty inner stream must not panic.
- #[test]
- fn new_empty() {
- let _stream = sign_stream();
- }
- /// Writing a single full sector to a new stream succeeds.
- #[test]
- fn write() {
- let mut stream = sign_stream();
- let sect_sz = stream.sector_sz();
- let sector = vec![1u8; sect_sz];
- stream.write(&sector).expect("write failed");
- }
- /// Seeking to the second sector of the outer stream moves the inner stream forward by one
- /// inner sector.
- #[test]
- fn seek() {
- let in_sect_sz = SECTOR_SZ_DEFAULT;
- let mut stream = sign_stream_with_sz(in_sect_sz);
- let out_sect_sz = stream.sector_sz();
- // Write two sectors so that there is a second sector to seek to.
- let sector = vec![1u8; out_sect_sz];
- for failure_msg in ["first write failed", "second write failed"] {
- stream.write(&sector).expect(failure_msg);
- }
- // The seek reports the outer offset which was requested.
- let outer_target = u64::try_from(out_sect_sz).unwrap();
- let outer_pos = stream
- .seek(SeekFrom::Start(outer_target))
- .expect("seek failed");
- assert_eq!(outer_target, outer_pos);
- // The inner stream is left at an offset equal to the inner sector size.
- let inner_target = u64::try_from(in_sect_sz).unwrap();
- let mut inner = stream.into_inner();
- let inner_pos = inner.stream_position().expect("stream_position failed");
- assert_eq!(inner_target, inner_pos);
- }
- /// A sector which is written and then read back from the start is returned unchanged.
- #[test]
- fn write_read_once() {
- let mut stream = sign_stream();
- let sect_sz = stream.sector_sz();
- let expected = vec![1u8; sect_sz];
- let mut actual = vec![0u8; sect_sz];
- stream.write(&expected).expect("write failed");
- stream.seek(SeekFrom::Start(0)).expect("seek failed");
- let read = stream.read(&mut actual).expect("read failed");
- assert_eq!(sect_sz, read);
- assert_eq!(expected, actual);
- }
- /// Clears `vec` and then refills it up to its capacity with clones of `value`, returning
- /// the same vector to allow the call to be used as an argument.
- fn fill_vec<T: Clone>(vec: &mut Vec<T>, value: T) -> &mut Vec<T> {
- let capacity = vec.capacity();
- vec.clear();
- for _ in 0..capacity {
- vec.push(value.clone());
- }
- vec
- }
- /// Several sectors written in order are read back in the same order with the same contents.
- #[test]
- fn write_read_many() {
- const ITER: u8 = 16;
- let mut stream = sign_stream();
- let sect_sz = stream.sector_sz();
- let mut expected = Vec::with_capacity(sect_sz);
- let mut actual = Vec::with_capacity(sect_sz);
- // Sector `k` is filled with the byte value `k`.
- for k in 0..ITER {
- let sector = fill_vec(&mut expected, k);
- stream.write(sector).expect("write failed");
- }
- stream.seek(SeekFrom::Start(0)).expect("seek failed");
- for k in 0..ITER {
- fill_vec(&mut expected, k);
- fill_vec(&mut actual, 0);
- let read = stream.read(&mut actual).expect("read failed");
- assert_eq!(sect_sz, read);
- assert_eq!(expected, actual);
- }
- }
- /// Sectors overwritten in a pseudo-random order are read back with the contents that were
- /// written to them.
- #[test]
- fn write_read_random() {
- const ITER: usize = 16;
- let mut stream = sign_stream();
- let sect_sz = stream.sector_sz();
- let rando = Randomizer::new([37; Randomizer::HASH.len()]);
- let indices: Vec<u64> = rando.take(ITER).map(|e| (e % ITER) as u64).collect();
- let mut expected = Vec::with_capacity(sect_sz);
- let mut actual = Vec::with_capacity(sect_sz);
- // Fill the stream with zeros.
- for _ in 0..ITER {
- stream
- .write(fill_vec(&mut expected, 0))
- .expect("write failed");
- }
- // Overwrite each chosen sector with a byte value derived from its index.
- for index in indices.iter().copied() {
- let offset = stream.offset_at(index);
- stream
- .seek(SeekFrom::Start(offset as u64))
- .expect("seek failed");
- fill_vec(&mut expected, (index + 1) as u8);
- stream.write(&expected).expect("write failed");
- }
- // Visit the same sectors again and check that each one holds the value written to it.
- for index in indices.iter().copied() {
- let offset = stream.offset_at(index);
- stream
- .seek(SeekFrom::Start(offset as u64))
- .expect("seek failed");
- fill_vec(&mut expected, (index + 1) as u8);
- stream.read(fill_vec(&mut actual, 0)).expect("read failed");
- assert_eq!(expected, actual);
- }
- }
- #[test]
- fn modify_inner_is_err() {
- let in_sect_sz = SECTOR_SZ_DEFAULT;
- let inner = SectoredCursor::new(Vec::new(), in_sect_sz).require_sect_sz(false);
- let mut stream = SignStream::new(inner, node_creds()).expect("SignStream::new failed");
- let out_sect_sz = stream.sector_sz();
- let sect = vec![0u8; out_sect_sz];
- stream.write(§).expect("write failed");
- let mut inner = stream.into_inner();
- inner.seek(SeekFrom::Start(0)).expect("seek failed");
- inner.write(&[1u8]).expect("second write failed");
- inner.seek(SeekFrom::Start(0)).expect("seek failed");
- let mut stream =
- SignStream::new(inner, node_creds()).expect("second SignStream::new failed");
- let mut buf = vec![0u8; out_sect_sz];
- let result = stream.read(&mut buf);
- let actual_err = result.err().unwrap().into_inner().unwrap();
- let actual = format!("{actual_err}");
- assert_eq!("invalid signature", actual);
- }
- }
|