- // SPDX-License-Identifier: AGPL-3.0-or-later
- //! Contains the [SectoredBuf] type.
- use log::error;
- use positioned_io::Size;
- use safemem::write_bytes;
- use std::io::{self, Read, Seek, SeekFrom, Write};
- use crate::{
- bterr, error::DisplayErr, suppress_err_if_non_zero, BlockError, BlockMeta, BoxInIoErr,
- Decompose, MetaAccess, Positioned, ReadDual, ReadExt, Result, Sectored, SeekFromExt, SizeExt,
- Split, TrySeek, WriteDual, ZeroExtendable, EMPTY_SLICE,
- };
- pub use private::SectoredBuf;
- mod private {
- use super::*;
- /// A stream which buffers writes and reads such that the inner stream only sees reads and
- /// writes of sector-length buffers.
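- ///
- /// # Examples
- ///
- /// A minimal usage sketch (marked `ignore` because `SectoredCursor` is the in-crate test
- /// helper and may not be visible to doc-tests):
- ///
- /// ```ignore
- /// use std::io::{Read, Seek, SeekFrom, Write};
- ///
- /// // Wrap a sector-oriented stream so callers can issue reads and writes of any length.
- /// let mut sectored = SectoredBuf::new(SectoredCursor::new(vec![0u8; 4096], 512)).expect("new failed");
- /// sectored.write_all(b"hello").expect("write failed");
- /// sectored.flush().expect("flush failed");
- /// sectored.seek(SeekFrom::Start(0)).expect("seek failed");
- /// let mut buf = [0u8; 5];
- /// sectored.read_exact(&mut buf).expect("read failed");
- /// assert_eq!(b"hello", &buf);
- /// ```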
- pub struct SectoredBuf<T> {
- inner: T,
- buf: Vec<u8>,
- /// The offset into the inner stream which the zero offset byte in `buf` corresponds to.
- buf_start: usize,
- /// Indicates if the contents of `buf` have been written to, and so whether `buf` needs to
- /// be written back to `inner` before it is refilled.
- dirty: bool,
- /// The current position of this stream, expressed as an offset into the inner stream.
- pos: usize,
- }
- impl<T: Sectored + Read + Seek + AsRef<BlockMeta>> SectoredBuf<T> {
- /// Creates a new [SectoredBuf] which buffers the given stream.
- pub fn new(inner: T) -> Result<SectoredBuf<T>> {
- let sect_sz = inner.sector_sz();
- let mut sectored = SectoredBuf {
- inner,
- buf: Vec::new(),
- buf_start: 0,
- dirty: false,
- pos: 0,
- };
- sectored.buf.resize(sect_sz, 0);
- sectored.inner.rewind()?;
- sectored.fill_internal_buf()?;
- Ok(sectored)
- }
- }
- impl<T: Read + Write + Seek + MetaAccess> SectoredBuf<T> {
- /// Updates the size stored in the metadata of the block.
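- /// The stored size is only ever increased: passing a `size` smaller than the current value
- /// leaves the metadata unchanged.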
- pub fn update_size(inner: &mut T, size: usize) -> Result<()> {
- inner.mut_meta_body().access_secrets(|secrets| {
- secrets.size = secrets.size.max(size as u64);
- Ok(())
- })
- }
- }
- impl<T> SectoredBuf<T> {
- /// Returns a reference to the inner stream.
- pub fn get_ref(&self) -> &T {
- &self.inner
- }
- /// Returns a mutable reference to the inner stream.
- pub fn get_mut(&mut self) -> &mut T {
- &mut self.inner
- }
- /// Returns the offset into the internal buffer that corresponds to the current position.
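- /// For example, if `buf_start == 4096` and `pos == 5000`, this returns `904`.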
- fn buf_pos(&self) -> usize {
- let buf_pos = self.pos - self.buf_start;
- debug_assert!(buf_pos <= self.buf.len());
- buf_pos
- }
- }
- impl<T: AsRef<BlockMeta>> SectoredBuf<T> {
- fn len(&self) -> usize {
- self.inner
- .as_ref()
- .body
- .secrets()
- .unwrap()
- .size
- .try_into()
- .unwrap()
- }
- /// Returns one more than the last index in the internal buffer which can be read.
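- /// For example, with a sector size of 8, `buf_start == 8`, and a block size of 10, only
- /// bytes 8 and 9 of the block are buffered, so this returns `2`.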
- fn buf_end(&self) -> usize {
- let len = self.len();
- let sect_sz = self.sector_sz();
- let limit = len.min(self.buf_start + sect_sz);
- limit - self.buf_start
- }
- }
- impl<T: Read + Seek + AsRef<BlockMeta>> SectoredBuf<T> {
- /// Fills the internal buffer by reading from the inner stream at the current position
- /// and updates `self.buf_start` with the position read from.
- fn fill_internal_buf(&mut self) -> Result<usize> {
- self.buf_start = self.inner.stream_position()?.try_into().box_err()?;
- let read_bytes = if self.buf_start < self.len() {
- let read_bytes = self.inner.fill_buf(&mut self.buf)?;
- if read_bytes < self.buf.len() {
- return Err(bterr!(BlockError::IncorrectSize {
- expected: self.buf.len(),
- actual: read_bytes,
- }));
- }
- read_bytes
- } else {
- 0
- };
- Ok(read_bytes)
- }
- }
- impl<T> Split<SectoredBuf<&'static [u8]>, T> for SectoredBuf<T> {
- fn split(self) -> (SectoredBuf<&'static [u8]>, T) {
- let new_self = SectoredBuf {
- inner: EMPTY_SLICE,
- buf: self.buf,
- buf_start: self.buf_start,
- dirty: self.dirty,
- pos: self.pos,
- };
- (new_self, self.inner)
- }
- fn combine(left: SectoredBuf<&'static [u8]>, right: T) -> Self {
- SectoredBuf {
- inner: right,
- buf: left.buf,
- buf_start: left.buf_start,
- dirty: left.dirty,
- pos: left.pos,
- }
- }
- }
- impl<T> Decompose<T> for SectoredBuf<T> {
- fn into_inner(self) -> T {
- self.inner
- }
- }
- impl<T> Sectored for SectoredBuf<T> {
- fn sector_sz(&self) -> usize {
- self.buf.len()
- }
- }
- impl<T: Seek + Read + Write + MetaAccess> Write for SectoredBuf<T> {
- fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
- let src_len_start = src.len();
- let mut dest = {
- let buf_pos = self.buf_pos();
- &mut self.buf[buf_pos..]
- };
- while !src.is_empty() {
- if dest.is_empty() {
- suppress_err_if_non_zero!(src_len_start - src.len(), self.flush());
- dest = &mut self.buf[..];
- }
- let sz = src.len().min(dest.len());
- dest[..sz].copy_from_slice(&src[..sz]);
- dest = &mut dest[sz..];
- src = &src[sz..];
- self.dirty = sz > 0;
- self.pos += sz;
- Self::update_size(&mut self.inner, self.pos)?;
- }
- Ok(src_len_start - src.len())
- }
- fn flush(&mut self) -> io::Result<()> {
- if !self.dirty {
- return Ok(());
- }
- // Write out the contents of the buffer.
- let sect_sz: u64 = self.sector_sz().try_into().box_err()?;
- let inner_pos = self.inner.stream_position()?;
- let inner_pos_usize: usize = inner_pos.try_into().box_err()?;
- let is_new_sector = self.pos > inner_pos_usize;
- let is_full = (self.buf.len() - self.buf_pos()) == 0;
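- // Four cases follow, depending on whether the buffered sector is new (not previously read
- // from `inner`) and whether it is completely full. Only a partially filled sector that was
- // previously read from `inner` can skip the refill, because the buffer already holds that
- // sector's current contents.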
- let (seek_to, fill_internal_buf) = if is_new_sector {
- if is_full {
- (inner_pos + sect_sz, true)
- } else {
- (inner_pos, true)
- }
- } else {
- // The contents of the buffer were previously read from inner, so we write the
- // updated contents to the same offset.
- let sect_start: u64 = self.buf_start.try_into().box_err()?;
- self.inner.seek(SeekFrom::Start(sect_start))?;
- if is_full {
- (inner_pos, true)
- } else {
- // This is the one case where we don't have to refill the internal buffer.
- (inner_pos - sect_sz, false)
- }
- };
- self.inner.write_all(&self.buf)?;
- self.inner.flush()?;
- // Seek to the next position.
- self.inner.seek(SeekFrom::Start(seek_to))?;
- if fill_internal_buf {
- self.fill_internal_buf()?;
- }
- self.dirty = false;
- Ok(())
- }
- }
- impl<T: Read + Write + Seek + MetaAccess> ZeroExtendable for SectoredBuf<T> {
- fn zero_extend(&mut self, num_zeros: u64) -> io::Result<()> {
- if num_zeros == 0 {
- return Ok(());
- }
- let prev_pos = self.pos;
- let num_zeros_sz: usize = num_zeros.try_into().display_err()?;
- self.seek(SeekFrom::End(0))?;
- let end_pos = self.pos + num_zeros_sz;
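- // First, zero out the portion of the currently buffered sector that lies beyond the
- // current position (up to `num_zeros` bytes), then flush it.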
- {
- let start = self.buf_pos();
- let end = self.buf.len().min(start + num_zeros_sz);
- write_bytes(&mut self.buf[start..end], 0);
- self.dirty = self.dirty || end > start;
- self.pos += end - start;
- Self::update_size(&mut self.inner, self.pos)?;
- self.flush()?;
- }
- if self.pos >= end_pos {
- self.seek(SeekFrom::Start(prev_pos as u64))?;
- return Ok(());
- }
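- // Then write out as many whole sectors of zeros as are needed, followed by any remaining
- // zeros in a final, partially filled sector.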
- write_bytes(&mut self.buf, 0);
- let iters = (end_pos - self.pos) / self.buf.len();
- for _ in 0..iters {
- self.dirty = true;
- self.pos += self.buf.len();
- Self::update_size(&mut self.inner, self.pos)?;
- self.flush()?;
- }
- let remain = (end_pos - self.pos) % self.buf.len();
- self.pos += remain;
- self.dirty = remain > 0;
- Self::update_size(&mut self.inner, self.pos)?;
- self.flush()?;
- self.seek(SeekFrom::Start(prev_pos as u64))?;
- Ok(())
- }
- }
- /// Returns the slice of the internal buffer which is ready to be read from.
- /// If all of the bytes in the buffer have been consumed, then the buffer is refilled.
- /// The returned slice will be empty if and only if there are no additional bytes in the
- /// inner stream.
- macro_rules! readable_slice {
- ($self:expr) => {{
- let pos = $self.buf_pos();
- let end = $self.buf_end();
- if pos == end && $self.pos < $self.len() {
- match $self.fill_internal_buf() {
- Ok(nread) => {
- if nread > 0 {
- Ok(&$self.buf[..$self.buf_end()])
- } else {
- Ok(&$self.buf[..0])
- }
- }
- Err(err) => Err(err),
- }
- } else {
- Ok(&$self.buf[pos..end])
- }
- }};
- }
- impl<T: Read + Seek + AsRef<BlockMeta>> Read for SectoredBuf<T> {
- fn read(&mut self, mut dest: &mut [u8]) -> io::Result<usize> {
- if self.pos == self.len() {
- return Ok(0);
- }
- let dest_len_start = dest.len();
- let mut src = readable_slice!(self)?;
- while !dest.is_empty() {
- if src.is_empty() {
- src = suppress_err_if_non_zero!(
- dest_len_start - dest.len(),
- readable_slice!(self)
- );
- // If `src` is still empty, then we've reached the end of the stream.
- if src.is_empty() {
- break;
- }
- }
- let sz = src.len().min(dest.len());
- dest[..sz].copy_from_slice(&src[..sz]);
- dest = &mut dest[sz..];
- src = &src[sz..];
- self.pos += sz;
- }
- Ok(dest_len_start - dest.len())
- }
- }
- impl<T: Seek + Read + AsRef<BlockMeta>> SectoredBuf<T> {
- /// Seeks this stream to the given position.
- /// If a seek to a different sector is needed then `pre_seek` is called before this seek
- /// is performed. This can be used to flush buffered data, or to prevent the seek if a
- /// flush can't be performed.
- fn seek_impl<F: FnOnce(&mut Self) -> io::Result<()>>(
- &mut self,
- seek_from: SeekFrom,
- pre_seek: F,
- ) -> io::Result<u64> {
- let pos = self.pos as u64;
- let pos_new = seek_from.abs(|| Ok(pos), || self.size_or_err())?;
- let len = self.len();
- if pos_new > len as u64 {
- return Err(io::Error::new(
- io::ErrorKind::InvalidInput,
- format!("can't seek to {pos_new}, only {len} bytes total"),
- ));
- }
- let sect_sz = self.sector_sz() as u64;
- let sect_index = pos / sect_sz;
- let sect_index_new = pos_new / sect_sz;
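- // A refill is required when the target position is in a different sector, or when the
- // current position sits exactly at the end of the buffered sector: in that case
- // `sect_index` already names the sector after the one held in `buf`.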
- if sect_index != sect_index_new || sect_sz == pos - self.buf_start as u64 {
- pre_seek(self)?;
- let seek_to = sect_index_new * sect_sz;
- self.inner.seek(SeekFrom::Start(seek_to))?;
- self.fill_internal_buf()?;
- }
- self.pos = pos_new.try_into().box_err()?;
- Ok(pos_new)
- }
- /// Returns a slice of the internal buffer that starts at the given offset in the inner
- /// stream, and which is no longer than the given size. Note that this slice may
- /// be shorter than the `size` parameter if the number of bytes in the internal buffer is
- /// less than `size`.
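- ///
- /// For example, with a sector size of 4096 and the sector starting at offset 4096 currently
- /// buffered, `get_buf(4200, 100)` returns bytes `104..204` of the internal buffer; asking
- /// for an offset that falls in any other sector is an error.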
- pub fn get_buf(&self, offset: u64, size: u64) -> Result<&[u8]> {
- let offset: usize = offset.try_into().unwrap();
- let size: usize = size.try_into().unwrap();
- let sect_sz = self.sector_sz();
- let index = offset / sect_sz;
- if self.buf_start != sect_sz * index {
- return Err(bterr!(
- "SectoredBuf in wrong position to return buf for offset {offset}, size {size}"
- ));
- }
- let start = offset % sect_sz;
- let end = self.buf.len().min(start + size);
- Ok(&self.buf[start..end])
- }
- }
- impl<T: Seek + Read + Write + MetaAccess> Seek for SectoredBuf<T> {
- fn seek(&mut self, seek_from: SeekFrom) -> io::Result<u64> {
- self.seek_impl(seek_from, |sich| sich.flush())
- }
- }
- impl<T: Read + Seek + AsRef<BlockMeta>> TrySeek for SectoredBuf<T> {
- fn try_seek(&mut self, seek_from: SeekFrom) -> io::Result<()> {
- self.seek_impl(seek_from, |sich| {
- if sich.dirty {
- Err(io::Error::new(
- io::ErrorKind::Unsupported,
- "SectoredBuf::try_seek failed because it has unwritten data",
- ))
- } else {
- Ok(())
- }
- })?;
- Ok(())
- }
- }
- impl<U, T: AsRef<U>> AsRef<U> for SectoredBuf<T> {
- fn as_ref(&self) -> &U {
- self.inner.as_ref()
- }
- }
- impl<U, T: AsMut<U>> AsMut<U> for SectoredBuf<T> {
- fn as_mut(&mut self) -> &mut U {
- self.inner.as_mut()
- }
- }
- impl<T: AsRef<BlockMeta>> Size for SectoredBuf<T> {
- /// Returns the size of the block, which is the value stored in the block's metadata, not
- /// the length of the inner stream.
- fn size(&self) -> io::Result<Option<u64>> {
- Ok(Some(self.inner.as_ref().body.secrets()?.size))
- }
- }
- impl<T: Read + Write + Seek + MetaAccess> WriteDual for SectoredBuf<T> {
- fn write_from<R: Read>(&mut self, mut read: R, mut count: usize) -> io::Result<usize> {
- let pos_start = self.pos;
- let mut dest = {
- let pos = self.buf_pos();
- &mut self.buf[pos..]
- };
- let dest_len = dest.len();
- dest = &mut dest[..dest_len.min(count)];
- while count > 0 {
- if dest.is_empty() {
- suppress_err_if_non_zero!(self.pos - pos_start, self.flush());
- dest = &mut self.buf[..];
- let dest_len = dest.len();
- dest = &mut dest[..dest_len.min(count)];
- }
- let nread = suppress_err_if_non_zero!(self.pos - pos_start, read.read(dest));
- if 0 == nread {
- break;
- }
- self.dirty = true;
- dest = &mut dest[nread..];
- self.pos += nread;
- count -= nread;
- self.inner.mut_meta_body().access_secrets(|secrets| {
- secrets.size = secrets.size.max(self.pos as u64);
- Ok(())
- })?;
- }
- Ok(self.pos - pos_start)
- }
- }
- impl<T: Read + Seek + AsRef<BlockMeta>> ReadDual for SectoredBuf<T> {
- fn read_into<W: Write>(&mut self, mut write: W, mut count: usize) -> io::Result<usize> {
- let pos_start = self.pos;
- let mut src = readable_slice!(self)?;
- src = &src[..src.len().min(count)];
- while count > 0 {
- if src.is_empty() {
- src = suppress_err_if_non_zero!(self.pos - pos_start, readable_slice!(self));
- src = &src[..src.len().min(count)];
- // If `src` is still empty, then we've reached the end of the stream.
- if src.is_empty() {
- break;
- }
- }
- let written = write.write(src)?;
- src = &src[written..];
- self.pos += written;
- count -= written;
- }
- Ok(self.pos - pos_start)
- }
- }
- impl<T> Positioned for SectoredBuf<T> {
- fn pos(&self) -> usize {
- self.pos
- }
- }
- }
- #[cfg(test)]
- mod tests {
- use super::*;
- use crate::{
- test_helpers::{
- integer_array, read_check, write_fill, BtCursor, Randomizer, SectoredCursor,
- SECTOR_SZ_DEFAULT,
- },
- Cursor,
- };
- fn make_sectored_buf(sect_sz: usize, sect_ct: usize) -> SectoredBuf<SectoredCursor<Vec<u8>>> {
- SectoredBuf::new(SectoredCursor::new(vec![0u8; sect_sz * sect_ct], sect_sz))
- .expect("compose for sectored buffer failed")
- }
- #[test]
- fn sectored_buf_fill_inner() {
- const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
- const SECT_CT: usize = 16;
- let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
- let sect_sz = sectored.sector_sz();
- assert_eq!(0, sect_sz % 16);
- let chunk_sz = sect_sz / 16;
- let chunk_ct = SECT_CT * 16;
- write_fill(&mut sectored, chunk_sz, chunk_ct);
- }
- #[test]
- fn sectored_buf_write_read_sequential() {
- const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
- const SECT_CT: usize = 16;
- let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
- let sect_sz = sectored.sector_sz();
- assert_eq!(0, sect_sz % 16);
- let chunk_sz = sect_sz / 16;
- // We subtract one here so that the underlying buffer is not completely filled. This
- // exercises the length limiting capability of the sectored buffer.
- let chunk_ct = SECT_CT * 16 - 1;
- write_fill(&mut sectored, chunk_sz, chunk_ct);
- sectored.seek(SeekFrom::Start(0)).expect("seek failed");
- read_check(&mut sectored, chunk_sz, chunk_ct);
- }
- #[test]
- fn sectored_buf_len_preserved() {
- const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
- const SECT_CT: usize = 16;
- let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
- let expected = vec![42u8; 12];
- // We need to ensure that writing expected will not fill up the buffer in sectored.
- assert!(expected.len() < sectored.sector_sz());
- sectored.write_all(&expected).expect("write failed");
- sectored.flush().expect("flush failed");
- let inner = sectored.into_inner();
- let mut sectored = SectoredBuf::new(inner).expect("failed to compose sectored buffer");
- let mut actual = vec![0u8; expected.len()];
- sectored
- .fill_buf(actual.as_mut_slice())
- .expect("failed to fill actual");
- assert_eq!(expected, actual);
- }
- #[test]
- fn sectored_buf_seek() {
- let sect_sz = 16usize;
- let sect_ct = 16usize;
- let cap = sect_sz * sect_ct - std::mem::size_of::<usize>();
- let source = {
- let mut source = Vec::with_capacity(cap);
- source.extend(
- std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None })
- .take(cap),
- );
- source
- };
- let mut sectored = make_sectored_buf(sect_sz, sect_ct);
- sectored.write(&source).expect("write failed");
- let mut buf = [0u8; 1];
- let end = cap.try_into().expect("cap cannot fit into a u8");
- for pos in (0..end).rev() {
- sectored
- .seek(SeekFrom::Start(pos as u64))
- .expect("seek failed");
- sectored.read(&mut buf).expect("read failed");
- assert_eq!(pos, buf[0]);
- }
- }
- /// Tests that data written can be read from the buffer without an intervening call to `flush`.
- #[test]
- fn sectored_buf_write_then_read() {
- const EXPECTED: &[u8] = b"alpha";
- let mut sectored = make_sectored_buf(4096, 1);
- sectored.write(EXPECTED).expect("write failed");
- sectored.seek(SeekFrom::Start(0)).expect("seek failed");
- let mut actual = [0u8; EXPECTED.len()];
- sectored.read(&mut actual).expect("read failed");
- assert_eq!(EXPECTED, actual);
- }
- #[test]
- fn sectored_buf_write_read_random() {
- const SECT_SZ: usize = 16;
- const SECT_CT: usize = 16;
- const CAP: usize = SECT_SZ * SECT_CT - std::mem::size_of::<usize>();
- let source = {
- let mut expected = Vec::with_capacity(CAP);
- expected.extend(
- std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None })
- .take(CAP),
- );
- expected
- };
- let indices: Vec<(usize, usize)> = {
- let rando = Randomizer::new([3u8; Randomizer::HASH.len()]);
- let rando2 = Randomizer::new([5u8; Randomizer::HASH.len()]);
- rando
- .zip(rando2)
- .take(SECT_CT)
- .map(|(mut first, mut second)| {
- first %= source.len();
- second %= source.len();
- let low = first.min(second);
- let high = first.max(second);
- (low, high)
- })
- .collect()
- };
- let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
- sectored
- .write_all(&[0u8; CAP])
- .expect("failed to fill sectored");
- sectored.flush().expect("flush failed");
- for (_k, (low, high)) in indices.iter().enumerate() {
- sectored
- .seek(SeekFrom::Start(*low as u64))
- .expect("seek failed");
- let src = &source[*low..*high];
- sectored.write(src).expect("write failed");
- }
- sectored.flush().expect("flush failed");
- let mut buf = vec![0u8; CAP];
- for (_k, (low, high)) in indices.iter().enumerate() {
- sectored
- .seek(SeekFrom::Start(*low as u64))
- .expect("seek failed");
- let actual = &mut buf[*low..*high];
- sectored.fill_buf(actual).expect("read failed");
- let expected = &source[*low..*high];
- assert_eq!(expected, actual);
- }
- }
- #[test]
- fn sectored_buf_read_past_end() {
- const LEN: usize = 32;
- let mut sectored =
- SectoredBuf::new(SectoredCursor::new([0u8; LEN], LEN)).expect("compose failed");
- const BUF_LEN: usize = LEN + 1;
- sectored.write(&[1u8; BUF_LEN - 1]).expect("write failed");
- sectored.seek(SeekFrom::Start(0)).expect("seek failed");
- let mut buf = [0u8; BUF_LEN];
- // Note that buf is one byte longer than the available capacity in the cursor.
- sectored.read(&mut buf).expect("read failed");
- assert_eq!(&[1u8; BUF_LEN - 1], &buf[..(BUF_LEN - 1)]);
- assert_eq!(0u8, buf[BUF_LEN - 1]);
- }
- /// Tests that data written into the buffer is actually written back to the underlying stream.
- #[test]
- fn sectored_buf_write_back() {
- let mut sectored =
- SectoredBuf::new(SectoredCursor::new(vec![0u8; 24], 16)).expect("compose failed");
- let expected = [1u8; 8];
- sectored.write(&expected).expect("first write failed");
- sectored.write(&[2u8; 8]).expect("second write failed");
- sectored.seek(SeekFrom::Start(0)).expect("seek failed");
- let mut actual = [0u8; 8];
- sectored.read(&mut actual).expect("read failed");
- assert_eq!(expected, actual);
- }
- #[test]
- fn sectored_buf_write_past_end() {
- const LEN: usize = 8;
- let mut sectored =
- SectoredBuf::new(SectoredCursor::new(vec![0u8; 0], LEN)).expect("compose failed");
- let expected = [1u8; LEN + 1];
- sectored.write(&expected).expect("write failed");
- sectored.seek(SeekFrom::Start(0)).expect("seek failed");
- let mut actual = [0u8; LEN + 1];
- sectored.read(&mut actual).expect("read failed");
- assert_eq!(expected, actual);
- }
- #[test]
- fn read_into_count_limits_read_bytes() {
- const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
- let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), DATA.len())).unwrap();
- sectored.write(&DATA).unwrap();
- sectored.rewind().unwrap();
- let mut actual = BtCursor::new([0u8; DATA.len()]);
- let read = sectored.read_into(&mut actual, DATA.len() - 1).unwrap();
- assert_eq!(DATA.len() - 1, read);
- assert_eq!([0, 1, 2, 3, 4, 5, 6, 0], actual.into_inner());
- }
- #[test]
- fn read_into_read_spans_multiple_sectors() {
- const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
- let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), DATA.len())).unwrap();
- sectored.write(&DATA).unwrap();
- sectored.write(&DATA).unwrap();
- sectored.rewind().unwrap();
- const ACTUAL_LEN: usize = DATA.len() + DATA.len() / 2;
- let mut actual = BtCursor::new([0u8; ACTUAL_LEN]);
- let read = sectored.read_into(&mut actual, ACTUAL_LEN).unwrap();
- assert_eq!(ACTUAL_LEN, read);
- assert_eq!([0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3], actual.into_inner());
- }
- /// Tests that a read asking for more bytes than the number available returns the number
- /// available.
- #[test]
- fn read_into_read_past_len() {
- const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
- let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), DATA.len())).unwrap();
- sectored.write(&DATA).unwrap();
- sectored.rewind().unwrap();
- const ACTUAL_LEN: usize = DATA.len() + 1;
- let mut actual = BtCursor::new([0u8; ACTUAL_LEN]);
- let read = sectored.read_into(&mut actual, ACTUAL_LEN).unwrap();
- assert_eq!(DATA.len(), read);
- assert_eq!([0, 1, 2, 3, 4, 5, 6, 7, 0], actual.into_inner());
- }
- #[test]
- fn write_from_full_cursor() {
- const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
- let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), DATA.len())).unwrap();
- let written = sectored
- .write_from(BtCursor::new(DATA), DATA.len())
- .unwrap();
- assert_eq!(DATA.len(), written);
- sectored.flush().unwrap();
- assert_eq!(&DATA, sectored.into_inner().into_inner().as_slice());
- }
- #[test]
- fn write_from_count_limits_bytes_read() {
- const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
- let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), DATA.len())).unwrap();
- let mut cursor = BtCursor::new(DATA);
- let written = sectored.write_from(&mut cursor, DATA.len() / 2).unwrap();
- assert_eq!(DATA.len() / 2, written);
- sectored.flush().unwrap();
- assert_eq!(
- &[0, 1, 2, 3, 0, 0, 0, 0],
- sectored.into_inner().into_inner().as_slice()
- );
- let mut remaining = Vec::new();
- cursor.read_to_end(&mut remaining).unwrap();
- assert_eq!(&[4, 5, 6, 7], remaining.as_slice());
- }
- #[test]
- fn write_from_write_spans_multiple_sectors() {
- const SECT_SZ: usize = 4;
- const DATA: [u8; SECT_SZ + 1] = [0, 1, 2, 3, 4];
- let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
- let written = sectored
- .write_from(BtCursor::new(DATA), DATA.len())
- .unwrap();
- assert_eq!(DATA.len(), written);
- sectored.rewind().unwrap();
- let mut actual = Vec::new();
- sectored.read_to_end(&mut actual).unwrap();
- assert_eq!(&[0, 1, 2, 3, 4], actual.as_slice());
- }
- #[test]
- fn try_seek_to_second_sector() {
- const SECT_SZ: usize = 4;
- const DATA_LEN: usize = 2 * SECT_SZ;
- const DATA: [u8; DATA_LEN] = integer_array::<DATA_LEN>(0);
- let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
- let written = sectored
- .write_from(BtCursor::new(DATA), DATA.len())
- .unwrap();
- assert_eq!(DATA.len(), written);
- sectored.rewind().unwrap();
- const OFFSET: u64 = SECT_SZ as u64 + 1;
- sectored.try_seek(SeekFrom::Start(OFFSET)).unwrap();
- let mut actual = BtCursor::new(Vec::new());
- sectored.read_into(&mut actual, SECT_SZ).unwrap();
- const EXPECTED_LEN: usize = SECT_SZ - 1;
- const EXPECTED: [u8; EXPECTED_LEN] = integer_array::<EXPECTED_LEN>(OFFSET as u8);
- assert_eq!(&EXPECTED, actual.into_inner().as_slice());
- }
- #[test]
- fn seek_past_end_is_error() {
- const SECT_SZ: usize = 4;
- let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
- let result = sectored.seek(SeekFrom::Start(1));
- let matched = if let Err(err) = result {
- io::ErrorKind::InvalidInput == err.kind()
- } else {
- false
- };
- assert!(matched);
- }
- #[test]
- fn seek_to_zero_when_empty() {
- const SECT_SZ: usize = 4;
- let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
- let pos = sectored.seek(SeekFrom::Start(0)).unwrap();
- assert_eq!(0, pos);
- }
- #[test]
- fn read_into_consumes_remaining_sector() {
- // SECT_SZ % 2 is assumed to be 0.
- const SECT_SZ: usize = 4;
- const DATA_LEN: usize = 2 * SECT_SZ;
- const DATA: [u8; DATA_LEN] = integer_array::<DATA_LEN>(0);
- let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
- sectored.write(&DATA).unwrap();
- const MID_FIRST: usize = SECT_SZ / 2;
- sectored.seek(SeekFrom::Start(MID_FIRST as u64)).unwrap();
- // Notice that the `count` argument plus the current `sectored.pos` equals `SECT_SZ`.
- // This will cause `pos / sect_sz` to increase by one, but without incrementing
- // `buf_start`. This creates a special case that `seek_impl` has to take into account.
- sectored
- .read_into(&mut BtCursor::new([0u8; MID_FIRST]), MID_FIRST)
- .unwrap();
- const MID_SECOND: u64 = 3 * SECT_SZ as u64 / 2;
- sectored.try_seek(SeekFrom::Start(MID_SECOND)).unwrap();
- const EXPECTED_LEN: usize = SECT_SZ / 2;
- const EXPECTED: [u8; EXPECTED_LEN] = integer_array::<EXPECTED_LEN>(MID_SECOND as u8);
- let mut actual = BtCursor::new(Vec::new());
- let nread = sectored.read_into(&mut actual, EXPECTED_LEN).unwrap();
- assert_eq!(EXPECTED_LEN, nread);
- assert_eq!(&EXPECTED, actual.into_inner().as_slice());
- }
- #[test]
- fn read_into_reads_nothing_when_at_end() {
- const SECT_SZ: usize = 8;
- let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
- sectored.write([1u8; 6].as_slice()).unwrap();
- let mut actual = Cursor::new(Vec::new());
- sectored.read_into(&mut actual, SECT_SZ).unwrap();
- assert_eq!(&[0u8; 0], actual.get_ref().as_slice());
- }
- #[test]
- fn zero_extend_less_than_sect_sz() {
- let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), 8)).unwrap();
- let written = sectored.write([1u8; 4].as_slice()).unwrap();
- assert_eq!(4, written);
- sectored.zero_extend(2).unwrap();
- sectored.rewind().unwrap();
- let mut actual = Cursor::new(Vec::new());
- sectored.read_into(&mut actual, 8).unwrap();
- assert_eq!(&[1, 1, 1, 1, 0, 0], actual.get_ref().as_slice());
- }
- #[test]
- fn zero_extend_multiple_sectors() {
- const SECT_SZ: usize = 8;
- let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
- let written = sectored.write([1u8; SECT_SZ / 2].as_slice()).unwrap();
- assert_eq!(SECT_SZ / 2, written);
- const EXPECTED_LEN: usize = 3 * SECT_SZ / 2;
- sectored.rewind().unwrap();
- // Note that zero_extend is called when the current position is 0. The current position
- // must not affect zero extension.
- sectored.zero_extend(EXPECTED_LEN as u64).unwrap();
- let mut actual = Cursor::new(Vec::new());
- sectored
- .read_into(&mut actual, EXPECTED_LEN + SECT_SZ / 2)
- .unwrap();
- let actual = actual.into_inner();
- assert_eq!(&[1, 1, 1, 1], &actual[..(SECT_SZ / 2)]);
- assert_eq!(&[0u8; EXPECTED_LEN], &actual[(SECT_SZ / 2)..]);
- }
- #[test]
- fn zero_extend_multiple_sectors_with_remainder() {
- const SECT_SZ: usize = 8;
- let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
- let written = sectored.write([1u8; SECT_SZ / 2].as_slice()).unwrap();
- assert_eq!(SECT_SZ / 2, written);
- // Notice that the total length of the inner stream will be 2 * SECT_SZ + 1.
- const EXPECTED_LEN: usize = 3 * SECT_SZ / 2 + 1;
- sectored.rewind().unwrap();
- sectored.zero_extend(EXPECTED_LEN as u64).unwrap();
- let mut actual = Cursor::new(Vec::new());
- sectored
- .read_into(&mut actual, EXPECTED_LEN + SECT_SZ / 2)
- .unwrap();
- let actual = actual.into_inner();
- assert_eq!(&[1, 1, 1, 1], &actual[..(SECT_SZ / 2)]);
- assert_eq!(&[0u8; EXPECTED_LEN], &actual[(SECT_SZ / 2)..]);
- }
- #[test]
- fn get_buf() {
- const SECT_SZ: usize = crate::SECTOR_SZ_DEFAULT;
- const DIVISOR: usize = 8;
- const READ_SZ: usize = SECT_SZ / DIVISOR;
- let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
- let mut expected = vec![0u8; READ_SZ];
- for index in 0..(DIVISOR as u8 + 1) {
- expected.fill(index + 1);
- sectored.write(&expected).unwrap();
- }
- sectored.rewind().unwrap();
- for index in 0..(DIVISOR as u8 + 1) {
- let offset = (READ_SZ * index as usize) as u64;
- sectored.try_seek(SeekFrom::Start(offset)).unwrap();
- let actual = sectored.get_buf(offset, READ_SZ as u64).unwrap();
- expected.fill(index + 1);
- assert!(actual == expected);
- }
- }
- }