// SPDX-License-Identifier: AGPL-3.0-or-later pub use private::Trailered; mod private { use std::{ io::{self, BufReader, Seek, SeekFrom}, marker::PhantomData, }; use btserde::{read_from, write_to}; use positioned_io::{ReadAt, Size, WriteAt}; use serde::{de::DeserializeOwned, Serialize}; use crate::{ bterr, BoxInIoErr, Cursor, Decompose, FlushMeta, Result, Sectored, SizeExt, WriteInteg, }; /// A struct which wraps a stream and which writes a trailing data structure to it when flushed. pub struct Trailered { inner: T, body_len: u64, phantom: PhantomData, write_buf: Vec, } impl Trailered { pub fn empty(inner: T) -> Trailered { Trailered { inner, body_len: 0, phantom: PhantomData, write_buf: Vec::new(), } } /// Creates a new `Trailered` containing the given `T`. This method requires that the given /// stream is either empty, or contains a valid serialization of `D` and the offset at which /// `D` is stored. pub fn new(inner: T) -> Result<(Trailered, Option)> { let end = inner.size_or_err()?; if 0 == end { return Ok((Self::empty(inner), None)); } let mut reader = BufReader::new(Cursor::new(inner)); let offset: i64 = std::mem::size_of::() as i64; if end < offset as u64 { return Err(bterr!("inner stream is non-empty but too small")); } reader.seek(SeekFrom::End(-offset))?; let offset: i64 = read_from(&mut reader)?; if end < offset.unsigned_abs() { return Err(bterr!("inner stream is non-empty but too small")); } let body_len = reader.seek(SeekFrom::End(offset))?; let trailer: D = read_from(&mut reader)?; let inner = reader.into_inner().into_inner(); Ok(( Trailered { inner, body_len, phantom: PhantomData, write_buf: Vec::new(), }, Some(trailer), )) } } impl Trailered { fn update_body_len(&mut self, pos: u64, written: usize) -> usize { let new_pos = pos + written as u64; self.body_len = self.body_len.max(new_pos); written } } impl ReadAt for Trailered { fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result { if pos > self.body_len { return Err(bterr!("pos {pos} is past the end of body ({})", self.body_len).into()); } let available_u64 = self.body_len - pos; let available: usize = available_u64.try_into().box_err()?; let limit = buf.len().min(available); self.inner.read_at(pos, &mut buf[..limit]) } } impl Trailered { pub fn write_at(&mut self, pos: u64, buf: &[u8]) -> io::Result { let written = self.inner.write_at(pos, buf)?; Ok(self.update_body_len(pos, written)) } /// Writes the trailer followed by the offset relative to the end of the inner stream where /// the trailer was written. See the `Trailered::new` method for how this is read out of /// the inner stream. fn write_trailer(&mut self, trailer: &D) -> io::Result<()> { self.write_buf.clear(); write_to(trailer, &mut self.write_buf)?; // This is the length of `write_buf` after writing an additional `i64` into it. let write_buf_len = (self.write_buf.len() + std::mem::size_of::()) as u64; let offset: i64 = write_buf_len.try_into().box_err()?; write_to(&(-offset), &mut self.write_buf)?; self.inner.write_all_at(self.body_len, &self.write_buf)?; Ok(()) } pub fn flush(&mut self, trailer: &D) -> io::Result<()> { self.write_trailer(trailer)?; self.inner.flush() } } impl Trailered { pub fn flush_integ(&mut self, trailer: &D, integrity: &[u8]) -> io::Result<()> { self.write_trailer(trailer)?; self.inner.flush_integ(integrity) } } impl Decompose for Trailered { fn into_inner(self) -> T { self.inner } } impl, D> AsRef for Trailered { fn as_ref(&self) -> &U { self.inner.as_ref() } } impl, D> AsMut for Trailered { fn as_mut(&mut self) -> &mut U { self.inner.as_mut() } } impl Trailered { pub fn flush_meta(&mut self) -> Result<()> { self.inner.flush_meta() } } impl Sectored for Trailered { fn sector_sz(&self) -> usize { self.inner.sector_sz() } } impl Size for Trailered { fn size(&self) -> io::Result> { Ok(Some(self.body_len)) } } } #[cfg(test)] mod tests { use super::*; use positioned_io::ReadAt; use crate::{test_helpers::BtCursor as Cursor, Decompose}; /// Tests that a new `Trailered` can be created from an empty stream. #[test] fn trailered_new_empty() { let cursor = Cursor::new(Vec::new()); let (_, trailer): (_, Option) = Trailered::new(cursor).expect("Trailered::new failed"); assert_eq!(None, trailer); } /// Tests that an error is returned when an attempt is made to create a `Trailered` from a /// non-empty stream which is too short. #[test] fn trailered_new_inner_too_short_is_error() { let cursor = Cursor::new([0u8; 5]); let result = Trailered::<_, u128>::new(cursor); assert!(result.is_err()) } /// Checks that the trailer is persisted to the inner stream. #[test] fn trailered_trailer_persisted() { const EXPECTED: &str = "Everyone deserves to be remembered,"; let cursor = { let cursor = Cursor::new(Vec::new()); let (mut trailered, trailer) = Trailered::<_, String>::new(cursor).expect("Trailered::new failed"); assert!(trailer.is_none()); trailered .flush(&EXPECTED.to_string()) .expect("flush failed"); trailered.into_inner() }; let (_, trailer) = Trailered::<_, String>::new(cursor).expect("Trailered::new failed"); assert_eq!(EXPECTED, trailer.unwrap()); } #[test] fn trailered_written_data_persisted() { const EXPECTED: &[u8] = b"and every life has something to teach us."; let cursor = { let (mut trailered, _) = Trailered::<_, u8>::new(Cursor::new(Vec::new())) .expect("failed to create first trailered"); trailered.write_at(0, EXPECTED).expect("write failed"); trailered.flush(&1).expect("flush failed"); trailered.into_inner() }; let (trailered, _) = Trailered::<_, u8>::new(cursor).expect("failed to created second trailered"); let mut actual = vec![0u8; EXPECTED.len()]; trailered.read_at(0, &mut actual).expect("read failed"); assert_eq!(EXPECTED, actual); } /// Tests that a read past the end of the body in a `Trailered` is not allowed. #[test] fn trailered_read_limited_to_body_len() { const EXPECTED: &[u8] = &[1, 1, 1, 1, 1, 0, 0, 0]; let (mut trailered, ..) = Trailered::new(Cursor::new(Vec::new())).expect("failed to create Trailered"); trailered.write_at(0, &[1u8; 5]).expect("write failed"); trailered.flush(&1u8).expect("flush failed"); let mut actual = vec![0u8; EXPECTED.len()]; trailered.read_at(0, &mut actual).expect("read failed"); assert_eq!(EXPECTED, actual); } }