123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- // SPDX-License-Identifier: AGPL-3.0-or-later
- pub use private::Trailered;
- mod private {
- use std::{
- io::{self, BufReader, Seek, SeekFrom},
- marker::PhantomData,
- };
- use btserde::{read_from, write_to};
- use positioned_io::{ReadAt, Size, WriteAt};
- use serde::{de::DeserializeOwned, Serialize};
- use crate::{
- bterr, BoxInIoErr, Cursor, Decompose, FlushMeta, Result, Sectored, SizeExt, WriteInteg,
- };
- /// A struct which wraps a stream and which writes a trailing data structure to it when flushed.
- pub struct Trailered<T, D> {
- inner: T,
- body_len: u64,
- phantom: PhantomData<D>,
- write_buf: Vec<u8>,
- }
- impl<T: ReadAt + Size, D: DeserializeOwned> Trailered<T, D> {
- pub fn empty(inner: T) -> Trailered<T, D> {
- Trailered {
- inner,
- body_len: 0,
- phantom: PhantomData,
- write_buf: Vec::new(),
- }
- }
- /// Creates a new `Trailered<T>` containing the given `T`. This method requires that the given
- /// stream is either empty, or contains a valid serialization of `D` and the offset at which
- /// `D` is stored.
- pub fn new(inner: T) -> Result<(Trailered<T, D>, Option<D>)> {
- let end = inner.size_or_err()?;
- if 0 == end {
- return Ok((Self::empty(inner), None));
- }
- let mut reader = BufReader::new(Cursor::new(inner));
- let offset: i64 = std::mem::size_of::<i64>() as i64;
- if end < offset as u64 {
- return Err(bterr!("inner stream is non-empty but too small"));
- }
- reader.seek(SeekFrom::End(-offset))?;
- let offset: i64 = read_from(&mut reader)?;
- if end < offset.unsigned_abs() {
- return Err(bterr!("inner stream is non-empty but too small"));
- }
- let body_len = reader.seek(SeekFrom::End(offset))?;
- let trailer: D = read_from(&mut reader)?;
- let inner = reader.into_inner().into_inner();
- Ok((
- Trailered {
- inner,
- body_len,
- phantom: PhantomData,
- write_buf: Vec::new(),
- },
- Some(trailer),
- ))
- }
- }
- impl<T, D> Trailered<T, D> {
- fn update_body_len(&mut self, pos: u64, written: usize) -> usize {
- let new_pos = pos + written as u64;
- self.body_len = self.body_len.max(new_pos);
- written
- }
- }
- impl<T: ReadAt, D> ReadAt for Trailered<T, D> {
- fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result<usize> {
- if pos > self.body_len {
- return Err(bterr!("pos {pos} is past the end of body ({})", self.body_len).into());
- }
- let available_u64 = self.body_len - pos;
- let available: usize = available_u64.try_into().box_err()?;
- let limit = buf.len().min(available);
- self.inner.read_at(pos, &mut buf[..limit])
- }
- }
- impl<T: WriteAt, D: Serialize> Trailered<T, D> {
- pub fn write_at(&mut self, pos: u64, buf: &[u8]) -> io::Result<usize> {
- let written = self.inner.write_at(pos, buf)?;
- Ok(self.update_body_len(pos, written))
- }
- /// Writes the trailer followed by the offset relative to the end of the inner stream where
- /// the trailer was written. See the `Trailered::new` method for how this is read out of
- /// the inner stream.
- fn write_trailer(&mut self, trailer: &D) -> io::Result<()> {
- self.write_buf.clear();
- write_to(trailer, &mut self.write_buf)?;
- // This is the length of `write_buf` after writing an additional `i64` into it.
- let write_buf_len = (self.write_buf.len() + std::mem::size_of::<i64>()) as u64;
- let offset: i64 = write_buf_len.try_into().box_err()?;
- write_to(&(-offset), &mut self.write_buf)?;
- self.inner.write_all_at(self.body_len, &self.write_buf)?;
- Ok(())
- }
- pub fn flush(&mut self, trailer: &D) -> io::Result<()> {
- self.write_trailer(trailer)?;
- self.inner.flush()
- }
- }
- impl<T: WriteInteg + Size, D: Serialize> Trailered<T, D> {
- pub fn flush_integ(&mut self, trailer: &D, integrity: &[u8]) -> io::Result<()> {
- self.write_trailer(trailer)?;
- self.inner.flush_integ(integrity)
- }
- }
- impl<T, D> Decompose<T> for Trailered<T, D> {
- fn into_inner(self) -> T {
- self.inner
- }
- }
- impl<U, T: AsRef<U>, D> AsRef<U> for Trailered<T, D> {
- fn as_ref(&self) -> &U {
- self.inner.as_ref()
- }
- }
- impl<U, T: AsMut<U>, D> AsMut<U> for Trailered<T, D> {
- fn as_mut(&mut self) -> &mut U {
- self.inner.as_mut()
- }
- }
- impl<T: FlushMeta, D> Trailered<T, D> {
- pub fn flush_meta(&mut self) -> Result<()> {
- self.inner.flush_meta()
- }
- }
- impl<T: Sectored, D> Sectored for Trailered<T, D> {
- fn sector_sz(&self) -> usize {
- self.inner.sector_sz()
- }
- }
- impl<T, D> Size for Trailered<T, D> {
- fn size(&self) -> io::Result<Option<u64>> {
- Ok(Some(self.body_len))
- }
- }
- }
- #[cfg(test)]
- mod tests {
- use super::*;
- use positioned_io::ReadAt;
- use crate::{test_helpers::BtCursor as Cursor, Decompose};
- /// Tests that a new `Trailered<T>` can be created from an empty stream.
- #[test]
- fn trailered_new_empty() {
- let cursor = Cursor::new(Vec::new());
- let (_, trailer): (_, Option<String>) =
- Trailered::new(cursor).expect("Trailered::new failed");
- assert_eq!(None, trailer);
- }
- /// Tests that an error is returned when an attempt is made to create a `Trailered<T>` from a
- /// non-empty stream which is too short.
- #[test]
- fn trailered_new_inner_too_short_is_error() {
- let cursor = Cursor::new([0u8; 5]);
- let result = Trailered::<_, u128>::new(cursor);
- assert!(result.is_err())
- }
- /// Checks that the trailer is persisted to the inner stream.
- #[test]
- fn trailered_trailer_persisted() {
- const EXPECTED: &str = "Everyone deserves to be remembered,";
- let cursor = {
- let cursor = Cursor::new(Vec::new());
- let (mut trailered, trailer) =
- Trailered::<_, String>::new(cursor).expect("Trailered::new failed");
- assert!(trailer.is_none());
- trailered
- .flush(&EXPECTED.to_string())
- .expect("flush failed");
- trailered.into_inner()
- };
- let (_, trailer) = Trailered::<_, String>::new(cursor).expect("Trailered::new failed");
- assert_eq!(EXPECTED, trailer.unwrap());
- }
- #[test]
- fn trailered_written_data_persisted() {
- const EXPECTED: &[u8] = b"and every life has something to teach us.";
- let cursor = {
- let (mut trailered, _) = Trailered::<_, u8>::new(Cursor::new(Vec::new()))
- .expect("failed to create first trailered");
- trailered.write_at(0, EXPECTED).expect("write failed");
- trailered.flush(&1).expect("flush failed");
- trailered.into_inner()
- };
- let (trailered, _) =
- Trailered::<_, u8>::new(cursor).expect("failed to created second trailered");
- let mut actual = vec![0u8; EXPECTED.len()];
- trailered.read_at(0, &mut actual).expect("read failed");
- assert_eq!(EXPECTED, actual);
- }
- /// Tests that a read past the end of the body in a `Trailered<T>` is not allowed.
- #[test]
- fn trailered_read_limited_to_body_len() {
- const EXPECTED: &[u8] = &[1, 1, 1, 1, 1, 0, 0, 0];
- let (mut trailered, ..) =
- Trailered::new(Cursor::new(Vec::new())).expect("failed to create Trailered");
- trailered.write_at(0, &[1u8; 5]).expect("write failed");
- trailered.flush(&1u8).expect("flush failed");
- let mut actual = vec![0u8; EXPECTED.len()];
- trailered.read_at(0, &mut actual).expect("read failed");
- assert_eq!(EXPECTED, actual);
- }
- }
|