|
@@ -4,15 +4,13 @@ use std::io::{self, Read, Seek, SeekFrom, Write};
|
|
|
|
|
|
use crate::{
|
|
|
bterr, suppress_err_if_non_zero, BlockError, BlockMeta, BoxInIoErr, Decompose, MetaAccess,
|
|
|
- Positioned, ReadDual, ReadExt, Result, Sectored, SizeExt, Split, TryCompose, TrySeek,
|
|
|
- EMPTY_SLICE,
|
|
|
+ Positioned, ReadDual, ReadExt, Result, Sectored, SeekFromExt, SizeExt, Split, TryCompose,
|
|
|
+ TrySeek, WriteDual, EMPTY_SLICE,
|
|
|
};
|
|
|
|
|
|
pub use private::SectoredBuf;
|
|
|
|
|
|
mod private {
|
|
|
- use crate::SeekFromExt;
|
|
|
-
|
|
|
use super::*;
|
|
|
|
|
|
/// A stream which buffers writes and read such that the inner stream only sees reads and writes
|
|
@@ -51,7 +49,13 @@ mod private {
|
|
|
let end = $self.buf_end();
|
|
|
if pos == end {
|
|
|
match $self.fill_internal_buf() {
|
|
|
- Ok(byte_ct) => Ok(&$self.buf[..byte_ct]),
|
|
|
+ Ok(nread) => {
|
|
|
+ if nread > 0 {
|
|
|
+ Ok(&$self.buf[..$self.buf_end()])
|
|
|
+ } else {
|
|
|
+ Ok(&$self.buf[..0])
|
|
|
+ }
|
|
|
+ }
|
|
|
Err(err) => Err(err),
|
|
|
}
|
|
|
} else {
|
|
@@ -192,10 +196,7 @@ mod private {
|
|
|
};
|
|
|
while !src.is_empty() {
|
|
|
if dest.is_empty() {
|
|
|
- if let Err(err) = self.flush() {
|
|
|
- error!("A call to SectoredBuf::flush returned an error: {}", err);
|
|
|
- break;
|
|
|
- }
|
|
|
+ suppress_err_if_non_zero!(src_len_start - src.len(), self.flush());
|
|
|
dest = &mut self.buf[..];
|
|
|
}
|
|
|
let sz = src.len().min(dest.len());
|
|
@@ -337,6 +338,39 @@ mod private {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ impl<T: Read + Write + Seek + MetaAccess> WriteDual for SectoredBuf<T> {
|
|
|
+ fn write_from<R: Read>(&mut self, mut read: R, mut count: usize) -> io::Result<usize> {
|
|
|
+ let pos_start = self.pos;
|
|
|
+ let mut dest = {
|
|
|
+ let pos = self.buf_pos();
|
|
|
+ &mut self.buf[pos..]
|
|
|
+ };
|
|
|
+ let dest_len = dest.len();
|
|
|
+ dest = &mut dest[..dest_len.min(count)];
|
|
|
+ while count > 0 {
|
|
|
+ if dest.is_empty() {
|
|
|
+ suppress_err_if_non_zero!(self.pos - pos_start, self.flush());
|
|
|
+ dest = &mut self.buf[..];
|
|
|
+ let dest_len = dest.len();
|
|
|
+ dest = &mut dest[..dest_len.min(count)];
|
|
|
+ }
|
|
|
+ let nread = suppress_err_if_non_zero!(self.pos - pos_start, read.read(dest));
|
|
|
+ if 0 == nread {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ self.dirty = true;
|
|
|
+ dest = &mut dest[nread..];
|
|
|
+ self.pos += nread;
|
|
|
+ count -= nread;
|
|
|
+ self.inner.mut_meta_body().access_secrets(|secrets| {
|
|
|
+ secrets.size = secrets.size.max(self.pos as u64);
|
|
|
+ Ok(())
|
|
|
+ })?;
|
|
|
+ }
|
|
|
+ Ok(self.pos - pos_start)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
impl<T: Read + Seek + AsRef<BlockMeta>> ReadDual for SectoredBuf<T> {
|
|
|
fn read_into<W: Write>(&mut self, mut write: W, mut count: usize) -> io::Result<usize> {
|
|
|
let pos_start = self.pos;
|
|
@@ -638,4 +672,60 @@ mod tests {
|
|
|
|
|
|
assert_eq!([0, 1, 2, 3, 4, 5, 6, 7, 0], actual.into_inner());
|
|
|
}
|
|
|
+
|
|
|
+ #[test]
|
|
|
+ fn write_from_full_cursor() {
|
|
|
+ const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
|
|
|
+ let mut sectored = SectoredBuf::new()
|
|
|
+ .try_compose(SectoredCursor::new(Vec::new(), DATA.len()))
|
|
|
+ .unwrap();
|
|
|
+
|
|
|
+ let written = sectored
|
|
|
+ .write_from(BtCursor::new(DATA), DATA.len())
|
|
|
+ .unwrap();
|
|
|
+ assert_eq!(DATA.len(), written);
|
|
|
+ sectored.flush().unwrap();
|
|
|
+
|
|
|
+ assert_eq!(&DATA, sectored.into_inner().into_inner().as_slice());
|
|
|
+ }
|
|
|
+
|
|
|
+ #[test]
|
|
|
+ fn write_from_count_limits_bytes_read() {
|
|
|
+ const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
|
|
|
+ let mut sectored = SectoredBuf::new()
|
|
|
+ .try_compose(SectoredCursor::new(Vec::new(), DATA.len()))
|
|
|
+ .unwrap();
|
|
|
+ let mut cursor = BtCursor::new(DATA);
|
|
|
+
|
|
|
+ let written = sectored.write_from(&mut cursor, DATA.len() / 2).unwrap();
|
|
|
+ assert_eq!(DATA.len() / 2, written);
|
|
|
+ sectored.flush().unwrap();
|
|
|
+
|
|
|
+ assert_eq!(
|
|
|
+ &[0, 1, 2, 3, 0, 0, 0, 0],
|
|
|
+ sectored.into_inner().into_inner().as_slice()
|
|
|
+ );
|
|
|
+ let mut remaining = Vec::new();
|
|
|
+ cursor.read_to_end(&mut remaining).unwrap();
|
|
|
+ assert_eq!(&[4, 5, 6, 7], remaining.as_slice());
|
|
|
+ }
|
|
|
+
|
|
|
+ #[test]
|
|
|
+ fn write_from_write_spans_multiple_sectors() {
|
|
|
+ const SECT_SZ: usize = 4;
|
|
|
+ const DATA: [u8; SECT_SZ + 1] = [0, 1, 2, 3, 4];
|
|
|
+ let mut sectored = SectoredBuf::new()
|
|
|
+ .try_compose(SectoredCursor::new(Vec::new(), SECT_SZ))
|
|
|
+ .unwrap();
|
|
|
+
|
|
|
+ let written = sectored
|
|
|
+ .write_from(BtCursor::new(DATA), DATA.len())
|
|
|
+ .unwrap();
|
|
|
+ assert_eq!(DATA.len(), written);
|
|
|
+ sectored.rewind().unwrap();
|
|
|
+ let mut actual = Vec::new();
|
|
|
+ sectored.read_to_end(&mut actual).unwrap();
|
|
|
+
|
|
|
+ assert_eq!(&[0, 1, 2, 3, 4], actual.as_slice());
|
|
|
+ }
|
|
|
}
|