pub use private::SectoredBuf; mod private { use log::{error, warn}; use std::io::{self, Read, Seek, SeekFrom, Write}; use crate::{ bterr, Block, BlockError, BlockMeta, BoxInIoErr, Decompose, MetaAccess, ReadExt, Result, Sectored, TryCompose, }; /// A stream which buffers writes and read such that the inner stream only sees reads and writes /// of sector length buffers. pub struct SectoredBuf { inner: T, buf: Vec, /// The offset into the inner stream which the zero offset byte in `buf` corresponds to. buf_start: usize, /// Indicates if the contents of `buf` have been written to, and so whether `buf` needs to /// be written back to `inner` before it is refilled. dirty: bool, /// The current position of this stream, expressed as an offset into the inner stream. pos: usize, } impl SectoredBuf<()> { pub fn new() -> SectoredBuf<()> { SectoredBuf { inner: (), buf: Vec::new(), buf_start: 0, dirty: false, pos: 0, } } } impl SectoredBuf { /// Returns the offset into the internal buffer that corresponds to the current position. fn buf_pos(&self) -> usize { self.pos - self.buf_start } /// Returns the index of the sector which is currently loaded into the buffer. fn sector_index(&self, pos: u64) -> u64 { pos / (self.sector_sz() as u64) } } impl SectoredBuf { fn len(&self) -> usize { self.inner .meta_body() .secrets() .unwrap() .size .try_into() .unwrap() } /// Returns one more than the last index in the internal buffer which can be read. fn buf_end(&self) -> usize { let limit = self.len().min(self.buf_start + self.sector_sz()); limit - self.buf_start } } impl SectoredBuf { /// Fills the internal buffer by reading from the inner stream at the current position /// and updates `self.buf_start` with the position read from. fn fill_internal_buf(&mut self) -> Result { self.buf_start = self.inner.stream_position()?.try_into().box_err()?; let read_bytes = if self.buf_start < self.len() { let read_bytes = self.inner.fill_buf(&mut self.buf)?; if read_bytes < self.buf.len() { return Err(bterr!(BlockError::IncorrectSize { expected: self.buf.len(), actual: read_bytes, })); } read_bytes } else { 0 }; Ok(read_bytes) } } impl Default for SectoredBuf<()> { fn default() -> Self { Self::new() } } impl Decompose for SectoredBuf { fn into_inner(self) -> T { self.inner } } impl TryCompose> for SectoredBuf<()> { type Error = crate::Error; fn try_compose(self, inner: T) -> Result> { let sect_sz = inner.sector_sz(); let mut sectored = SectoredBuf { inner, buf: self.buf, buf_start: 0, dirty: false, pos: 0, }; sectored.buf.resize(sect_sz, 0); sectored.inner.seek(SeekFrom::Start(0))?; sectored.fill_internal_buf()?; Ok(sectored) } } impl Sectored for SectoredBuf { fn sector_sz(&self) -> usize { self.buf.len() } } impl Write for SectoredBuf { fn write(&mut self, mut src: &[u8]) -> io::Result { let src_len_start = src.len(); let mut dest = { let buf_pos = self.buf_pos(); &mut self.buf[buf_pos..] }; while !src.is_empty() { if dest.is_empty() { if let Err(err) = self.flush() { error!("A call to SectoredBuf::flush returned an error: {}", err); break; } dest = &mut self.buf[..]; } let sz = src.len().min(dest.len()); dest[..sz].copy_from_slice(&src[..sz]); dest = &mut dest[sz..]; src = &src[sz..]; self.dirty = sz > 0; self.pos += sz; self.inner.mut_meta_body().access_secrets(|secrets| { secrets.size = secrets.size.max(self.pos as u64); Ok(()) })?; } Ok(src_len_start - src.len()) } fn flush(&mut self) -> io::Result<()> { if !self.dirty { return Ok(()); } // Write out the contents of the buffer. let sect_sz: u64 = self.sector_sz().try_into().box_err()?; let inner_pos = self.inner.stream_position()?; let inner_pos_usize: usize = inner_pos.try_into().box_err()?; let is_new_sector = self.pos > inner_pos_usize; let is_full = (self.buf.len() - self.buf_pos()) == 0; let (seek_to, fill_internal_buf) = if is_new_sector { if is_full { (inner_pos + sect_sz, true) } else { (inner_pos, true) } } else { // The contents of the buffer were previously read from inner, so we write the // updated contents to the same offset. let sect_start: u64 = self.buf_start.try_into().box_err()?; self.inner.seek(SeekFrom::Start(sect_start))?; if is_full { (inner_pos, true) } else { // This is the one case were we don't have to refill the internal buffer. (inner_pos - sect_sz, false) } }; self.inner.write_all(&self.buf)?; self.inner.flush()?; // Seek to the next position. self.inner.seek(SeekFrom::Start(seek_to))?; if fill_internal_buf { self.fill_internal_buf()?; } self.dirty = false; Ok(()) } } impl Read for SectoredBuf { fn read(&mut self, mut dest: &mut [u8]) -> io::Result { if self.pos == self.len() { return Ok(0); } let dest_len_start = dest.len(); let mut src = { let start = self.buf_pos(); let end = self.buf_end(); &self.buf[start..end] }; while !dest.is_empty() { if src.is_empty() { if self.pos >= self.len() { break; } let byte_ct = match self.fill_internal_buf() { Ok(byte_ct) => byte_ct, Err(err) => { warn!("SectoredBuf::full_internal_buf returned an error: {}", err); break; } }; if 0 == byte_ct { break; } src = &self.buf[..byte_ct]; } let sz = src.len().min(dest.len()); dest[..sz].copy_from_slice(&src[..sz]); dest = &mut dest[sz..]; src = &src[sz..]; self.pos += sz; } Ok(dest_len_start - dest.len()) } } impl Seek for SectoredBuf { fn seek(&mut self, seek_from: SeekFrom) -> io::Result { let inner_pos = self.inner.stream_position()?; let inner_pos_new = match seek_from { SeekFrom::Start(rel_start) => rel_start, SeekFrom::Current(rel_curr) => { if rel_curr > 0 { inner_pos + rel_curr as u64 } else { inner_pos - rel_curr as u64 } } SeekFrom::End(_) => { return Err(io::Error::new( io::ErrorKind::Unsupported, "seeking relative to the end of the stream is not supported", )) } }; let sect_index = self.sector_index(inner_pos); let sect_index_new = self.sector_index(inner_pos_new); let pos: u64 = self.pos.try_into().box_err()?; if sect_index != sect_index_new || pos == inner_pos { self.flush()?; let sect_sz: u64 = self.sector_sz().try_into().box_err()?; let seek_to = sect_index_new * sect_sz; self.inner.seek(SeekFrom::Start(seek_to))?; self.fill_internal_buf()?; } self.pos = inner_pos_new.try_into().box_err()?; Ok(inner_pos_new) } } impl> AsRef for SectoredBuf { fn as_ref(&self) -> &BlockMeta { self.inner.as_ref() } } impl> AsMut for SectoredBuf { fn as_mut(&mut self) -> &mut BlockMeta { self.inner.as_mut() } } impl MetaAccess for SectoredBuf {} impl Block for SectoredBuf { fn flush_meta(&mut self) -> Result<()> { self.inner.flush_meta() } } } #[cfg(test)] mod tests { use std::io::{Read, Seek, SeekFrom, Write}; use crate::{ test_helpers::{read_check, write_fill, Randomizer, SectoredCursor}, Decompose, ReadExt, Sectored, TryCompose, SECTOR_SZ_DEFAULT, }; use super::*; fn make_sectored_buf(sect_sz: usize, sect_ct: usize) -> SectoredBuf>> { SectoredBuf::new() .try_compose(SectoredCursor::new(vec![0u8; sect_sz * sect_ct], sect_sz)) .expect("compose for sectored buffer failed") } #[test] fn sectored_buf_fill_inner() { const SECT_SZ: usize = SECTOR_SZ_DEFAULT; const SECT_CT: usize = 16; let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT); let sect_sz = sectored.sector_sz(); assert_eq!(0, sect_sz % 16); let chunk_sz = sect_sz / 16; let chunk_ct = SECT_CT * 16; write_fill(&mut sectored, chunk_sz, chunk_ct); } #[test] fn sectored_buf_write_read_sequential() { const SECT_SZ: usize = SECTOR_SZ_DEFAULT; const SECT_CT: usize = 16; let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT); let sect_sz = sectored.sector_sz(); assert_eq!(0, sect_sz % 16); let chunk_sz = sect_sz / 16; // We subtract one here so that the underlying buffer is not completely filled. This // exercises the length limiting capability of the sectored buffer. let chunk_ct = SECT_CT * 16 - 1; write_fill(&mut sectored, chunk_sz, chunk_ct); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); read_check(&mut sectored, chunk_sz, chunk_ct); } #[test] fn sectored_buf_len_preserved() { const SECT_SZ: usize = SECTOR_SZ_DEFAULT; const SECT_CT: usize = 16; let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT); let expected = vec![42u8; 12]; // We need to ensure that writing expected will not fill up the buffer in sectored. assert!(expected.len() < sectored.sector_sz()); sectored.write_all(&expected).expect("write failed"); sectored.flush().expect("flush failed"); let inner = sectored.into_inner(); let mut sectored = SectoredBuf::new() .try_compose(inner) .expect("failed to compose sectored buffer"); let mut actual = vec![0u8; expected.len()]; sectored .fill_buf(actual.as_mut_slice()) .expect("failed to fill actual"); assert_eq!(expected, actual); } #[test] fn sectored_buf_seek() { let sect_sz = 16usize; let sect_ct = 16usize; let cap = sect_sz * sect_ct - std::mem::size_of::(); let source = { let mut source = Vec::with_capacity(cap); source.extend( std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None }) .take(cap), ); source }; let mut sectored = make_sectored_buf(sect_sz, sect_ct); sectored.write(&source).expect("write failed"); let mut buf = [0u8; 1]; let end = cap.try_into().expect("cap cannot fit into a u8"); for pos in (0..end).rev() { sectored .seek(SeekFrom::Start(pos as u64)) .expect("seek failed"); sectored.read(&mut buf).expect("read failed"); assert_eq!(pos, buf[0]); } } /// Tests that data written can be read from the buffer without an intervening call to `flush`. #[test] fn sectored_buf_write_then_read() { const EXPECTED: &[u8] = b"alpha"; let mut sectored = make_sectored_buf(4096, 1); sectored.write(EXPECTED).expect("write failed"); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); let mut actual = [0u8; EXPECTED.len()]; sectored.read(&mut actual).expect("read failed"); assert_eq!(EXPECTED, actual); } #[test] fn sectored_buf_write_read_random() { const SECT_SZ: usize = 16; const SECT_CT: usize = 16; const CAP: usize = SECT_SZ * SECT_CT - std::mem::size_of::(); let source = { let mut expected = Vec::with_capacity(CAP); expected.extend( std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None }) .take(CAP), ); expected }; let indices: Vec<(usize, usize)> = { let rando = Randomizer::new([3u8; Randomizer::HASH.len()]); let rando2 = Randomizer::new([5u8; Randomizer::HASH.len()]); rando .zip(rando2) .take(SECT_CT) .map(|(mut first, mut second)| { first %= source.len(); second &= source.len(); let low = first.min(second); let high = first.max(second); (low, high) }) .collect() }; let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT); sectored .write_all(&[0u8; CAP]) .expect("failed to fill sectored"); sectored.flush().expect("flush failed"); for (_k, (low, high)) in indices.iter().enumerate() { sectored .seek(SeekFrom::Start(*low as u64)) .expect("seek failed"); let src = &source[*low..*high]; sectored.write(src).expect("write failed"); } sectored.flush().expect("flush failed"); let mut buf = vec![0u8; CAP]; for (_k, (low, high)) in indices.iter().enumerate() { sectored .seek(SeekFrom::Start(*low as u64)) .expect("seek failed"); let actual = &mut buf[*low..*high]; sectored.fill_buf(actual).expect("read failed"); let expected = &source[*low..*high]; assert_eq!(expected, actual); } } #[test] fn sectored_buf_read_past_end() { const LEN: usize = 32; let mut sectored = SectoredBuf::new() .try_compose(SectoredCursor::new([0u8; LEN], LEN)) .expect("compose failed"); const BUF_LEN: usize = LEN + 1; sectored.write(&[1u8; BUF_LEN - 1]).expect("write failed"); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); let mut buf = [0u8; BUF_LEN]; // Note that buf is one byte longer than the available capacity in the cursor. sectored.read(&mut buf).expect("read failed"); assert_eq!(&[1u8; BUF_LEN - 1], &buf[..(BUF_LEN - 1)]); assert_eq!(0u8, buf[BUF_LEN - 1]); } /// Tests that the data written in try_compose is actually written back to the underlying stream. #[test] fn sectored_buf_write_back() { let mut sectored = SectoredBuf::new() .try_compose(SectoredCursor::new(vec![0u8; 24], 16)) .expect("compose failed"); let expected = [1u8; 8]; sectored.write(&expected).expect("first write failed"); sectored.write(&[2u8; 8]).expect("second write failed"); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); let mut actual = [0u8; 8]; sectored.read(&mut actual).expect("read failed"); assert_eq!(expected, actual); } #[test] fn sectored_buf_write_past_end() { const LEN: usize = 8; let mut sectored = SectoredBuf::new() .try_compose(SectoredCursor::new(vec![0u8; 0], LEN)) .expect("compos failed"); let expected = [1u8; LEN + 1]; sectored.write(&expected).expect("write failed"); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); let mut actual = [0u8; LEN + 1]; sectored.read(&mut actual).expect("read failed"); assert_eq!(expected, actual); } }