// SPDX-License-Identifier: AGPL-3.0-or-later //! Contains the [SectoredBuf] type. use log::error; use positioned_io::Size; use safemem::write_bytes; use std::io::{self, Read, Seek, SeekFrom, Write}; use crate::{ bterr, error::DisplayErr, suppress_err_if_non_zero, BlockError, BlockMeta, BoxInIoErr, Decompose, MetaAccess, Positioned, ReadDual, ReadExt, Result, Sectored, SeekFromExt, SizeExt, Split, TrySeek, WriteDual, ZeroExtendable, EMPTY_SLICE, }; pub use private::SectoredBuf; mod private { use super::*; /// A stream which buffers writes and read such that the inner stream only sees reads and writes /// of sector length buffers. pub struct SectoredBuf { inner: T, buf: Vec, /// The offset into the inner stream which the zero offset byte in `buf` corresponds to. buf_start: usize, /// Indicates if the contents of `buf` have been written to, and so whether `buf` needs to /// be written back to `inner` before it is refilled. dirty: bool, /// The current position of this stream, expressed as an offset into the inner stream. pos: usize, } impl> SectoredBuf { /// Creates a new [SectoredBuf] which buffers the given stream. pub fn new(inner: T) -> Result> { let sect_sz = inner.sector_sz(); let mut sectored = SectoredBuf { inner, buf: Vec::new(), buf_start: 0, dirty: false, pos: 0, }; sectored.buf.resize(sect_sz, 0); sectored.inner.rewind()?; sectored.fill_internal_buf()?; Ok(sectored) } } impl SectoredBuf { /// Updates the size stored in the metadata of the block. pub fn update_size(inner: &mut T, size: usize) -> Result<()> { inner.mut_meta_body().access_secrets(|secrets| { secrets.size = secrets.size.max(size as u64); Ok(()) }) } } impl SectoredBuf { /// Returns a reference to the inner stream. pub fn get_ref(&self) -> &T { &self.inner } /// Returns a mutable reference to the inner stream. pub fn get_mut(&mut self) -> &mut T { &mut self.inner } /// Returns the offset into the internal buffer that corresponds to the current position. fn buf_pos(&self) -> usize { let buf_pos = self.pos - self.buf_start; debug_assert!(buf_pos <= self.buf.len()); buf_pos } } impl> SectoredBuf { fn len(&self) -> usize { self.inner .as_ref() .body .secrets() .unwrap() .size .try_into() .unwrap() } /// Returns one more than the last index in the internal buffer which can be read. fn buf_end(&self) -> usize { let len = self.len(); let sect_sz = self.sector_sz(); let limit = len.min(self.buf_start + sect_sz); limit - self.buf_start } } impl> SectoredBuf { /// Fills the internal buffer by reading from the inner stream at the current position /// and updates `self.buf_start` with the position read from. fn fill_internal_buf(&mut self) -> Result { self.buf_start = self.inner.stream_position()?.try_into().box_err()?; let read_bytes = if self.buf_start < self.len() { let read_bytes = self.inner.fill_buf(&mut self.buf)?; if read_bytes < self.buf.len() { return Err(bterr!(BlockError::IncorrectSize { expected: self.buf.len(), actual: read_bytes, })); } read_bytes } else { 0 }; Ok(read_bytes) } } impl Split, T> for SectoredBuf { fn split(self) -> (SectoredBuf<&'static [u8]>, T) { let new_self = SectoredBuf { inner: EMPTY_SLICE, buf: self.buf, buf_start: self.buf_start, dirty: self.dirty, pos: self.pos, }; (new_self, self.inner) } fn combine(left: SectoredBuf<&'static [u8]>, right: T) -> Self { SectoredBuf { inner: right, buf: left.buf, buf_start: left.buf_start, dirty: left.dirty, pos: left.pos, } } } impl Decompose for SectoredBuf { fn into_inner(self) -> T { self.inner } } impl Sectored for SectoredBuf { fn sector_sz(&self) -> usize { self.buf.len() } } impl Write for SectoredBuf { fn write(&mut self, mut src: &[u8]) -> io::Result { let src_len_start = src.len(); let mut dest = { let buf_pos = self.buf_pos(); &mut self.buf[buf_pos..] }; while !src.is_empty() { if dest.is_empty() { suppress_err_if_non_zero!(src_len_start - src.len(), self.flush()); dest = &mut self.buf[..]; } let sz = src.len().min(dest.len()); dest[..sz].copy_from_slice(&src[..sz]); dest = &mut dest[sz..]; src = &src[sz..]; self.dirty = sz > 0; self.pos += sz; Self::update_size(&mut self.inner, self.pos)?; } Ok(src_len_start - src.len()) } fn flush(&mut self) -> io::Result<()> { if !self.dirty { return Ok(()); } // Write out the contents of the buffer. let sect_sz: u64 = self.sector_sz().try_into().box_err()?; let inner_pos = self.inner.stream_position()?; let inner_pos_usize: usize = inner_pos.try_into().box_err()?; let is_new_sector = self.pos > inner_pos_usize; let is_full = (self.buf.len() - self.buf_pos()) == 0; let (seek_to, fill_internal_buf) = if is_new_sector { if is_full { (inner_pos + sect_sz, true) } else { (inner_pos, true) } } else { // The contents of the buffer were previously read from inner, so we write the // updated contents to the same offset. let sect_start: u64 = self.buf_start.try_into().box_err()?; self.inner.seek(SeekFrom::Start(sect_start))?; if is_full { (inner_pos, true) } else { // This is the one case were we don't have to refill the internal buffer. (inner_pos - sect_sz, false) } }; self.inner.write_all(&self.buf)?; self.inner.flush()?; // Seek to the next position. self.inner.seek(SeekFrom::Start(seek_to))?; if fill_internal_buf { self.fill_internal_buf()?; } self.dirty = false; Ok(()) } } impl ZeroExtendable for SectoredBuf { fn zero_extend(&mut self, num_zeros: u64) -> io::Result<()> { if num_zeros == 0 { return Ok(()); } let prev_pos = self.pos; let num_zeros_sz: usize = num_zeros.try_into().display_err()?; self.seek(SeekFrom::End(0))?; let end_pos = self.pos + num_zeros_sz; { let start = self.buf_pos(); let end = self.buf.len().min(start + num_zeros_sz); write_bytes(&mut self.buf[start..end], 0); self.dirty = self.dirty || end > start; self.pos += end - start; Self::update_size(&mut self.inner, self.pos)?; self.flush()?; } if self.pos >= end_pos { self.seek(SeekFrom::Start(prev_pos as u64))?; return Ok(()); } write_bytes(&mut self.buf, 0); let iters = (end_pos - self.pos) / self.buf.len(); for _ in 0..iters { self.dirty = true; self.pos += self.buf.len(); Self::update_size(&mut self.inner, self.pos)?; self.flush()?; } let remain = (end_pos - self.pos) % self.buf.len(); self.pos += remain; self.dirty = remain > 0; Self::update_size(&mut self.inner, self.pos)?; self.flush()?; self.seek(SeekFrom::Start(prev_pos as u64))?; Ok(()) } } /// Returns the slice of the internal buffer which is ready to be read from. /// If the buffer all the bytes in the buffer have been consumed, then the buffer is refilled. /// The returned slice will be empty if and only if there are no additional bytes in the /// inner stream. macro_rules! readable_slice { ($self:expr) => {{ let pos = $self.buf_pos(); let end = $self.buf_end(); if pos == end && $self.pos < $self.len() { match $self.fill_internal_buf() { Ok(nread) => { if nread > 0 { Ok(&$self.buf[..$self.buf_end()]) } else { Ok(&$self.buf[..0]) } } Err(err) => Err(err), } } else { Ok(&$self.buf[pos..end]) } }}; } impl> Read for SectoredBuf { fn read(&mut self, mut dest: &mut [u8]) -> io::Result { if self.pos == self.len() { return Ok(0); } let dest_len_start = dest.len(); let mut src = readable_slice!(self)?; while !dest.is_empty() { if src.is_empty() { src = suppress_err_if_non_zero!( dest_len_start - dest.len(), readable_slice!(self) ); // If `src` is still empty, then we've reached the end of the stream. if src.is_empty() { break; } } let sz = src.len().min(dest.len()); dest[..sz].copy_from_slice(&src[..sz]); dest = &mut dest[sz..]; src = &src[sz..]; self.pos += sz; } Ok(dest_len_start - dest.len()) } } impl> SectoredBuf { /// Seeks this stream to the given position. /// If a seek to a different sector is needed then `pre_seek` is called before this seek /// is performed. This can be used to flush buffered data, or to prevent the seek if a /// flush can't be performed. fn seek_impl io::Result<()>>( &mut self, seek_from: SeekFrom, pre_seek: F, ) -> io::Result { let pos = self.pos as u64; let pos_new = seek_from.abs(|| Ok(pos), || self.size_or_err())?; let len = self.len(); if pos_new > len as u64 { return Err(io::Error::new( io::ErrorKind::InvalidInput, format!("can't seek to {pos_new}, only {len} bytes total"), )); } let sect_sz = self.sector_sz() as u64; let sect_index = pos / sect_sz; let sect_index_new = pos_new / sect_sz; if sect_index != sect_index_new || sect_sz == pos - self.buf_start as u64 { pre_seek(self)?; let seek_to = sect_index_new * sect_sz; self.inner.seek(SeekFrom::Start(seek_to))?; self.fill_internal_buf()?; } self.pos = pos_new.try_into().box_err()?; Ok(pos_new) } /// Returns a slice of the internal buffer that starts at the given offset in the inner /// stream, and which is no longer than the given size. Note that this slice may /// be shorter than the `size` parameter if the number of bytes in the internal buffer is /// less than `size`. pub fn get_buf(&self, offset: u64, size: u64) -> Result<&[u8]> { let offset: usize = offset.try_into().unwrap(); let size: usize = size.try_into().unwrap(); let sect_sz = self.sector_sz(); let index = offset / sect_sz; if self.buf_start != sect_sz * index { return Err(bterr!( "SectoredBuf in wrong position to return buf for offset {offset}, size {size}" )); } let start = offset % sect_sz; let end = self.buf.len().min(start + size); Ok(&self.buf[start..end]) } } impl Seek for SectoredBuf { fn seek(&mut self, seek_from: SeekFrom) -> io::Result { self.seek_impl(seek_from, |sich| sich.flush()) } } impl> TrySeek for SectoredBuf { fn try_seek(&mut self, seek_from: SeekFrom) -> io::Result<()> { self.seek_impl(seek_from, |sich| { if sich.dirty { Err(io::Error::new( io::ErrorKind::Unsupported, "SectoredBuf::try_seek failed because it has unwritten data", )) } else { Ok(()) } })?; Ok(()) } } impl> AsRef for SectoredBuf { fn as_ref(&self) -> &U { self.inner.as_ref() } } impl> AsMut for SectoredBuf { fn as_mut(&mut self) -> &mut U { self.inner.as_mut() } } impl> Size for SectoredBuf { /// Returns the size of the block, which the value stored in the block's metadata, not /// the length of the inner stream. fn size(&self) -> io::Result> { Ok(Some(self.inner.as_ref().body.secrets()?.size)) } } impl WriteDual for SectoredBuf { fn write_from(&mut self, mut read: R, mut count: usize) -> io::Result { let pos_start = self.pos; let mut dest = { let pos = self.buf_pos(); &mut self.buf[pos..] }; let dest_len = dest.len(); dest = &mut dest[..dest_len.min(count)]; while count > 0 { if dest.is_empty() { suppress_err_if_non_zero!(self.pos - pos_start, self.flush()); dest = &mut self.buf[..]; let dest_len = dest.len(); dest = &mut dest[..dest_len.min(count)]; } let nread = suppress_err_if_non_zero!(self.pos - pos_start, read.read(dest)); if 0 == nread { break; } self.dirty = true; dest = &mut dest[nread..]; self.pos += nread; count -= nread; self.inner.mut_meta_body().access_secrets(|secrets| { secrets.size = secrets.size.max(self.pos as u64); Ok(()) })?; } Ok(self.pos - pos_start) } } impl> ReadDual for SectoredBuf { fn read_into(&mut self, mut write: W, mut count: usize) -> io::Result { let pos_start = self.pos; let mut src = readable_slice!(self)?; src = &src[..src.len().min(count)]; while count > 0 { if src.is_empty() { src = suppress_err_if_non_zero!(self.pos - pos_start, readable_slice!(self)); src = &src[..src.len().min(count)]; // If `src` is still empty, then we've reached the end of the stream. if src.is_empty() { break; } } let written = write.write(src)?; src = &src[written..]; self.pos += written; count -= written; } Ok(self.pos - pos_start) } } impl Positioned for SectoredBuf { fn pos(&self) -> usize { self.pos } } } #[cfg(test)] mod tests { use super::*; use crate::{ test_helpers::{ integer_array, read_check, write_fill, BtCursor, Randomizer, SectoredCursor, SECTOR_SZ_DEFAULT, }, Cursor, }; fn make_sectored_buf(sect_sz: usize, sect_ct: usize) -> SectoredBuf>> { SectoredBuf::new(SectoredCursor::new(vec![0u8; sect_sz * sect_ct], sect_sz)) .expect("compose for sectored buffer failed") } #[test] fn sectored_buf_fill_inner() { const SECT_SZ: usize = SECTOR_SZ_DEFAULT; const SECT_CT: usize = 16; let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT); let sect_sz = sectored.sector_sz(); assert_eq!(0, sect_sz % 16); let chunk_sz = sect_sz / 16; let chunk_ct = SECT_CT * 16; write_fill(&mut sectored, chunk_sz, chunk_ct); } #[test] fn sectored_buf_write_read_sequential() { const SECT_SZ: usize = SECTOR_SZ_DEFAULT; const SECT_CT: usize = 16; let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT); let sect_sz = sectored.sector_sz(); assert_eq!(0, sect_sz % 16); let chunk_sz = sect_sz / 16; // We subtract one here so that the underlying buffer is not completely filled. This // exercises the length limiting capability of the sectored buffer. let chunk_ct = SECT_CT * 16 - 1; write_fill(&mut sectored, chunk_sz, chunk_ct); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); read_check(&mut sectored, chunk_sz, chunk_ct); } #[test] fn sectored_buf_len_preserved() { const SECT_SZ: usize = SECTOR_SZ_DEFAULT; const SECT_CT: usize = 16; let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT); let expected = vec![42u8; 12]; // We need to ensure that writing expected will not fill up the buffer in sectored. assert!(expected.len() < sectored.sector_sz()); sectored.write_all(&expected).expect("write failed"); sectored.flush().expect("flush failed"); let inner = sectored.into_inner(); let mut sectored = SectoredBuf::new(inner).expect("failed to compose sectored buffer"); let mut actual = vec![0u8; expected.len()]; sectored .fill_buf(actual.as_mut_slice()) .expect("failed to fill actual"); assert_eq!(expected, actual); } #[test] fn sectored_buf_seek() { let sect_sz = 16usize; let sect_ct = 16usize; let cap = sect_sz * sect_ct - std::mem::size_of::(); let source = { let mut source = Vec::with_capacity(cap); source.extend( std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None }) .take(cap), ); source }; let mut sectored = make_sectored_buf(sect_sz, sect_ct); sectored.write(&source).expect("write failed"); let mut buf = [0u8; 1]; let end = cap.try_into().expect("cap cannot fit into a u8"); for pos in (0..end).rev() { sectored .seek(SeekFrom::Start(pos as u64)) .expect("seek failed"); sectored.read(&mut buf).expect("read failed"); assert_eq!(pos, buf[0]); } } /// Tests that data written can be read from the buffer without an intervening call to `flush`. #[test] fn sectored_buf_write_then_read() { const EXPECTED: &[u8] = b"alpha"; let mut sectored = make_sectored_buf(4096, 1); sectored.write(EXPECTED).expect("write failed"); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); let mut actual = [0u8; EXPECTED.len()]; sectored.read(&mut actual).expect("read failed"); assert_eq!(EXPECTED, actual); } #[test] fn sectored_buf_write_read_random() { const SECT_SZ: usize = 16; const SECT_CT: usize = 16; const CAP: usize = SECT_SZ * SECT_CT - std::mem::size_of::(); let source = { let mut expected = Vec::with_capacity(CAP); expected.extend( std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None }) .take(CAP), ); expected }; let indices: Vec<(usize, usize)> = { let rando = Randomizer::new([3u8; Randomizer::HASH.len()]); let rando2 = Randomizer::new([5u8; Randomizer::HASH.len()]); rando .zip(rando2) .take(SECT_CT) .map(|(mut first, mut second)| { first %= source.len(); second &= source.len(); let low = first.min(second); let high = first.max(second); (low, high) }) .collect() }; let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT); sectored .write_all(&[0u8; CAP]) .expect("failed to fill sectored"); sectored.flush().expect("flush failed"); for (_k, (low, high)) in indices.iter().enumerate() { sectored .seek(SeekFrom::Start(*low as u64)) .expect("seek failed"); let src = &source[*low..*high]; sectored.write(src).expect("write failed"); } sectored.flush().expect("flush failed"); let mut buf = vec![0u8; CAP]; for (_k, (low, high)) in indices.iter().enumerate() { sectored .seek(SeekFrom::Start(*low as u64)) .expect("seek failed"); let actual = &mut buf[*low..*high]; sectored.fill_buf(actual).expect("read failed"); let expected = &source[*low..*high]; assert_eq!(expected, actual); } } #[test] fn sectored_buf_read_past_end() { const LEN: usize = 32; let mut sectored = SectoredBuf::new(SectoredCursor::new([0u8; LEN], LEN)).expect("compose failed"); const BUF_LEN: usize = LEN + 1; sectored.write(&[1u8; BUF_LEN - 1]).expect("write failed"); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); let mut buf = [0u8; BUF_LEN]; // Note that buf is one byte longer than the available capacity in the cursor. sectored.read(&mut buf).expect("read failed"); assert_eq!(&[1u8; BUF_LEN - 1], &buf[..(BUF_LEN - 1)]); assert_eq!(0u8, buf[BUF_LEN - 1]); } /// Tests that the data written in try_compose is actually written back to the underlying stream. #[test] fn sectored_buf_write_back() { let mut sectored = SectoredBuf::new(SectoredCursor::new(vec![0u8; 24], 16)).expect("compose failed"); let expected = [1u8; 8]; sectored.write(&expected).expect("first write failed"); sectored.write(&[2u8; 8]).expect("second write failed"); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); let mut actual = [0u8; 8]; sectored.read(&mut actual).expect("read failed"); assert_eq!(expected, actual); } #[test] fn sectored_buf_write_past_end() { const LEN: usize = 8; let mut sectored = SectoredBuf::new(SectoredCursor::new(vec![0u8; 0], LEN)).expect("compos failed"); let expected = [1u8; LEN + 1]; sectored.write(&expected).expect("write failed"); sectored.seek(SeekFrom::Start(0)).expect("seek failed"); let mut actual = [0u8; LEN + 1]; sectored.read(&mut actual).expect("read failed"); assert_eq!(expected, actual); } #[test] fn read_into_count_limits_read_bytes() { const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7]; let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), DATA.len())).unwrap(); sectored.write(&DATA).unwrap(); sectored.rewind().unwrap(); let mut actual = BtCursor::new([0u8; DATA.len()]); let read = sectored.read_into(&mut actual, DATA.len() - 1).unwrap(); assert_eq!(DATA.len() - 1, read); assert_eq!([0, 1, 2, 3, 4, 5, 6, 0], actual.into_inner()); } #[test] fn read_into_read_spans_multiple_sectors() { const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7]; let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), DATA.len())).unwrap(); sectored.write(&DATA).unwrap(); sectored.write(&DATA).unwrap(); sectored.rewind().unwrap(); const ACTUAL_LEN: usize = DATA.len() + DATA.len() / 2; let mut actual = BtCursor::new([0u8; ACTUAL_LEN]); let read = sectored.read_into(&mut actual, ACTUAL_LEN).unwrap(); assert_eq!(ACTUAL_LEN, read); assert_eq!([0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3], actual.into_inner()); } /// Tests that a read asking for more bytes than the number available returns the number /// available. #[test] fn read_into_read_past_len() { const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7]; let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), DATA.len())).unwrap(); sectored.write(&DATA).unwrap(); sectored.rewind().unwrap(); const ACTUAL_LEN: usize = DATA.len() + 1; let mut actual = BtCursor::new([0u8; ACTUAL_LEN]); let read = sectored.read_into(&mut actual, ACTUAL_LEN).unwrap(); assert_eq!(DATA.len(), read); assert_eq!([0, 1, 2, 3, 4, 5, 6, 7, 0], actual.into_inner()); } #[test] fn write_from_full_cursor() { const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7]; let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), DATA.len())).unwrap(); let written = sectored .write_from(BtCursor::new(DATA), DATA.len()) .unwrap(); assert_eq!(DATA.len(), written); sectored.flush().unwrap(); assert_eq!(&DATA, sectored.into_inner().into_inner().as_slice()); } #[test] fn write_from_count_limits_bytes_read() { const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7]; let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), DATA.len())).unwrap(); let mut cursor = BtCursor::new(DATA); let written = sectored.write_from(&mut cursor, DATA.len() / 2).unwrap(); assert_eq!(DATA.len() / 2, written); sectored.flush().unwrap(); assert_eq!( &[0, 1, 2, 3, 0, 0, 0, 0], sectored.into_inner().into_inner().as_slice() ); let mut remaining = Vec::new(); cursor.read_to_end(&mut remaining).unwrap(); assert_eq!(&[4, 5, 6, 7], remaining.as_slice()); } #[test] fn write_from_write_spans_multiple_sectors() { const SECT_SZ: usize = 4; const DATA: [u8; SECT_SZ + 1] = [0, 1, 2, 3, 4]; let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap(); let written = sectored .write_from(BtCursor::new(DATA), DATA.len()) .unwrap(); assert_eq!(DATA.len(), written); sectored.rewind().unwrap(); let mut actual = Vec::new(); sectored.read_to_end(&mut actual).unwrap(); assert_eq!(&[0, 1, 2, 3, 4], actual.as_slice()); } #[test] fn try_seek_to_second_sector() { const SECT_SZ: usize = 4; const DATA_LEN: usize = 2 * SECT_SZ; const DATA: [u8; DATA_LEN] = integer_array::(0); let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap(); let written = sectored .write_from(BtCursor::new(DATA), DATA.len()) .unwrap(); assert_eq!(DATA.len(), written); sectored.rewind().unwrap(); const OFFSET: u64 = SECT_SZ as u64 + 1; sectored.try_seek(SeekFrom::Start(OFFSET)).unwrap(); let mut actual = BtCursor::new(Vec::new()); sectored.read_into(&mut actual, SECT_SZ).unwrap(); const EXPECTED_LEN: usize = SECT_SZ - 1; const EXPECTED: [u8; EXPECTED_LEN] = integer_array::(OFFSET as u8); assert_eq!(&EXPECTED, actual.into_inner().as_slice()); } #[test] fn seek_past_end_is_error() { const SECT_SZ: usize = 4; let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap(); let result = sectored.seek(SeekFrom::Start(1)); let matched = if let Err(err) = result { io::ErrorKind::InvalidInput == err.kind() } else { false }; assert!(matched); } #[test] fn seek_to_zero_when_empty() { const SECT_SZ: usize = 4; let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap(); let pos = sectored.seek(SeekFrom::Start(0)).unwrap(); assert_eq!(0, pos); } #[test] fn read_into_consumes_remaining_sector() { // SECT_SZ % 2 is assumed be 0. const SECT_SZ: usize = 4; const DATA_LEN: usize = 2 * SECT_SZ; const DATA: [u8; DATA_LEN] = integer_array::(0); let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap(); sectored.write(&DATA).unwrap(); const MID_FIRST: usize = SECT_SZ / 2; sectored.seek(SeekFrom::Start(MID_FIRST as u64)).unwrap(); // Notice that the `count` argument plus the current `sectored.pos` equals `SECT_SZ`. // This will cause `pos / sect_sz` to increase by one, but without incrementing // `buf_start`. This creates a special case that `seek_impl` has to take into account. sectored .read_into(&mut BtCursor::new([0u8; MID_FIRST]), MID_FIRST) .unwrap(); const MID_SECOND: u64 = 3 * SECT_SZ as u64 / 2; sectored.try_seek(SeekFrom::Start(MID_SECOND)).unwrap(); const EXPECTED_LEN: usize = SECT_SZ / 2; const EXPECTED: [u8; EXPECTED_LEN] = integer_array::(MID_SECOND as u8); let mut actual = BtCursor::new(Vec::new()); let nread = sectored.read_into(&mut actual, EXPECTED_LEN).unwrap(); assert_eq!(EXPECTED_LEN, nread); assert_eq!(&EXPECTED, actual.into_inner().as_slice()); } #[test] fn read_into_reads_nothing_when_at_end() { const SECT_SZ: usize = 8; let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap(); sectored.write([1u8; 6].as_slice()).unwrap(); let mut actual = Cursor::new(Vec::new()); sectored.read_into(&mut actual, SECT_SZ).unwrap(); assert_eq!(&[0u8; 0], actual.get_ref().as_slice()); } #[test] fn zero_extend_less_than_sect_sz() { let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), 8)).unwrap(); let written = sectored.write([1u8; 4].as_slice()).unwrap(); assert_eq!(4, written); sectored.zero_extend(2).unwrap(); sectored.rewind().unwrap(); let mut actual = Cursor::new(Vec::new()); sectored.read_into(&mut actual, 8).unwrap(); assert_eq!(&[1, 1, 1, 1, 0, 0], actual.get_ref().as_slice()); } #[test] fn zero_extend_multiple_sectors() { const SECT_SZ: usize = 8; let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap(); let written = sectored.write([1u8; SECT_SZ / 2].as_slice()).unwrap(); assert_eq!(SECT_SZ / 2, written); const EXPECTED_LEN: usize = 3 * SECT_SZ / 2; sectored.rewind().unwrap(); // Note that zero_extend is called when the current position is 0. The current position // must not affect zero extension. sectored.zero_extend(EXPECTED_LEN as u64).unwrap(); let mut actual = Cursor::new(Vec::new()); sectored .read_into(&mut actual, EXPECTED_LEN + SECT_SZ / 2) .unwrap(); let actual = actual.into_inner(); assert_eq!(&[1, 1, 1, 1], &actual[..(SECT_SZ / 2)]); assert_eq!(&[0u8; EXPECTED_LEN], &actual[(SECT_SZ / 2)..]); } #[test] fn zero_extend_multiple_sectors_with_remainder() { const SECT_SZ: usize = 8; let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap(); let written = sectored.write([1u8; SECT_SZ / 2].as_slice()).unwrap(); assert_eq!(SECT_SZ / 2, written); // Notice that the total length of the inner stream will be 2 * SECT_SZ + 1. const EXPECTED_LEN: usize = 3 * SECT_SZ / 2 + 1; sectored.rewind().unwrap(); sectored.zero_extend(EXPECTED_LEN as u64).unwrap(); let mut actual = Cursor::new(Vec::new()); sectored .read_into(&mut actual, EXPECTED_LEN + SECT_SZ / 2) .unwrap(); let actual = actual.into_inner(); assert_eq!(&[1, 1, 1, 1], &actual[..(SECT_SZ / 2)]); assert_eq!(&[0u8; EXPECTED_LEN], &actual[(SECT_SZ / 2)..]); } #[test] fn get_buf() { const SECT_SZ: usize = crate::SECTOR_SZ_DEFAULT; const DIVISOR: usize = 8; const READ_SZ: usize = SECT_SZ / DIVISOR; let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap(); let mut expected = vec![0u8; READ_SZ]; for index in 0..(DIVISOR as u8 + 1) { expected.fill(index + 1); sectored.write(&expected).unwrap(); } sectored.rewind().unwrap(); for index in 0..(DIVISOR as u8 + 1) { let offset = (READ_SZ * index as usize) as u64; sectored.try_seek(SeekFrom::Start(offset)).unwrap(); let actual = sectored.get_buf(offset, READ_SZ as u64).unwrap(); expected.fill(index + 1); assert!(actual == expected); } } }