@@ -4,8 +4,6 @@ mod private {
     use log::{error, warn};
     use std::io::{self, Read, Seek, SeekFrom, Write};
 
-    use btserde::{read_from, write_to};
-
     use crate::{
         bterr, Block, BlockError, BlockMeta, BoxInIoErr, Decompose, MetaAccess, ReadExt, Result,
         Sectored, TryCompose,
@@ -21,9 +19,6 @@ mod private {
         /// Indicates if the contents of `buf` have been written to, and so whether `buf` needs to
         /// be written back to `inner` before it is refilled.
         dirty: bool,
-        /// The total number of bytes that have been written to the inner stream, including the
-        /// reserved bytes at the beginning.
-        len: usize,
         /// The current position of this stream, expressed as an offset into the inner stream.
         pos: usize,
     }
@@ -35,55 +30,47 @@ mod private {
                 buf: Vec::new(),
                 buf_start: 0,
                 dirty: false,
-                len: 0,
                 pos: 0,
             }
         }
     }
 
     impl<T> SectoredBuf<T> {
-        /// The number of bytes at the beginning of the inner stream which are reserved to store the
-        /// length of data written. All offsets into the stored data must be shifted by this amount
-        /// to be translated to an offset in the inner stream.
-        pub(crate) const RESERVED: usize = std::mem::size_of::<usize>();
-
-        /// Returns the position in the inner stream which the given position in this stream
-        /// corresponds to.
-        fn inner_pos(self_pos: u64) -> u64 {
-            let offset: u64 = Self::RESERVED.try_into().unwrap();
-            self_pos + offset
+        /// Returns the offset into the internal buffer that corresponds to the current position.
+        fn buf_pos(&self) -> usize {
+            self.pos - self.buf_start
         }
 
-        /// Returns the position in this stream which the given position in the inner stream
-        /// corresponds to.
-        fn self_pos(inner_pos: u64) -> u64 {
-            let offset: u64 = Self::RESERVED.try_into().unwrap();
-            inner_pos - offset
+        /// Returns the index of the sector which contains the given stream position.
+        fn sector_index(&self, pos: u64) -> u64 {
+            pos / (self.sector_sz() as u64)
         }
+    }
 
-        /// Returns the offset into the internal buffer that corresponds to the current position.
-        fn buf_pos(&self) -> usize {
-            self.pos - self.buf_start
+    impl<T: MetaAccess> SectoredBuf<T> {
+        fn len(&self) -> usize {
+            self.inner
+                .meta_body()
+                .secrets()
+                .unwrap()
+                .size
+                .try_into()
+                .unwrap()
         }
 
         /// Returns one more than the last index in the internal buffer which can be read.
         fn buf_end(&self) -> usize {
-            let limit = self.len.min(self.buf_start + self.sector_sz());
+            let limit = self.len().min(self.buf_start + self.sector_sz());
             limit - self.buf_start
         }
-
-        /// Returns the index of the sector which is currently loaded into the buffer.
-        fn buf_sector_index(&self) -> usize {
-            self.pos / self.sector_sz()
-        }
     }
 
-    impl<T: Read + Seek> SectoredBuf<T> {
+    impl<T: Read + Seek + MetaAccess> SectoredBuf<T> {
         /// Fills the internal buffer by reading from the inner stream at the current position
         /// and updates `self.buf_start` with the position read from.
         fn fill_internal_buf(&mut self) -> Result<usize> {
             self.buf_start = self.inner.stream_position()?.try_into().box_err()?;
-            let read_bytes = if self.buf_start < self.len {
+            let read_bytes = if self.buf_start < self.len() {
                 let read_bytes = self.inner.fill_buf(&mut self.buf)?;
                 if read_bytes < self.buf.len() {
                     return Err(bterr!(BlockError::IncorrectSize {
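The stream length now lives in the block metadata (`secrets().size`) instead of a reserved header at the front of the inner stream, so stream offsets map one-to-one onto inner offsets and the sector math reduces to plain division and subtraction. The following standalone sketch restates that arithmetic with assumed numbers; it is illustrative only and not part of the patch.

    // Sector arithmetic behind `sector_index` and `buf_pos`, with a 4096-byte
    // sector size assumed for illustration.
    fn main() {
        let sector_sz: u64 = 4096;
        let pos: u64 = 10_000;                    // current stream position
        let sector_index = pos / sector_sz;       // sector containing `pos`
        let buf_start = sector_index * sector_sz; // first byte of that sector
        let buf_pos = pos - buf_start;            // offset within the buffer
        assert_eq!((sector_index, buf_start, buf_pos), (2, 8192, 1808));
    }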
@@ -111,52 +98,20 @@ mod private {
         }
     }
 
-    impl<T: Sectored + Read + Seek> TryCompose<T, SectoredBuf<T>> for SectoredBuf<()> {
+    impl<T: Sectored + Read + Seek + MetaAccess> TryCompose<T, SectoredBuf<T>> for SectoredBuf<()> {
         type Error = crate::Error;
         fn try_compose(self, inner: T) -> Result<SectoredBuf<T>> {
             let sect_sz = inner.sector_sz();
-            if sect_sz < Self::RESERVED {
-                return Err(bterr!(
-                    "a sector size of at least {} is required. Got {}",
-                    Self::RESERVED,
-                    sect_sz,
-                ));
-            }
             let mut sectored = SectoredBuf {
                 inner,
                 buf: self.buf,
                 buf_start: 0,
                 dirty: false,
-                len: Self::RESERVED,
-                pos: Self::RESERVED,
+                pos: 0,
             };
-            sectored.inner.seek(SeekFrom::Start(0))?;
             sectored.buf.resize(sect_sz, 0);
-            let len_stored = match sectored.fill_internal_buf() {
-                Ok(bytes_read) => bytes_read >= Self::RESERVED,
-                Err(err) => {
-                    let err = err.downcast::<BlockError>()?;
-                    match err {
-                        BlockError::IncorrectSize { actual, expected } => {
-                            if actual > 0 {
-                                return Err(bterr!(BlockError::IncorrectSize { expected, actual }));
-                            }
-                            // When the actual size was 0 that just means the inner stream was
-                            // empty, which is not an error.
-                            false
-                        }
-                        err => return Err(bterr!(err)),
-                    }
-                }
-            };
-            if len_stored {
-                if let Ok(len) = read_from::<u64, _>(&mut sectored.buf.as_slice()) {
-                    sectored.len = len.try_into()?;
-                }
-            } else {
-                write_to(&Self::RESERVED, &mut sectored.buf.as_mut_slice())?;
-                sectored.dirty = true;
-            }
+            sectored.inner.seek(SeekFrom::Start(0))?;
+            sectored.fill_internal_buf()?;
             Ok(sectored)
         }
     }
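With the reserved header gone, `try_compose` no longer needs a minimum sector size or a header round-trip: it sizes the buffer to one sector, seeks the inner stream to the start, and fills the buffer. A hedged usage sketch, borrowing `SectoredCursor` and the `expect`-style error handling from the test module below (assumes `std::io::Write` is in scope and an arbitrary 4096-byte sector size):

    let mut sectored = SectoredBuf::new()
        .try_compose(SectoredCursor::new([0u8; 4096], 4096))
        .expect("compose failed");
    sectored.write_all(b"hello").expect("write failed");
    sectored.flush().expect("flush failed");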
@@ -188,7 +143,10 @@ mod private {
                 src = &src[sz..];
                 self.dirty = sz > 0;
                 self.pos += sz;
-                self.len = self.len.max(self.pos);
+                self.inner.mut_meta_body().access_secrets(|secrets| {
+                    secrets.size = secrets.size.max(self.pos as u64);
+                    Ok(())
+                })?;
             }
             Ok(src_len_start - src.len())
         }
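Writes now record the stream length by raising `secrets.size` in the inner block's metadata; taking the maximum means an overwrite in the middle of the stream never shrinks it. A minimal sketch of that update rule, using a hypothetical helper that is not part of the crate:

    // Hypothetical restatement of the size-update rule used above.
    fn updated_size(recorded: u64, pos_after_write: u64) -> u64 {
        recorded.max(pos_after_write)
    }

    fn main() {
        assert_eq!(updated_size(100, 40), 100);  // overwrite in the middle: unchanged
        assert_eq!(updated_size(100, 140), 140); // write past the end: grows
    }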
@@ -204,11 +162,11 @@ mod private {
             let inner_pos_usize: usize = inner_pos.try_into().box_err()?;
             let is_new_sector = self.pos > inner_pos_usize;
             let is_full = (self.buf.len() - self.buf_pos()) == 0;
-            let seek_to = if is_new_sector {
+            let (seek_to, fill_internal_buf) = if is_new_sector {
                 if is_full {
-                    inner_pos + sect_sz
+                    (inner_pos + sect_sz, true)
                 } else {
-                    inner_pos
+                    (inner_pos, true)
                 }
             } else {
                 // The contents of the buffer were previously read from inner, so we write the
@@ -216,38 +174,29 @@ mod private {
                 let sect_start: u64 = self.buf_start.try_into().box_err()?;
                 self.inner.seek(SeekFrom::Start(sect_start))?;
                 if is_full {
-                    inner_pos
+                    (inner_pos, true)
                 } else {
-                    inner_pos - sect_sz
+                    // This is the one case where we don't have to refill the internal buffer.
+                    (inner_pos - sect_sz, false)
                 }
             };
             self.inner.write_all(&self.buf)?;
-
-            // Update the stored length.
-            self.inner.mut_meta_body().access_secrets(|secrets| {
-                secrets.size = (self.len - Self::RESERVED).try_into().box_err()?;
-                Ok(())
-            })?;
-            self.inner.seek(SeekFrom::Start(0))?;
-            self.fill_internal_buf()?;
-            let len: u64 = self.len.try_into().box_err()?;
-            write_to(&len, &mut self.buf.as_mut_slice()).box_err()?;
-            self.inner.seek(SeekFrom::Start(0))?;
-            self.inner.write_all(&self.buf)?;
             self.inner.flush()?;
 
             // Seek to the next position.
             self.inner.seek(SeekFrom::Start(seek_to))?;
-            self.fill_internal_buf()?;
-            self.dirty = false;
+            if fill_internal_buf {
+                self.fill_internal_buf()?;
+            }
 
+            self.dirty = false;
             Ok(())
         }
     }
 
-    impl<T: Read + Seek> Read for SectoredBuf<T> {
+    impl<T: Read + Seek + MetaAccess> Read for SectoredBuf<T> {
         fn read(&mut self, mut dest: &mut [u8]) -> io::Result<usize> {
-            if self.pos == self.len {
+            if self.pos == self.len() {
                 return Ok(0);
             }
 
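`flush` now computes both where to reposition the inner stream and whether the internal buffer must be refilled afterwards; only the 'same sector, buffer not full' case skips the refill. A hypothetical restatement of the four cases, covering just the returned pair (in the not-a-new-sector branch the real code also seeks back to the sector start before writing the buffer out):

    // Not crate API; a sketch of the (seek_to, fill_internal_buf) decision.
    fn flush_target(is_new_sector: bool, is_full: bool, inner_pos: u64, sect_sz: u64) -> (u64, bool) {
        match (is_new_sector, is_full) {
            (true, true) => (inner_pos + sect_sz, true),
            (true, false) => (inner_pos, true),
            (false, true) => (inner_pos, true),
            (false, false) => (inner_pos - sect_sz, false), // the only case with no refill
        }
    }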
@@ -259,7 +208,7 @@ mod private {
             };
             while !dest.is_empty() {
                 if src.is_empty() {
-                    if self.pos >= self.len {
+                    if self.pos >= self.len() {
                         break;
                     }
                     let byte_ct = match self.fill_internal_buf() {
@@ -285,10 +234,10 @@ mod private {
    }
 
    impl<T: Seek + Read + Write + MetaAccess> Seek for SectoredBuf<T> {
-        fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
+        fn seek(&mut self, seek_from: SeekFrom) -> io::Result<u64> {
            let inner_pos = self.inner.stream_position()?;
-            let inner_pos_new = match pos {
-                SeekFrom::Start(rel_start) => Self::inner_pos(rel_start),
+            let inner_pos_new = match seek_from {
+                SeekFrom::Start(rel_start) => rel_start,
                SeekFrom::Current(rel_curr) => {
                    if rel_curr > 0 {
                        inner_pos + rel_curr as u64
@@ -303,18 +252,18 @@ mod private {
                    ))
                }
            };
-            let sect_sz = self.sector_sz();
-            let sect_index = self.buf_sector_index();
-            let sect_index_new = TryInto::<usize>::try_into(inner_pos_new).box_err()? / sect_sz;
+            let sect_index = self.sector_index(inner_pos);
+            let sect_index_new = self.sector_index(inner_pos_new);
            let pos: u64 = self.pos.try_into().box_err()?;
            if sect_index != sect_index_new || pos == inner_pos {
                self.flush()?;
-                let seek_to: u64 = (sect_index_new * sect_sz).try_into().box_err()?;
+                let sect_sz: u64 = self.sector_sz().try_into().box_err()?;
+                let seek_to = sect_index_new * sect_sz;
                self.inner.seek(SeekFrom::Start(seek_to))?;
                self.fill_internal_buf()?;
            }
            self.pos = inner_pos_new.try_into().box_err()?;
-            Ok(Self::self_pos(inner_pos_new))
+            Ok(inner_pos_new)
        }
    }
 
@@ -384,13 +333,6 @@ mod tests {
         read_check(&mut sectored, chunk_sz, chunk_ct);
     }
 
-    #[test]
-    fn sectored_buf_sect_sz_too_small_is_error() {
-        const MIN: usize = SectoredBuf::<()>::RESERVED;
-        let result = SectoredBuf::new().try_compose(SectoredCursor::new([0u8; MIN], MIN - 1));
-        assert!(result.is_err());
-    }
-
     #[test]
     fn sectored_buf_len_preserved() {
         const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
@@ -398,7 +340,7 @@ mod tests {
         let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
         let expected = vec![42u8; 12];
         // We need to ensure that writing expected will not fill up the buffer in sectored.
-        assert!(expected.len() < sectored.sector_sz() - SectoredBuf::<()>::RESERVED);
+        assert!(expected.len() < sectored.sector_sz());
 
         sectored.write_all(&expected).expect("write failed");
         sectored.flush().expect("flush failed");
@@ -512,7 +454,7 @@ mod tests {
         let mut sectored = SectoredBuf::new()
             .try_compose(SectoredCursor::new([0u8; LEN], LEN))
             .expect("compose failed");
-        const BUF_LEN: usize = LEN - SectoredBuf::<()>::RESERVED + 1;
+        const BUF_LEN: usize = LEN + 1;
         sectored.write(&[1u8; BUF_LEN - 1]).expect("write failed");
         sectored.seek(SeekFrom::Start(0)).expect("seek failed");
         let mut buf = [0u8; BUF_LEN];