- pub use private::SectoredBuf;
- mod private {
- use log::{error, warn};
- use std::io::{self, Read, Seek, SeekFrom, Write};
- use crate::{
- bterr, Block, BlockError, BlockMeta, BoxInIoErr, Decompose, MetaAccess, ReadExt, Result,
- Sectored, TryCompose,
- };
- /// A stream which buffers writes and reads such that the inner stream only sees reads and writes
- /// of sector-length buffers.
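- /// A sketch of typical usage, mirroring the tests at the bottom of this file. `SectoredCursor`
- /// is the in-memory helper from `crate::test_helpers`, so this example is illustrative rather
- /// than compilable outside of the test build:
- /// ```ignore
- /// use std::io::{Read, Seek, SeekFrom, Write};
- ///
- /// let mut sectored = SectoredBuf::new()
- ///     .try_compose(SectoredCursor::new(vec![0u8; 4096 * 4], 4096))?;
- /// sectored.write_all(b"hello")?;
- /// sectored.seek(SeekFrom::Start(0))?;
- /// let mut buf = [0u8; 5];
- /// sectored.read_exact(&mut buf)?;
- /// assert_eq!(b"hello", &buf);
- /// ```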
- pub struct SectoredBuf<T> {
- inner: T,
- buf: Vec<u8>,
- /// The offset into the inner stream to which the byte at offset zero in `buf` corresponds.
- buf_start: usize,
- /// Indicates if the contents of `buf` have been written to, and so whether `buf` needs to
- /// be written back to `inner` before it is refilled.
- dirty: bool,
- /// The current position of this stream, expressed as an offset into the inner stream.
- pos: usize,
- }
- impl SectoredBuf<()> {
- pub fn new() -> SectoredBuf<()> {
- SectoredBuf {
- inner: (),
- buf: Vec::new(),
- buf_start: 0,
- dirty: false,
- pos: 0,
- }
- }
- }
- impl<T> SectoredBuf<T> {
- /// Returns the offset into the internal buffer that corresponds to the current position.
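- /// For example, if `buf_start` is 4096 and `pos` is 5000, this returns 904.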
- fn buf_pos(&self) -> usize {
- self.pos - self.buf_start
- }
- /// Returns the index of the sector which contains the stream position `pos`.
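- /// For example, with a sector size of 4096, positions 0 through 4095 lie in sector 0, while
- /// position 5000 lies in sector 1.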
- fn sector_index(&self, pos: u64) -> u64 {
- pos / (self.sector_sz() as u64)
- }
- }
- impl<T: MetaAccess> SectoredBuf<T> {
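- /// Returns the length of the stream in bytes, as recorded in the block metadata secrets.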
- fn len(&self) -> usize {
- self.inner
- .meta_body()
- .secrets()
- .unwrap()
- .size
- .try_into()
- .unwrap()
- }
- /// Returns one more than the last index in the internal buffer which can be read.
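- /// For example, if the stream length is 10, `buf_start` is 8, and the sector size is 4, only
- /// buffer indices 0 and 1 hold stream data, so 2 is returned.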
- fn buf_end(&self) -> usize {
- let limit = self.len().min(self.buf_start + self.sector_sz());
- limit - self.buf_start
- }
- }
- impl<T: Read + Seek + MetaAccess> SectoredBuf<T> {
- /// Fills the internal buffer by reading from the inner stream at the current position
- /// and updates `self.buf_start` with the position read from.
- fn fill_internal_buf(&mut self) -> Result<usize> {
- self.buf_start = self.inner.stream_position()?.try_into().box_err()?;
- let read_bytes = if self.buf_start < self.len() {
- let read_bytes = self.inner.fill_buf(&mut self.buf)?;
- if read_bytes < self.buf.len() {
- return Err(bterr!(BlockError::IncorrectSize {
- expected: self.buf.len(),
- actual: read_bytes,
- }));
- }
- read_bytes
- } else {
- 0
- };
- Ok(read_bytes)
- }
- }
- impl Default for SectoredBuf<()> {
- fn default() -> Self {
- Self::new()
- }
- }
- impl<T> Decompose<T> for SectoredBuf<T> {
- fn into_inner(self) -> T {
- self.inner
- }
- }
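- /// Composing a `SectoredBuf` over an inner stream sizes the internal buffer to the inner
- /// stream's sector size, rewinds the inner stream, and pre-fills the buffer from it.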
- impl<T: Sectored + Read + Seek + MetaAccess> TryCompose<T, SectoredBuf<T>> for SectoredBuf<()> {
- type Error = crate::Error;
- fn try_compose(self, inner: T) -> Result<SectoredBuf<T>> {
- let sect_sz = inner.sector_sz();
- let mut sectored = SectoredBuf {
- inner,
- buf: self.buf,
- buf_start: 0,
- dirty: false,
- pos: 0,
- };
- sectored.buf.resize(sect_sz, 0);
- sectored.inner.seek(SeekFrom::Start(0))?;
- sectored.fill_internal_buf()?;
- Ok(sectored)
- }
- }
- impl<T> Sectored for SectoredBuf<T> {
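- /// The sector size reported is the length of the internal buffer, which `try_compose` sizes
- /// to match the inner stream's sector size.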
- fn sector_sz(&self) -> usize {
- self.buf.len()
- }
- }
- impl<T: Seek + Read + Write + MetaAccess> Write for SectoredBuf<T> {
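- /// Copies `src` into the internal buffer, flushing the buffer to the inner stream whenever it
- /// fills up, and updates the stream size recorded in the block metadata as bytes are written.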
- fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
- let src_len_start = src.len();
- let mut dest = {
- let buf_pos = self.buf_pos();
- &mut self.buf[buf_pos..]
- };
- while !src.is_empty() {
- if dest.is_empty() {
- if let Err(err) = self.flush() {
- error!("A call to SectoredBuf::flush returned an error: {}", err);
- break;
- }
- dest = &mut self.buf[..];
- }
- let sz = src.len().min(dest.len());
- dest[..sz].copy_from_slice(&src[..sz]);
- dest = &mut dest[sz..];
- src = &src[sz..];
- self.dirty = sz > 0;
- self.pos += sz;
- self.inner.mut_meta_body().access_secrets(|secrets| {
- secrets.size = secrets.size.max(self.pos as u64);
- Ok(())
- })?;
- }
- Ok(src_len_start - src.len())
- }
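- /// Writes the contents of the internal buffer to the inner stream. A sector previously read
- /// from `inner` is written back at `buf_start`; a new sector is written at the inner stream's
- /// current position. The inner stream is then left positioned at the next sector to access,
- /// and the internal buffer is refilled except when a partially filled sector was written back
- /// in place.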
- fn flush(&mut self) -> io::Result<()> {
- if !self.dirty {
- return Ok(());
- }
- // Write out the contents of the buffer.
- let sect_sz: u64 = self.sector_sz().try_into().box_err()?;
- let inner_pos = self.inner.stream_position()?;
- let inner_pos_usize: usize = inner_pos.try_into().box_err()?;
- let is_new_sector = self.pos > inner_pos_usize;
- let is_full = (self.buf.len() - self.buf_pos()) == 0;
- let (seek_to, fill_internal_buf) = if is_new_sector {
- if is_full {
- (inner_pos + sect_sz, true)
- } else {
- (inner_pos, true)
- }
- } else {
- // The contents of the buffer were previously read from inner, so we write the
- // updated contents to the same offset.
- let sect_start: u64 = self.buf_start.try_into().box_err()?;
- self.inner.seek(SeekFrom::Start(sect_start))?;
- if is_full {
- (inner_pos, true)
- } else {
- // This is the one case where we don't have to refill the internal buffer.
- (inner_pos - sect_sz, false)
- }
- };
- self.inner.write_all(&self.buf)?;
- self.inner.flush()?;
- // Seek to the next position.
- self.inner.seek(SeekFrom::Start(seek_to))?;
- if fill_internal_buf {
- self.fill_internal_buf()?;
- }
- self.dirty = false;
- Ok(())
- }
- }
- impl<T: Read + Seek + MetaAccess> Read for SectoredBuf<T> {
- fn read(&mut self, mut dest: &mut [u8]) -> io::Result<usize> {
- if self.pos == self.len() {
- return Ok(0);
- }
- let dest_len_start = dest.len();
- let mut src = {
- let start = self.buf_pos();
- let end = self.buf_end();
- &self.buf[start..end]
- };
- while !dest.is_empty() {
- if src.is_empty() {
- if self.pos >= self.len() {
- break;
- }
- let byte_ct = match self.fill_internal_buf() {
- Ok(byte_ct) => byte_ct,
- Err(err) => {
- warn!("SectoredBuf::full_internal_buf returned an error: {}", err);
- break;
- }
- };
- if 0 == byte_ct {
- break;
- }
- src = &self.buf[..byte_ct];
- }
- let sz = src.len().min(dest.len());
- dest[..sz].copy_from_slice(&src[..sz]);
- dest = &mut dest[sz..];
- src = &src[sz..];
- self.pos += sz;
- }
- Ok(dest_len_start - dest.len())
- }
- }
- impl<T: Seek + Read + Write + MetaAccess> Seek for SectoredBuf<T> {
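- /// Sets the stream position. When the requested position lies in a different sector than the
- /// one currently buffered, the internal buffer is flushed and then refilled from the sector
- /// containing the new position. Seeking relative to the end of the stream is not supported.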
- fn seek(&mut self, seek_from: SeekFrom) -> io::Result<u64> {
- let inner_pos = self.inner.stream_position()?;
- let inner_pos_new = match seek_from {
- SeekFrom::Start(rel_start) => rel_start,
- SeekFrom::Current(rel_curr) => {
- if rel_curr >= 0 {
- inner_pos + rel_curr as u64
- } else {
- // Casting a negative offset straight to u64 would wrap, so subtract its magnitude.
- inner_pos - rel_curr.unsigned_abs()
- }
- }
- SeekFrom::End(_) => {
- return Err(io::Error::new(
- io::ErrorKind::Unsupported,
- "seeking relative to the end of the stream is not supported",
- ))
- }
- };
- let sect_index = self.sector_index(inner_pos);
- let sect_index_new = self.sector_index(inner_pos_new);
- let pos: u64 = self.pos.try_into().box_err()?;
- if sect_index != sect_index_new || pos == inner_pos {
- self.flush()?;
- let sect_sz: u64 = self.sector_sz().try_into().box_err()?;
- let seek_to = sect_index_new * sect_sz;
- self.inner.seek(SeekFrom::Start(seek_to))?;
- self.fill_internal_buf()?;
- }
- self.pos = inner_pos_new.try_into().box_err()?;
- Ok(inner_pos_new)
- }
- }
- impl<T: AsRef<BlockMeta>> AsRef<BlockMeta> for SectoredBuf<T> {
- fn as_ref(&self) -> &BlockMeta {
- self.inner.as_ref()
- }
- }
- impl<T: AsMut<BlockMeta>> AsMut<BlockMeta> for SectoredBuf<T> {
- fn as_mut(&mut self) -> &mut BlockMeta {
- self.inner.as_mut()
- }
- }
- impl<T: MetaAccess> MetaAccess for SectoredBuf<T> {}
- impl<T: Block> Block for SectoredBuf<T> {
- fn flush_meta(&mut self) -> Result<()> {
- self.inner.flush_meta()
- }
- }
- }
- #[cfg(test)]
- mod tests {
- use std::io::{Read, Seek, SeekFrom, Write};
- use crate::{
- test_helpers::{read_check, write_fill, Randomizer, SectoredCursor},
- Decompose, ReadExt, Sectored, TryCompose, SECTOR_SZ_DEFAULT,
- };
- use super::*;
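- /// Composes a `SectoredBuf` over an in-memory `SectoredCursor` containing `sect_ct` zeroed
- /// sectors of `sect_sz` bytes each.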
- fn make_sectored_buf(sect_sz: usize, sect_ct: usize) -> SectoredBuf<SectoredCursor<Vec<u8>>> {
- SectoredBuf::new()
- .try_compose(SectoredCursor::new(vec![0u8; sect_sz * sect_ct], sect_sz))
- .expect("compose for sectored buffer failed")
- }
- #[test]
- fn sectored_buf_fill_inner() {
- const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
- const SECT_CT: usize = 16;
- let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
- let sect_sz = sectored.sector_sz();
- assert_eq!(0, sect_sz % 16);
- let chunk_sz = sect_sz / 16;
- let chunk_ct = SECT_CT * 16;
- write_fill(&mut sectored, chunk_sz, chunk_ct);
- }
- #[test]
- fn sectored_buf_write_read_sequential() {
- const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
- const SECT_CT: usize = 16;
- let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
- let sect_sz = sectored.sector_sz();
- assert_eq!(0, sect_sz % 16);
- let chunk_sz = sect_sz / 16;
- // We subtract one here so that the underlying buffer is not completely filled. This
- // exercises the length limiting capability of the sectored buffer.
- let chunk_ct = SECT_CT * 16 - 1;
- write_fill(&mut sectored, chunk_sz, chunk_ct);
- sectored.seek(SeekFrom::Start(0)).expect("seek failed");
- read_check(&mut sectored, chunk_sz, chunk_ct);
- }
- #[test]
- fn sectored_buf_len_preserved() {
- const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
- const SECT_CT: usize = 16;
- let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
- let expected = vec![42u8; 12];
- // We need to ensure that writing expected will not fill up the buffer in sectored.
- assert!(expected.len() < sectored.sector_sz());
- sectored.write_all(&expected).expect("write failed");
- sectored.flush().expect("flush failed");
- let inner = sectored.into_inner();
- let mut sectored = SectoredBuf::new()
- .try_compose(inner)
- .expect("failed to compose sectored buffer");
- let mut actual = vec![0u8; expected.len()];
- sectored
- .fill_buf(actual.as_mut_slice())
- .expect("failed to fill actual");
- assert_eq!(expected, actual);
- }
- #[test]
- fn sectored_buf_seek() {
- let sect_sz = 16usize;
- let sect_ct = 16usize;
- let cap = sect_sz * sect_ct - std::mem::size_of::<usize>();
- let source = {
- let mut source = Vec::with_capacity(cap);
- source.extend(
- std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None })
- .take(cap),
- );
- source
- };
- let mut sectored = make_sectored_buf(sect_sz, sect_ct);
- sectored.write(&source).expect("write failed");
- let mut buf = [0u8; 1];
- let end = cap.try_into().expect("cap cannot fit into a u8");
- for pos in (0..end).rev() {
- sectored
- .seek(SeekFrom::Start(pos as u64))
- .expect("seek failed");
- sectored.read(&mut buf).expect("read failed");
- assert_eq!(pos, buf[0]);
- }
- }
- /// Tests that data written can be read from the buffer without an intervening call to `flush`.
- #[test]
- fn sectored_buf_write_then_read() {
- const EXPECTED: &[u8] = b"alpha";
- let mut sectored = make_sectored_buf(4096, 1);
- sectored.write(EXPECTED).expect("write failed");
- sectored.seek(SeekFrom::Start(0)).expect("seek failed");
- let mut actual = [0u8; EXPECTED.len()];
- sectored.read(&mut actual).expect("read failed");
- assert_eq!(EXPECTED, actual);
- }
- #[test]
- fn sectored_buf_write_read_random() {
- const SECT_SZ: usize = 16;
- const SECT_CT: usize = 16;
- const CAP: usize = SECT_SZ * SECT_CT - std::mem::size_of::<usize>();
- let source = {
- let mut expected = Vec::with_capacity(CAP);
- expected.extend(
- std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None })
- .take(CAP),
- );
- expected
- };
- let indices: Vec<(usize, usize)> = {
- let rando = Randomizer::new([3u8; Randomizer::HASH.len()]);
- let rando2 = Randomizer::new([5u8; Randomizer::HASH.len()]);
- rando
- .zip(rando2)
- .take(SECT_CT)
- .map(|(mut first, mut second)| {
- first %= source.len();
- second %= source.len();
- let low = first.min(second);
- let high = first.max(second);
- (low, high)
- })
- .collect()
- };
- let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
- sectored
- .write_all(&[0u8; CAP])
- .expect("failed to fill sectored");
- sectored.flush().expect("flush failed");
- for (low, high) in indices.iter() {
- sectored
- .seek(SeekFrom::Start(*low as u64))
- .expect("seek failed");
- let src = &source[*low..*high];
- sectored.write(src).expect("write failed");
- }
- sectored.flush().expect("flush failed");
- let mut buf = vec![0u8; CAP];
- for (low, high) in indices.iter() {
- sectored
- .seek(SeekFrom::Start(*low as u64))
- .expect("seek failed");
- let actual = &mut buf[*low..*high];
- sectored.fill_buf(actual).expect("read failed");
- let expected = &source[*low..*high];
- assert_eq!(expected, actual);
- }
- }
- #[test]
- fn sectored_buf_read_past_end() {
- const LEN: usize = 32;
- let mut sectored = SectoredBuf::new()
- .try_compose(SectoredCursor::new([0u8; LEN], LEN))
- .expect("compose failed");
- const BUF_LEN: usize = LEN + 1;
- sectored.write(&[1u8; BUF_LEN - 1]).expect("write failed");
- sectored.seek(SeekFrom::Start(0)).expect("seek failed");
- let mut buf = [0u8; BUF_LEN];
- // Note that buf is one byte longer than the available capacity in the cursor.
- sectored.read(&mut buf).expect("read failed");
- assert_eq!(&[1u8; BUF_LEN - 1], &buf[..(BUF_LEN - 1)]);
- assert_eq!(0u8, buf[BUF_LEN - 1]);
- }
- /// Tests that data written before a seek is actually written back to the underlying stream.
- #[test]
- fn sectored_buf_write_back() {
- let mut sectored = SectoredBuf::new()
- .try_compose(SectoredCursor::new(vec![0u8; 24], 16))
- .expect("compose failed");
- let expected = [1u8; 8];
- sectored.write(&expected).expect("first write failed");
- sectored.write(&[2u8; 8]).expect("second write failed");
- sectored.seek(SeekFrom::Start(0)).expect("seek failed");
- let mut actual = [0u8; 8];
- sectored.read(&mut actual).expect("read failed");
- assert_eq!(expected, actual);
- }
- #[test]
- fn sectored_buf_write_past_end() {
- const LEN: usize = 8;
- let mut sectored = SectoredBuf::new()
- .try_compose(SectoredCursor::new(vec![0u8; 0], LEN))
- .expect("compos failed");
- let expected = [1u8; LEN + 1];
- sectored.write(&expected).expect("write failed");
- sectored.seek(SeekFrom::Start(0)).expect("seek failed");
- let mut actual = [0u8; LEN + 1];
- sectored.read(&mut actual).expect("read failed");
- assert_eq!(expected, actual);
- }
- }