// SPDX-License-Identifier: AGPL-3.0-or-later pub use private::SignStream; mod private { use crate::{ bterr, crypto::{Error, Result, Signature, Signer, Verifier}, Block, BlockMeta, Decompose, MetaAccess, ReadExt, Sectored, }; use anyhow::anyhow; use btserde::{read_from, to_vec, write_to}; use std::io::{self, Read, Seek, SeekFrom, Write}; /// The length of the array in the `SignStream::index_bytes` field. const INDEX_BYTES_LEN: usize = std::mem::size_of::(); /// A stream which signs each sector of data written and verifies each sector read. /// /// Note that the correct [BlockId] needs to be configured for the inner stream before the /// first call to `read` or `write`. Upon the first such call the `id_bytes` field will be /// initialized by serializing the current value of the [BlockId] obtained from the inner stream /// using the [MetaAccess] trait. pub struct SignStream { inner: T, creds: C, /// The `btserde` serialization of the `BlockId` of this stream. This data is signed /// along with the index of each sector to ensure sectors cannot be reordered or moved /// between blocks. id_bytes: Option>, /// The 0-based index of the next sector to be read or written. index: u64, /// The `btserde` serialization of index. index_bytes: [u8; INDEX_BYTES_LEN], /// The sector size of this stream. This is the size of the buffer expected by the /// `read` and `write` methods. out_sz: usize, /// The length of the `data` field expected from [Signature] instances. sig_len: usize, } impl SignStream { #[allow(dead_code)] pub fn new(inner: T, creds: C) -> Result> { // TODO: This is way too brittle. If creds that produce a different sized signature // are ever used in the future, then this will break. let (extra, sig_len) = { let sig = creds.sign(std::iter::empty())?; let vec = to_vec(&sig)?; (vec.len(), sig.data.len()) }; let in_sz = inner.sector_sz(); if in_sz < extra { return Err(bterr!("sector size is too small")); } let out_sz = in_sz - extra; Ok(SignStream { inner, creds, id_bytes: None, index: 0, index_bytes: [0u8; INDEX_BYTES_LEN], out_sz, sig_len, }) } } impl SignStream { fn out_to_index(&self, outer_offset: u64) -> u64 { outer_offset / self.out_sz as u64 } /// Asserts that the `data` field in the given [Signature] is the correct length. fn assert_sig_len(&self, sig: &Signature) -> Result<()> { let actual = sig.data.len(); if self.sig_len != actual { Err(bterr!(Error::IncorrectSize { expected: self.sig_len, actual, })) } else { Ok(()) } } fn set_index(&mut self, index: u64) -> Result<()> { let mut slice = self.index_bytes.as_mut_slice(); write_to(&index, &mut slice)?; self.index = index; Ok(()) } fn incr_index(&mut self) -> Result<()> { self.set_index(self.index + 1) } } impl SignStream { fn sig_input<'b, 's: 'b>( &'s self, buf: &'b [u8], ) -> Result> { let id_bytes = self .id_bytes .as_ref() .ok_or_else(|| bterr!("id_bytes has not been initialized"))?; Ok([id_bytes, self.index_bytes.as_slice(), buf].into_iter()) } fn init_id_bytes(&mut self) -> Result<()> { if self.id_bytes.is_none() { let vec = to_vec(self.inner.meta_body().block_id()?)?; self.id_bytes = Some(vec); } Ok(()) } } impl SignStream { fn index_to_in(&self, index: u64) -> u64 { self.inner.offset_at(index) } fn index_to_out(&self, index: u64) -> u64 { self.out_sz as u64 * index } } impl SignStream { fn reset_inner_pos(&mut self) -> io::Result { self.inner .seek(SeekFrom::Start(self.index_to_in(self.index))) } } impl Sectored for SignStream { fn sector_sz(&self) -> usize { self.out_sz } } impl Write for SignStream { fn write(&mut self, buf: &[u8]) -> std::io::Result { self.assert_sector_sz(buf.len())?; self.inner.write_all(buf)?; self.init_id_bytes()?; let sig = self.creds.sign(self.sig_input(buf)?)?; self.assert_sig_len(&sig)?; write_to(&sig, &mut self.inner)?; self.incr_index()?; Ok(self.out_sz) } fn flush(&mut self) -> std::io::Result<()> { self.inner.flush() } } impl Read for SignStream { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { self.assert_at_least_sector_sz(buf.len())?; let buf = &mut buf[..self.sector_sz()]; let read = self.inner.fill_buf(buf)?; if 0 == read { return Ok(0); } self.assert_sector_sz(read)?; let sig: Signature = match read_from(&mut self.inner) { Ok(sig) => sig, Err(err) => { self.reset_inner_pos()?; return Err(err.into()); } }; self.init_id_bytes()?; let result = self.creds.verify(self.sig_input(buf)?, sig.as_slice()); if let Err(err) = result { self.reset_inner_pos()?; return Err(bterr!(err).into()); } if let Err(err) = self.incr_index() { self.reset_inner_pos()?; return Err(bterr!(err).into()); } Ok(read) } } impl Seek for SignStream { fn seek(&mut self, out_pos: std::io::SeekFrom) -> std::io::Result { let out_pos = match out_pos { SeekFrom::Start(from_start) => from_start, SeekFrom::Current(from_curr) => { self.index_to_out(self.index).wrapping_add_signed(from_curr) } SeekFrom::End(_) => { return Err(crate::Error::new( anyhow!("seek from end is not supported") .context(io::ErrorKind::Unsupported), ) .into()); } }; let index = self.out_to_index(out_pos); let in_pos = self.index_to_in(index); self.inner.seek(SeekFrom::Start(in_pos))?; self.set_index(index)?; Ok(out_pos) } } impl, C> AsRef for SignStream { fn as_ref(&self) -> &BlockMeta { self.inner.as_ref() } } impl, C> AsMut for SignStream { fn as_mut(&mut self) -> &mut BlockMeta { self.inner.as_mut() } } impl MetaAccess for SignStream {} impl Block for SignStream { fn flush_meta(&mut self) -> crate::Result<()> { self.inner.flush_meta() } } impl Decompose for SignStream { fn into_inner(self) -> T { self.inner } } } #[cfg(test)] mod tests { use std::io::{Read, Seek, SeekFrom, Write}; use super::*; use crate::{ crypto::ConcreteCreds, test_helpers::{node_creds, Randomizer, SectoredCursor}, Decompose, Sectored, SECTOR_SZ_DEFAULT, }; fn sign_stream_with_block_id( sect_sz: usize, ) -> SignStream>, &'static ConcreteCreds> { let cursor = SectoredCursor::new(Vec::new(), sect_sz).require_sect_sz(false); SignStream::new(cursor, node_creds()).unwrap() } fn sign_stream_with_sz( sect_sz: usize, ) -> SignStream>, &'static ConcreteCreds> { sign_stream_with_block_id(sect_sz) } fn sign_stream() -> SignStream>, &'static ConcreteCreds> { sign_stream_with_sz(SECTOR_SZ_DEFAULT) } #[test] fn new_empty() { let _ = sign_stream(); } #[test] fn write() { let mut stream = sign_stream(); let data = vec![1u8; stream.sector_sz()]; stream.write(&data).expect("write failed"); } #[test] fn seek() { let in_sect_sz = SECTOR_SZ_DEFAULT; let mut stream = sign_stream_with_sz(in_sect_sz); let out_sect_sz = stream.sector_sz(); let expected: u64 = out_sect_sz.try_into().unwrap(); let data = vec![1u8; out_sect_sz]; stream.write(&data).expect("first write failed"); stream.write(&data).expect("second write failed"); let actual = stream.seek(SeekFrom::Start(expected)).expect("seek failed"); assert_eq!(expected, actual); let expected: u64 = in_sect_sz.try_into().unwrap(); let actual = stream .into_inner() .stream_position() .expect("stream_position failed"); assert_eq!(expected, actual); } #[test] fn write_read_once() { let mut stream = sign_stream(); let sect_sz = stream.sector_sz(); let expected = vec![1u8; sect_sz]; stream.write(&expected).expect("write failed"); stream.seek(SeekFrom::Start(0)).expect("seek failed"); let mut actual = vec![0u8; sect_sz]; let read = stream.read(&mut actual).expect("read failed"); assert_eq!(sect_sz, read); assert_eq!(expected, actual); } fn fill_vec(vec: &mut Vec, value: T) -> &mut Vec { vec.clear(); vec.extend(std::iter::repeat(value).take(vec.capacity())); vec } #[test] fn write_read_many() { const ITER: u8 = 16; let mut stream = sign_stream(); let sect_sz = stream.sector_sz(); let mut expected = Vec::with_capacity(sect_sz); let mut actual = Vec::with_capacity(sect_sz); for k in 0..ITER { fill_vec(&mut expected, k); stream.write(&expected).expect("write failed"); } stream.seek(SeekFrom::Start(0)).expect("seek failed"); for k in 0..ITER { let read = stream.read(fill_vec(&mut actual, 0)).expect("read failed"); assert_eq!(sect_sz, read); fill_vec(&mut expected, k); assert_eq!(expected, actual); } } #[test] fn write_read_random() { const ITER: usize = 16; let mut stream = sign_stream(); let sect_sz = stream.sector_sz(); let rando = Randomizer::new([37; Randomizer::HASH.len()]); let indices: Vec = rando.take(ITER).map(|e| (e % ITER) as u64).collect(); let mut expected = Vec::with_capacity(sect_sz); let mut actual = Vec::with_capacity(sect_sz); // Fill the stream with zeros. for _ in 0..ITER { stream .write(fill_vec(&mut expected, 0)) .expect("write failed"); } for index in indices.iter().map(|e| *e) { let offset = stream.offset_at(index); stream .seek(SeekFrom::Start(offset as u64)) .expect("seek failed"); fill_vec(&mut expected, (index + 1) as u8); stream.write(&expected).expect("write failed"); } for index in indices.iter().map(|e| *e) { let offset = stream.offset_at(index); stream .seek(SeekFrom::Start(offset as u64)) .expect("seek failed"); fill_vec(&mut expected, (index + 1) as u8); stream.read(fill_vec(&mut actual, 0)).expect("read failed"); assert_eq!(expected, actual); } } #[test] fn modify_inner_is_err() { let in_sect_sz = SECTOR_SZ_DEFAULT; let inner = SectoredCursor::new(Vec::new(), in_sect_sz).require_sect_sz(false); let mut stream = SignStream::new(inner, node_creds()).expect("SignStream::new failed"); let out_sect_sz = stream.sector_sz(); let sect = vec![0u8; out_sect_sz]; stream.write(§).expect("write failed"); let mut inner = stream.into_inner(); inner.seek(SeekFrom::Start(0)).expect("seek failed"); inner.write(&[1u8]).expect("second write failed"); inner.seek(SeekFrom::Start(0)).expect("seek failed"); let mut stream = SignStream::new(inner, node_creds()).expect("second SignStream::new failed"); let mut buf = vec![0u8; out_sect_sz]; let result = stream.read(&mut buf); let actual_err = result.err().unwrap().into_inner().unwrap(); let actual = format!("{actual_err}"); assert_eq!("invalid signature", actual); } }