|  | @@ -1,14 +1,19 @@
 | 
	
		
			
				|  |  | +use log::error;
 | 
	
		
			
				|  |  | +use positioned_io::Size;
 | 
	
		
			
				|  |  | +use std::io::{self, Read, Seek, SeekFrom, Write};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +use crate::{
 | 
	
		
			
				|  |  | +    bterr, suppress_err_if_non_zero, BlockError, BlockMeta, BoxInIoErr, Decompose, MetaAccess,
 | 
	
		
			
				|  |  | +    Positioned, ReadDual, ReadExt, Result, Sectored, SizeExt, Split, TryCompose, TrySeek,
 | 
	
		
			
				|  |  | +    EMPTY_SLICE,
 | 
	
		
			
				|  |  | +};
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  pub use private::SectoredBuf;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  mod private {
 | 
	
		
			
				|  |  | -    use log::{error, warn};
 | 
	
		
			
				|  |  | -    use positioned_io::Size;
 | 
	
		
			
				|  |  | -    use std::io::{self, Read, Seek, SeekFrom, Write};
 | 
	
		
			
				|  |  | +    use crate::SeekFromExt;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    use crate::{
 | 
	
		
			
				|  |  | -        bterr, BlockError, BlockMeta, BoxInIoErr, Decompose, MetaAccess, ReadExt, Result, Sectored,
 | 
	
		
			
				|  |  | -        Split, TryCompose, EMPTY_SLICE,
 | 
	
		
			
				|  |  | -    };
 | 
	
		
			
				|  |  | +    use super::*;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      /// A stream which buffers writes and read such that the inner stream only sees reads and writes
 | 
	
		
			
				|  |  |      /// of sector length buffers.
 | 
	
	
		
			
				|  | @@ -36,6 +41,25 @@ mod private {
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    /// Returns the slice of the internal buffer which is ready to be read from.
 | 
	
		
			
				|  |  | +    /// If the buffer all the bytes in the buffer have been consumed, then the buffer is refilled.
 | 
	
		
			
				|  |  | +    /// The returned slice will be empty if and only if there are no additional bytes in the
 | 
	
		
			
				|  |  | +    /// inner stream.
 | 
	
		
			
				|  |  | +    macro_rules! readable_slice {
 | 
	
		
			
				|  |  | +        ($self:expr) => {{
 | 
	
		
			
				|  |  | +            let pos = $self.buf_pos();
 | 
	
		
			
				|  |  | +            let end = $self.buf_end();
 | 
	
		
			
				|  |  | +            if pos == end {
 | 
	
		
			
				|  |  | +                match $self.fill_internal_buf() {
 | 
	
		
			
				|  |  | +                    Ok(byte_ct) => Ok(&$self.buf[..byte_ct]),
 | 
	
		
			
				|  |  | +                    Err(err) => Err(err),
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            } else {
 | 
	
		
			
				|  |  | +                Ok(&$self.buf[pos..end])
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }};
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      impl<T> SectoredBuf<T> {
 | 
	
		
			
				|  |  |          /// Returns a reference to the inner stream.
 | 
	
		
			
				|  |  |          pub fn get_ref(&self) -> &T {
 | 
	
	
		
			
				|  | @@ -238,27 +262,17 @@ mod private {
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |              let dest_len_start = dest.len();
 | 
	
		
			
				|  |  | -            let mut src = {
 | 
	
		
			
				|  |  | -                let start = self.buf_pos();
 | 
	
		
			
				|  |  | -                let end = self.buf_end();
 | 
	
		
			
				|  |  | -                &self.buf[start..end]
 | 
	
		
			
				|  |  | -            };
 | 
	
		
			
				|  |  | +            let mut src = readable_slice!(self)?;
 | 
	
		
			
				|  |  |              while !dest.is_empty() {
 | 
	
		
			
				|  |  |                  if src.is_empty() {
 | 
	
		
			
				|  |  | -                    if self.pos >= self.len() {
 | 
	
		
			
				|  |  | -                        break;
 | 
	
		
			
				|  |  | -                    }
 | 
	
		
			
				|  |  | -                    let byte_ct = match self.fill_internal_buf() {
 | 
	
		
			
				|  |  | -                        Ok(byte_ct) => byte_ct,
 | 
	
		
			
				|  |  | -                        Err(err) => {
 | 
	
		
			
				|  |  | -                            warn!("SectoredBuf::full_internal_buf returned an error: {}", err);
 | 
	
		
			
				|  |  | -                            break;
 | 
	
		
			
				|  |  | -                        }
 | 
	
		
			
				|  |  | -                    };
 | 
	
		
			
				|  |  | -                    if 0 == byte_ct {
 | 
	
		
			
				|  |  | +                    src = suppress_err_if_non_zero!(
 | 
	
		
			
				|  |  | +                        dest_len_start - dest.len(),
 | 
	
		
			
				|  |  | +                        readable_slice!(self)
 | 
	
		
			
				|  |  | +                    );
 | 
	
		
			
				|  |  | +                    // If `src` is still empty, then we've reached the end of the stream.
 | 
	
		
			
				|  |  | +                    if src.is_empty() {
 | 
	
		
			
				|  |  |                          break;
 | 
	
		
			
				|  |  |                      }
 | 
	
		
			
				|  |  | -                    src = &self.buf[..byte_ct];
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  |                  let sz = src.len().min(dest.len());
 | 
	
		
			
				|  |  |                  dest[..sz].copy_from_slice(&src[..sz]);
 | 
	
	
		
			
				|  | @@ -270,30 +284,23 @@ mod private {
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    impl<T: Seek + Read + Write + MetaAccess> Seek for SectoredBuf<T> {
 | 
	
		
			
				|  |  | -        fn seek(&mut self, seek_from: SeekFrom) -> io::Result<u64> {
 | 
	
		
			
				|  |  | +    impl<T: Seek + Read + AsRef<BlockMeta>> SectoredBuf<T> {
 | 
	
		
			
				|  |  | +        /// Seeks this stream to the given position.
 | 
	
		
			
				|  |  | +        /// If a seek to a different sector is needed then `pre_seek` is called before this seek
 | 
	
		
			
				|  |  | +        /// is performed. This can be used to flush buffered data, or to prevent the seek if a
 | 
	
		
			
				|  |  | +        /// flush can't be performed.
 | 
	
		
			
				|  |  | +        fn seek_impl<F: FnOnce(&mut Self) -> io::Result<()>>(
 | 
	
		
			
				|  |  | +            &mut self,
 | 
	
		
			
				|  |  | +            seek_from: SeekFrom,
 | 
	
		
			
				|  |  | +            pre_seek: F,
 | 
	
		
			
				|  |  | +        ) -> io::Result<u64> {
 | 
	
		
			
				|  |  |              let inner_pos = self.inner.stream_position()?;
 | 
	
		
			
				|  |  | -            let inner_pos_new = match seek_from {
 | 
	
		
			
				|  |  | -                SeekFrom::Start(rel_start) => rel_start,
 | 
	
		
			
				|  |  | -                SeekFrom::Current(rel_curr) => {
 | 
	
		
			
				|  |  | -                    if rel_curr > 0 {
 | 
	
		
			
				|  |  | -                        inner_pos + rel_curr as u64
 | 
	
		
			
				|  |  | -                    } else {
 | 
	
		
			
				|  |  | -                        inner_pos - rel_curr as u64
 | 
	
		
			
				|  |  | -                    }
 | 
	
		
			
				|  |  | -                }
 | 
	
		
			
				|  |  | -                SeekFrom::End(_) => {
 | 
	
		
			
				|  |  | -                    return Err(io::Error::new(
 | 
	
		
			
				|  |  | -                        io::ErrorKind::Unsupported,
 | 
	
		
			
				|  |  | -                        "seeking relative to the end of the stream is not supported",
 | 
	
		
			
				|  |  | -                    ))
 | 
	
		
			
				|  |  | -                }
 | 
	
		
			
				|  |  | -            };
 | 
	
		
			
				|  |  | +            let inner_pos_new = seek_from.abs(|| Ok(inner_pos), || self.size_or_err())?;
 | 
	
		
			
				|  |  |              let sect_index = self.sector_index(inner_pos);
 | 
	
		
			
				|  |  |              let sect_index_new = self.sector_index(inner_pos_new);
 | 
	
		
			
				|  |  |              let pos: u64 = self.pos.try_into().box_err()?;
 | 
	
		
			
				|  |  |              if sect_index != sect_index_new || pos == inner_pos {
 | 
	
		
			
				|  |  | -                self.flush()?;
 | 
	
		
			
				|  |  | +                pre_seek(self)?;
 | 
	
		
			
				|  |  |                  let sect_sz: u64 = self.sector_sz().try_into().box_err()?;
 | 
	
		
			
				|  |  |                  let seek_to = sect_index_new * sect_sz;
 | 
	
		
			
				|  |  |                  self.inner.seek(SeekFrom::Start(seek_to))?;
 | 
	
	
		
			
				|  | @@ -304,6 +311,12 @@ mod private {
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    impl<T: Seek + Read + Write + MetaAccess> Seek for SectoredBuf<T> {
 | 
	
		
			
				|  |  | +        fn seek(&mut self, seek_from: SeekFrom) -> io::Result<u64> {
 | 
	
		
			
				|  |  | +            self.seek_impl(seek_from, |sich| sich.flush())
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      impl<U, T: AsRef<U>> AsRef<U> for SectoredBuf<T> {
 | 
	
		
			
				|  |  |          fn as_ref(&self) -> &U {
 | 
	
		
			
				|  |  |              self.inner.as_ref()
 | 
	
	
		
			
				|  | @@ -323,19 +336,62 @@ mod private {
 | 
	
		
			
				|  |  |              Ok(Some(self.inner.as_ref().body.secrets()?.size))
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    impl<T: Read + Seek + AsRef<BlockMeta>> ReadDual for SectoredBuf<T> {
 | 
	
		
			
				|  |  | +        fn read_into<W: Write>(&mut self, mut write: W, mut count: usize) -> io::Result<usize> {
 | 
	
		
			
				|  |  | +            let pos_start = self.pos;
 | 
	
		
			
				|  |  | +            let mut src = readable_slice!(self)?;
 | 
	
		
			
				|  |  | +            src = &src[..src.len().min(count)];
 | 
	
		
			
				|  |  | +            while count > 0 {
 | 
	
		
			
				|  |  | +                if src.is_empty() {
 | 
	
		
			
				|  |  | +                    src = suppress_err_if_non_zero!(self.pos - pos_start, readable_slice!(self));
 | 
	
		
			
				|  |  | +                    src = &src[..src.len().min(count)];
 | 
	
		
			
				|  |  | +                    // If `src` is still empty, then we've reached the end of the stream.
 | 
	
		
			
				|  |  | +                    if src.is_empty() {
 | 
	
		
			
				|  |  | +                        break;
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +                let written = write.write(src)?;
 | 
	
		
			
				|  |  | +                src = &src[written..];
 | 
	
		
			
				|  |  | +                self.pos += written;
 | 
	
		
			
				|  |  | +                count -= written;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +            Ok(self.pos - pos_start)
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    impl<T> Positioned for SectoredBuf<T> {
 | 
	
		
			
				|  |  | +        fn pos(&self) -> usize {
 | 
	
		
			
				|  |  | +            self.pos
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    impl<T: Read + Seek + AsRef<BlockMeta>> TrySeek for SectoredBuf<T> {
 | 
	
		
			
				|  |  | +        fn try_seek(&mut self, seek_from: SeekFrom) -> io::Result<()> {
 | 
	
		
			
				|  |  | +            self.seek_impl(seek_from, |sich| {
 | 
	
		
			
				|  |  | +                if sich.dirty {
 | 
	
		
			
				|  |  | +                    Err(io::Error::new(
 | 
	
		
			
				|  |  | +                        io::ErrorKind::Unsupported,
 | 
	
		
			
				|  |  | +                        "SectoredBuf::try_seek failed because it has unwritten data",
 | 
	
		
			
				|  |  | +                    ))
 | 
	
		
			
				|  |  | +                } else {
 | 
	
		
			
				|  |  | +                    Ok(())
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +            })?;
 | 
	
		
			
				|  |  | +            Ok(())
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  #[cfg(test)]
 | 
	
		
			
				|  |  |  mod tests {
 | 
	
		
			
				|  |  | -    use std::io::{Read, Seek, SeekFrom, Write};
 | 
	
		
			
				|  |  | +    use super::*;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      use crate::{
 | 
	
		
			
				|  |  | -        test_helpers::{read_check, write_fill, Randomizer, SectoredCursor},
 | 
	
		
			
				|  |  | -        Decompose, ReadExt, Sectored, TryCompose, SECTOR_SZ_DEFAULT,
 | 
	
		
			
				|  |  | +        test_helpers::{read_check, write_fill, BtCursor, Randomizer, SectoredCursor},
 | 
	
		
			
				|  |  | +        SECTOR_SZ_DEFAULT,
 | 
	
		
			
				|  |  |      };
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    use super::*;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |      fn make_sectored_buf(sect_sz: usize, sect_ct: usize) -> SectoredBuf<SectoredCursor<Vec<u8>>> {
 | 
	
		
			
				|  |  |          SectoredBuf::new()
 | 
	
		
			
				|  |  |              .try_compose(SectoredCursor::new(vec![0u8; sect_sz * sect_ct], sect_sz))
 | 
	
	
		
			
				|  | @@ -529,4 +585,57 @@ mod tests {
 | 
	
		
			
				|  |  |          sectored.read(&mut actual).expect("read failed");
 | 
	
		
			
				|  |  |          assert_eq!(expected, actual);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    #[test]
 | 
	
		
			
				|  |  | +    fn read_into_count_limits_read_bytes() {
 | 
	
		
			
				|  |  | +        const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
 | 
	
		
			
				|  |  | +        let mut sectored = SectoredBuf::new()
 | 
	
		
			
				|  |  | +            .try_compose(SectoredCursor::new(Vec::new(), DATA.len()))
 | 
	
		
			
				|  |  | +            .unwrap();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        sectored.write(&DATA).unwrap();
 | 
	
		
			
				|  |  | +        sectored.rewind().unwrap();
 | 
	
		
			
				|  |  | +        let mut actual = BtCursor::new([0u8; DATA.len()]);
 | 
	
		
			
				|  |  | +        let read = sectored.read_into(&mut actual, DATA.len() - 1).unwrap();
 | 
	
		
			
				|  |  | +        assert_eq!(DATA.len() - 1, read);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        assert_eq!([0, 1, 2, 3, 4, 5, 6, 0], actual.into_inner());
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    #[test]
 | 
	
		
			
				|  |  | +    fn read_into_read_spans_multiple_sectors() {
 | 
	
		
			
				|  |  | +        const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
 | 
	
		
			
				|  |  | +        let mut sectored = SectoredBuf::new()
 | 
	
		
			
				|  |  | +            .try_compose(SectoredCursor::new(Vec::new(), DATA.len()))
 | 
	
		
			
				|  |  | +            .unwrap();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        sectored.write(&DATA).unwrap();
 | 
	
		
			
				|  |  | +        sectored.write(&DATA).unwrap();
 | 
	
		
			
				|  |  | +        sectored.rewind().unwrap();
 | 
	
		
			
				|  |  | +        const ACTUAL_LEN: usize = DATA.len() + DATA.len() / 2;
 | 
	
		
			
				|  |  | +        let mut actual = BtCursor::new([0u8; ACTUAL_LEN]);
 | 
	
		
			
				|  |  | +        let read = sectored.read_into(&mut actual, ACTUAL_LEN).unwrap();
 | 
	
		
			
				|  |  | +        assert_eq!(ACTUAL_LEN, read);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        assert_eq!([0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3], actual.into_inner());
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    /// Tests that a read asking for more bytes than the number available returns the number
 | 
	
		
			
				|  |  | +    /// available.
 | 
	
		
			
				|  |  | +    #[test]
 | 
	
		
			
				|  |  | +    fn read_into_read_past_len() {
 | 
	
		
			
				|  |  | +        const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
 | 
	
		
			
				|  |  | +        let mut sectored = SectoredBuf::new()
 | 
	
		
			
				|  |  | +            .try_compose(SectoredCursor::new(Vec::new(), DATA.len()))
 | 
	
		
			
				|  |  | +            .unwrap();
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        sectored.write(&DATA).unwrap();
 | 
	
		
			
				|  |  | +        sectored.rewind().unwrap();
 | 
	
		
			
				|  |  | +        const ACTUAL_LEN: usize = DATA.len() + 1;
 | 
	
		
			
				|  |  | +        let mut actual = BtCursor::new([0u8; ACTUAL_LEN]);
 | 
	
		
			
				|  |  | +        let read = sectored.read_into(&mut actual, ACTUAL_LEN).unwrap();
 | 
	
		
			
				|  |  | +        assert_eq!(DATA.len(), read);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        assert_eq!([0, 1, 2, 3, 4, 5, 6, 7, 0], actual.into_inner());
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |  }
 |