// SPDX-License-Identifier: AGPL-3.0-or-later
//! Contains the [SectoredBuf] type.

use log::error;
use positioned_io::Size;
use safemem::write_bytes;
use std::io::{self, Read, Seek, SeekFrom, Write};

use crate::{
    bterr, error::DisplayErr, suppress_err_if_non_zero, BlockError, BlockMeta, BoxInIoErr,
    Decompose, MetaAccess, Positioned, ReadDual, ReadExt, Result, Sectored, SeekFromExt, SizeExt,
    Split, TrySeek, WriteDual, ZeroExtendable, EMPTY_SLICE,
};

pub use private::SectoredBuf;

mod private {
    use super::*;

    /// A stream which buffers writes and reads such that the inner stream only sees reads and
    /// writes of sector-length buffers.
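    ///
    /// # Example
    ///
    /// A minimal usage sketch, marked `ignore` because it assumes a sectored inner stream:
    /// `SectoredCursor` here is the crate's test helper, standing in for any type that is
    /// `Sectored + Read + Write + Seek + MetaAccess`.
    ///
    /// ```ignore
    /// use std::io::{Read, Seek, SeekFrom, Write};
    ///
    /// let mut sectored = SectoredBuf::new(SectoredCursor::new(vec![0u8; 4096], 4096))?;
    ///
    /// // Writes are buffered; the inner stream only ever sees whole sectors.
    /// sectored.write_all(b"hello")?;
    /// sectored.flush()?;
    ///
    /// // Reads go through the same sector-sized buffer.
    /// sectored.seek(SeekFrom::Start(0))?;
    /// let mut buf = [0u8; 5];
    /// sectored.read_exact(&mut buf)?;
    /// assert_eq!(b"hello", &buf);
    /// ```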
    pub struct SectoredBuf<T> {
        inner: T,
        buf: Vec<u8>,
        /// The offset into the inner stream which the zero offset byte in `buf` corresponds to.
        buf_start: usize,
        /// Indicates if the contents of `buf` have been written to, and so whether `buf` needs to
        /// be written back to `inner` before it is refilled.
        dirty: bool,
        /// The current position of this stream, expressed as an offset into the inner stream.
        pos: usize,
    }

    impl<T: Sectored + Read + Seek + AsRef<BlockMeta>> SectoredBuf<T> {
        /// Creates a new [SectoredBuf] which buffers the given stream.
        pub fn new(inner: T) -> Result<SectoredBuf<T>> {
            let sect_sz = inner.sector_sz();
            let mut sectored = SectoredBuf {
                inner,
                buf: Vec::new(),
                buf_start: 0,
                dirty: false,
                pos: 0,
            };
            sectored.buf.resize(sect_sz, 0);
            sectored.inner.rewind()?;
            sectored.fill_internal_buf()?;
            Ok(sectored)
        }
    }

    impl<T: Read + Write + Seek + MetaAccess> SectoredBuf<T> {
        /// Updates the size stored in the metadata of the block.
        pub fn update_size(inner: &mut T, size: usize) -> Result<()> {
            inner.mut_meta_body().access_secrets(|secrets| {
                secrets.size = secrets.size.max(size as u64);
                Ok(())
            })
        }
    }

    impl<T> SectoredBuf<T> {
        /// Returns a reference to the inner stream.
        pub fn get_ref(&self) -> &T {
            &self.inner
        }

        /// Returns a mutable reference to the inner stream.
        pub fn get_mut(&mut self) -> &mut T {
            &mut self.inner
        }

        /// Returns the offset into the internal buffer that corresponds to the current position.
        fn buf_pos(&self) -> usize {
            let buf_pos = self.pos - self.buf_start;
            debug_assert!(buf_pos <= self.buf.len());
            buf_pos
        }
    }

    impl<T: AsRef<BlockMeta>> SectoredBuf<T> {
        fn len(&self) -> usize {
            self.inner
                .as_ref()
                .body
                .secrets()
                .unwrap()
                .size
                .try_into()
                .unwrap()
        }

        /// Returns one more than the last index in the internal buffer which can be read.
        fn buf_end(&self) -> usize {
            let len = self.len();
            let sect_sz = self.sector_sz();
            let limit = len.min(self.buf_start + sect_sz);
            limit - self.buf_start
        }
    }

    impl<T: Read + Seek + AsRef<BlockMeta>> SectoredBuf<T> {
        /// Fills the internal buffer by reading from the inner stream at the current position
        /// and updates `self.buf_start` with the position read from.
        fn fill_internal_buf(&mut self) -> Result<usize> {
            self.buf_start = self.inner.stream_position()?.try_into().box_err()?;
            let read_bytes = if self.buf_start < self.len() {
                let read_bytes = self.inner.fill_buf(&mut self.buf)?;
                if read_bytes < self.buf.len() {
                    return Err(bterr!(BlockError::IncorrectSize {
                        expected: self.buf.len(),
                        actual: read_bytes,
                    }));
                }
                read_bytes
            } else {
                0
            };
            Ok(read_bytes)
        }
    }

    impl<T> Split<SectoredBuf<&'static [u8]>, T> for SectoredBuf<T> {
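        /// A sketch of the intended round trip (hedged: this only illustrates how `split` and
        /// `combine` are meant to pair up; `sectored` is any `SectoredBuf` value):
        ///
        /// ```ignore
        /// let (detached, inner) = sectored.split();
        /// // ... use `inner` directly ...
        /// let sectored = SectoredBuf::combine(detached, inner);
        /// ```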
        fn split(self) -> (SectoredBuf<&'static [u8]>, T) {
            let new_self = SectoredBuf {
                inner: EMPTY_SLICE,
                buf: self.buf,
                buf_start: self.buf_start,
                dirty: self.dirty,
                pos: self.pos,
            };
            (new_self, self.inner)
        }

        fn combine(left: SectoredBuf<&'static [u8]>, right: T) -> Self {
            SectoredBuf {
                inner: right,
                buf: left.buf,
                buf_start: left.buf_start,
                dirty: left.dirty,
                pos: left.pos,
            }
        }
    }

    impl<T> Decompose<T> for SectoredBuf<T> {
        fn into_inner(self) -> T {
            self.inner
        }
    }

    impl<T> Sectored for SectoredBuf<T> {
        fn sector_sz(&self) -> usize {
            self.buf.len()
        }
    }

    impl<T: Seek + Read + Write + MetaAccess> Write for SectoredBuf<T> {
        fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
            let src_len_start = src.len();
            let mut dest = {
                let buf_pos = self.buf_pos();
                &mut self.buf[buf_pos..]
            };
            while !src.is_empty() {
                if dest.is_empty() {
                    suppress_err_if_non_zero!(src_len_start - src.len(), self.flush());
                    dest = &mut self.buf[..];
                }
                let sz = src.len().min(dest.len());
                dest[..sz].copy_from_slice(&src[..sz]);
                dest = &mut dest[sz..];
                src = &src[sz..];
                self.dirty = sz > 0;
                self.pos += sz;
                Self::update_size(&mut self.inner, self.pos)?;
            }
            Ok(src_len_start - src.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            if !self.dirty {
                return Ok(());
            }
            // Write out the contents of the buffer.
            let sect_sz: u64 = self.sector_sz().try_into().box_err()?;
            let inner_pos = self.inner.stream_position()?;
            let inner_pos_usize: usize = inner_pos.try_into().box_err()?;
            let is_new_sector = self.pos > inner_pos_usize;
            let is_full = (self.buf.len() - self.buf_pos()) == 0;
            let (seek_to, fill_internal_buf) = if is_new_sector {
                if is_full {
                    (inner_pos + sect_sz, true)
                } else {
                    (inner_pos, true)
                }
            } else {
                // The contents of the buffer were previously read from inner, so we write the
                // updated contents to the same offset.
                let sect_start: u64 = self.buf_start.try_into().box_err()?;
                self.inner.seek(SeekFrom::Start(sect_start))?;
                if is_full {
                    (inner_pos, true)
                } else {
                    // This is the one case where we don't have to refill the internal buffer.
                    (inner_pos - sect_sz, false)
                }
            };
            self.inner.write_all(&self.buf)?;
            self.inner.flush()?;
            // Seek to the next position.
            self.inner.seek(SeekFrom::Start(seek_to))?;
            if fill_internal_buf {
                self.fill_internal_buf()?;
            }
            self.dirty = false;
            Ok(())
        }
    }

    impl<T: Read + Write + Seek + MetaAccess> ZeroExtendable for SectoredBuf<T> {
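        /// A hedged sketch of the expected behavior (marked `ignore`; `sectored` is any
        /// writable `SectoredBuf` positioned anywhere in the stream):
        ///
        /// ```ignore
        /// // Appends `n` zero bytes past the current end, then restores the position.
        /// let before = sectored.pos();
        /// sectored.zero_extend(3)?;
        /// assert_eq!(before, sectored.pos());
        /// ```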
        fn zero_extend(&mut self, num_zeros: u64) -> io::Result<()> {
            if num_zeros == 0 {
                return Ok(());
            }
            let prev_pos = self.pos;
            let num_zeros_sz: usize = num_zeros.try_into().display_err()?;
            self.seek(SeekFrom::End(0))?;
            let end_pos = self.pos + num_zeros_sz;
            {
                let start = self.buf_pos();
                let end = self.buf.len().min(start + num_zeros_sz);
                write_bytes(&mut self.buf[start..end], 0);
                self.dirty = self.dirty || end > start;
                self.pos += end - start;
                Self::update_size(&mut self.inner, self.pos)?;
                self.flush()?;
            }
            if self.pos >= end_pos {
                self.seek(SeekFrom::Start(prev_pos as u64))?;
                return Ok(());
            }
            write_bytes(&mut self.buf, 0);
            let iters = (end_pos - self.pos) / self.buf.len();
            for _ in 0..iters {
                self.dirty = true;
                self.pos += self.buf.len();
                Self::update_size(&mut self.inner, self.pos)?;
                self.flush()?;
            }
            let remain = (end_pos - self.pos) % self.buf.len();
            self.pos += remain;
            self.dirty = remain > 0;
            Self::update_size(&mut self.inner, self.pos)?;
            self.flush()?;
            self.seek(SeekFrom::Start(prev_pos as u64))?;
            Ok(())
        }
    }

    /// Returns the slice of the internal buffer which is ready to be read from.
    /// If all the bytes in the buffer have been consumed, then the buffer is refilled.
    /// The returned slice will be empty if and only if there are no additional bytes in the
    /// inner stream.
    macro_rules! readable_slice {
        ($self:expr) => {{
            let pos = $self.buf_pos();
            let end = $self.buf_end();
            if pos == end && $self.pos < $self.len() {
                match $self.fill_internal_buf() {
                    Ok(nread) => {
                        if nread > 0 {
                            Ok(&$self.buf[..$self.buf_end()])
                        } else {
                            Ok(&$self.buf[..0])
                        }
                    }
                    Err(err) => Err(err),
                }
            } else {
                Ok(&$self.buf[pos..end])
            }
        }};
    }

    impl<T: Read + Seek + AsRef<BlockMeta>> Read for SectoredBuf<T> {
        fn read(&mut self, mut dest: &mut [u8]) -> io::Result<usize> {
            if self.pos == self.len() {
                return Ok(0);
            }
            let dest_len_start = dest.len();
            let mut src = readable_slice!(self)?;
            while !dest.is_empty() {
                if src.is_empty() {
                    src = suppress_err_if_non_zero!(
                        dest_len_start - dest.len(),
                        readable_slice!(self)
                    );
                    // If `src` is still empty, then we've reached the end of the stream.
                    if src.is_empty() {
                        break;
                    }
                }
                let sz = src.len().min(dest.len());
                dest[..sz].copy_from_slice(&src[..sz]);
                dest = &mut dest[sz..];
                src = &src[sz..];
                self.pos += sz;
            }
            Ok(dest_len_start - dest.len())
        }
    }

    impl<T: Seek + Read + AsRef<BlockMeta>> SectoredBuf<T> {
        /// Seeks this stream to the given position.
        /// If a seek to a different sector is needed then `pre_seek` is called before this seek
        /// is performed. This can be used to flush buffered data, or to prevent the seek if a
        /// flush can't be performed.
        fn seek_impl<F: FnOnce(&mut Self) -> io::Result<()>>(
            &mut self,
            seek_from: SeekFrom,
            pre_seek: F,
        ) -> io::Result<u64> {
            let pos = self.pos as u64;
            let pos_new = seek_from.abs(|| Ok(pos), || self.size_or_err())?;
            let len = self.len();
            if pos_new > len as u64 {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("can't seek to {pos_new}, only {len} bytes total"),
                ));
            }
            let sect_sz = self.sector_sz() as u64;
            let sect_index = pos / sect_sz;
            let sect_index_new = pos_new / sect_sz;
            // Refill when moving to a different sector, or when the buffered sector has been
            // fully consumed (`pos` sits exactly at the end of the buffer).
            if sect_index != sect_index_new || sect_sz == pos - self.buf_start as u64 {
                pre_seek(self)?;
                let seek_to = sect_index_new * sect_sz;
                self.inner.seek(SeekFrom::Start(seek_to))?;
                self.fill_internal_buf()?;
            }
            self.pos = pos_new.try_into().box_err()?;
            Ok(pos_new)
        }

        /// Returns a slice of the internal buffer that starts at the given offset in the inner
        /// stream, and which is no longer than the given size. Note that this slice may
        /// be shorter than the `size` parameter if the number of bytes in the internal buffer is
        /// less than `size`.
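        ///
        /// A hedged sketch of the intended call pattern (marked `ignore`): the buffer must
        /// already be positioned at the sector containing `offset`, e.g. via `try_seek`.
        ///
        /// ```ignore
        /// sectored.try_seek(SeekFrom::Start(offset))?;
        /// let slice = sectored.get_buf(offset, len)?;
        /// ```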
        pub fn get_buf(&self, offset: u64, size: u64) -> Result<&[u8]> {
            let offset: usize = offset.try_into().unwrap();
            let size: usize = size.try_into().unwrap();
            let sect_sz = self.sector_sz();
            let index = offset / sect_sz;
            if self.buf_start != sect_sz * index {
                return Err(bterr!(
                    "SectoredBuf in wrong position to return buf for offset {offset}, size {size}"
                ));
            }
            let start = offset % sect_sz;
            let end = self.buf.len().min(start + size);
            Ok(&self.buf[start..end])
        }
    }

    impl<T: Seek + Read + Write + MetaAccess> Seek for SectoredBuf<T> {
        fn seek(&mut self, seek_from: SeekFrom) -> io::Result<u64> {
            self.seek_impl(seek_from, |sich| sich.flush())
        }
    }

    impl<T: Read + Seek + AsRef<BlockMeta>> TrySeek for SectoredBuf<T> {
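        /// Unlike [Seek::seek], this never flushes. A hedged sketch of the resulting contract
        /// (marked `ignore`; a seek that stays within the buffered sector still succeeds):
        ///
        /// ```ignore
        /// sectored.write_all(b"x")?;
        /// // Unwritten data makes try_seek to another sector fail instead of flushing.
        /// assert!(sectored.try_seek(SeekFrom::Start(far_offset)).is_err());
        /// ```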
        fn try_seek(&mut self, seek_from: SeekFrom) -> io::Result<()> {
            self.seek_impl(seek_from, |sich| {
                if sich.dirty {
                    Err(io::Error::new(
                        io::ErrorKind::Unsupported,
                        "SectoredBuf::try_seek failed because it has unwritten data",
                    ))
                } else {
                    Ok(())
                }
            })?;
            Ok(())
        }
    }

    impl<U, T: AsRef<U>> AsRef<U> for SectoredBuf<T> {
        fn as_ref(&self) -> &U {
            self.inner.as_ref()
        }
    }

    impl<U, T: AsMut<U>> AsMut<U> for SectoredBuf<T> {
        fn as_mut(&mut self) -> &mut U {
            self.inner.as_mut()
        }
    }

    impl<T: AsRef<BlockMeta>> Size for SectoredBuf<T> {
        /// Returns the size of the block, which is the value stored in the block's metadata, not
        /// the length of the inner stream.
        fn size(&self) -> io::Result<Option<u64>> {
            Ok(Some(self.inner.as_ref().body.secrets()?.size))
        }
    }

    impl<T: Read + Write + Seek + MetaAccess> WriteDual for SectoredBuf<T> {
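        /// Fills this stream with up to `count` bytes pulled from `read`. A hedged usage
        /// sketch (marked `ignore`; `BtCursor` is the crate's test cursor, assumed here):
        ///
        /// ```ignore
        /// let written = sectored.write_from(BtCursor::new([0u8, 1, 2, 3]), 4)?;
        /// assert_eq!(4, written);
        /// ```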
        fn write_from<R: Read>(&mut self, mut read: R, mut count: usize) -> io::Result<usize> {
            let pos_start = self.pos;
            let mut dest = {
                let pos = self.buf_pos();
                &mut self.buf[pos..]
            };
            let dest_len = dest.len();
            dest = &mut dest[..dest_len.min(count)];
            while count > 0 {
                if dest.is_empty() {
                    suppress_err_if_non_zero!(self.pos - pos_start, self.flush());
                    dest = &mut self.buf[..];
                    let dest_len = dest.len();
                    dest = &mut dest[..dest_len.min(count)];
                }
                let nread = suppress_err_if_non_zero!(self.pos - pos_start, read.read(dest));
                if 0 == nread {
                    break;
                }
                self.dirty = true;
                dest = &mut dest[nread..];
                self.pos += nread;
                count -= nread;
                self.inner.mut_meta_body().access_secrets(|secrets| {
                    secrets.size = secrets.size.max(self.pos as u64);
                    Ok(())
                })?;
            }
            Ok(self.pos - pos_start)
        }
    }

    impl<T: Read + Seek + AsRef<BlockMeta>> ReadDual for SectoredBuf<T> {
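        /// Drains up to `count` bytes from this stream into `write`. A hedged sketch
        /// (marked `ignore`; mirrors `write_from` above):
        ///
        /// ```ignore
        /// let mut out = Vec::new();
        /// let nread = sectored.read_into(&mut out, 4)?;
        /// assert!(nread <= 4);
        /// ```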
        fn read_into<W: Write>(&mut self, mut write: W, mut count: usize) -> io::Result<usize> {
            let pos_start = self.pos;
            let mut src = readable_slice!(self)?;
            src = &src[..src.len().min(count)];
            while count > 0 {
                if src.is_empty() {
                    src = suppress_err_if_non_zero!(self.pos - pos_start, readable_slice!(self));
                    src = &src[..src.len().min(count)];
                    // If `src` is still empty, then we've reached the end of the stream.
                    if src.is_empty() {
                        break;
                    }
                }
                let written = write.write(src)?;
                src = &src[written..];
                self.pos += written;
                count -= written;
            }
            Ok(self.pos - pos_start)
        }
    }

    impl<T> Positioned for SectoredBuf<T> {
        fn pos(&self) -> usize {
            self.pos
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        test_helpers::{
            integer_array, read_check, write_fill, BtCursor, Randomizer, SectoredCursor,
            SECTOR_SZ_DEFAULT,
        },
        Cursor,
    };

    fn make_sectored_buf(sect_sz: usize, sect_ct: usize) -> SectoredBuf<SectoredCursor<Vec<u8>>> {
        SectoredBuf::new(SectoredCursor::new(vec![0u8; sect_sz * sect_ct], sect_sz))
            .expect("compose for sectored buffer failed")
    }

    #[test]
    fn sectored_buf_fill_inner() {
        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
        const SECT_CT: usize = 16;
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        let sect_sz = sectored.sector_sz();
        assert_eq!(0, sect_sz % 16);
        let chunk_sz = sect_sz / 16;
        let chunk_ct = SECT_CT * 16;
        write_fill(&mut sectored, chunk_sz, chunk_ct);
    }

    #[test]
    fn sectored_buf_write_read_sequential() {
        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
        const SECT_CT: usize = 16;
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        let sect_sz = sectored.sector_sz();
        assert_eq!(0, sect_sz % 16);
        let chunk_sz = sect_sz / 16;
        // We subtract one here so that the underlying buffer is not completely filled. This
        // exercises the length limiting capability of the sectored buffer.
        let chunk_ct = SECT_CT * 16 - 1;
        write_fill(&mut sectored, chunk_sz, chunk_ct);
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        read_check(&mut sectored, chunk_sz, chunk_ct);
    }

    #[test]
    fn sectored_buf_len_preserved() {
        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
        const SECT_CT: usize = 16;
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        let expected = vec![42u8; 12];
        // We need to ensure that writing expected will not fill up the buffer in sectored.
        assert!(expected.len() < sectored.sector_sz());
        sectored.write_all(&expected).expect("write failed");
        sectored.flush().expect("flush failed");
        let inner = sectored.into_inner();
        let mut sectored = SectoredBuf::new(inner).expect("failed to compose sectored buffer");
        let mut actual = vec![0u8; expected.len()];
        sectored
            .fill_buf(actual.as_mut_slice())
            .expect("failed to fill actual");
        assert_eq!(expected, actual);
    }

    #[test]
    fn sectored_buf_seek() {
        let sect_sz = 16usize;
        let sect_ct = 16usize;
        let cap = sect_sz * sect_ct - std::mem::size_of::<usize>();
        let source = {
            let mut source = Vec::with_capacity(cap);
            source.extend(
                std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None })
                    .take(cap),
            );
            source
        };
        let mut sectored = make_sectored_buf(sect_sz, sect_ct);
        sectored.write(&source).expect("write failed");
        let mut buf = [0u8; 1];
        let end = cap.try_into().expect("cap cannot fit into a u8");
        for pos in (0..end).rev() {
            sectored
                .seek(SeekFrom::Start(pos as u64))
                .expect("seek failed");
            sectored.read(&mut buf).expect("read failed");
            assert_eq!(pos, buf[0]);
        }
    }

    /// Tests that data written can be read from the buffer without an intervening call to `flush`.
    #[test]
    fn sectored_buf_write_then_read() {
        const EXPECTED: &[u8] = b"alpha";
        let mut sectored = make_sectored_buf(4096, 1);
        sectored.write(EXPECTED).expect("write failed");
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        let mut actual = [0u8; EXPECTED.len()];
        sectored.read(&mut actual).expect("read failed");
        assert_eq!(EXPECTED, actual);
    }

    #[test]
    fn sectored_buf_write_read_random() {
        const SECT_SZ: usize = 16;
        const SECT_CT: usize = 16;
        const CAP: usize = SECT_SZ * SECT_CT - std::mem::size_of::<usize>();
        let source = {
            let mut expected = Vec::with_capacity(CAP);
            expected.extend(
                std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None })
                    .take(CAP),
            );
            expected
        };
        let indices: Vec<(usize, usize)> = {
            let rando = Randomizer::new([3u8; Randomizer::HASH.len()]);
            let rando2 = Randomizer::new([5u8; Randomizer::HASH.len()]);
            rando
                .zip(rando2)
                .take(SECT_CT)
                .map(|(mut first, mut second)| {
                    first %= source.len();
                    second %= source.len();
                    let low = first.min(second);
                    let high = first.max(second);
                    (low, high)
                })
                .collect()
        };
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        sectored
            .write_all(&[0u8; CAP])
            .expect("failed to fill sectored");
        sectored.flush().expect("flush failed");
        for (_k, (low, high)) in indices.iter().enumerate() {
            sectored
                .seek(SeekFrom::Start(*low as u64))
                .expect("seek failed");
            let src = &source[*low..*high];
            sectored.write(src).expect("write failed");
        }
        sectored.flush().expect("flush failed");
        let mut buf = vec![0u8; CAP];
        for (_k, (low, high)) in indices.iter().enumerate() {
            sectored
                .seek(SeekFrom::Start(*low as u64))
                .expect("seek failed");
            let actual = &mut buf[*low..*high];
            sectored.fill_buf(actual).expect("read failed");
            let expected = &source[*low..*high];
            assert_eq!(expected, actual);
        }
    }

    #[test]
    fn sectored_buf_read_past_end() {
        const LEN: usize = 32;
        let mut sectored =
            SectoredBuf::new(SectoredCursor::new([0u8; LEN], LEN)).expect("compose failed");
        const BUF_LEN: usize = LEN + 1;
        sectored.write(&[1u8; BUF_LEN - 1]).expect("write failed");
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        let mut buf = [0u8; BUF_LEN];
        // Note that buf is one byte longer than the available capacity in the cursor.
        sectored.read(&mut buf).expect("read failed");
        assert_eq!(&[1u8; BUF_LEN - 1], &buf[..(BUF_LEN - 1)]);
        assert_eq!(0u8, buf[BUF_LEN - 1]);
    }

    /// Tests that data written through the buffer is actually written back to the underlying
    /// stream.
    #[test]
    fn sectored_buf_write_back() {
        let mut sectored =
            SectoredBuf::new(SectoredCursor::new(vec![0u8; 24], 16)).expect("compose failed");
        let expected = [1u8; 8];
        sectored.write(&expected).expect("first write failed");
        sectored.write(&[2u8; 8]).expect("second write failed");
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        let mut actual = [0u8; 8];
        sectored.read(&mut actual).expect("read failed");
        assert_eq!(expected, actual);
    }

    #[test]
    fn sectored_buf_write_past_end() {
        const LEN: usize = 8;
        let mut sectored =
            SectoredBuf::new(SectoredCursor::new(vec![0u8; 0], LEN)).expect("compose failed");
        let expected = [1u8; LEN + 1];
        sectored.write(&expected).expect("write failed");
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        let mut actual = [0u8; LEN + 1];
        sectored.read(&mut actual).expect("read failed");
        assert_eq!(expected, actual);
    }

    #[test]
    fn read_into_count_limits_read_bytes() {
        const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
        let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), DATA.len())).unwrap();
        sectored.write(&DATA).unwrap();
        sectored.rewind().unwrap();
        let mut actual = BtCursor::new([0u8; DATA.len()]);
        let read = sectored.read_into(&mut actual, DATA.len() - 1).unwrap();
        assert_eq!(DATA.len() - 1, read);
        assert_eq!([0, 1, 2, 3, 4, 5, 6, 0], actual.into_inner());
    }

    #[test]
    fn read_into_read_spans_multiple_sectors() {
        const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
        let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), DATA.len())).unwrap();
        sectored.write(&DATA).unwrap();
        sectored.write(&DATA).unwrap();
        sectored.rewind().unwrap();
        const ACTUAL_LEN: usize = DATA.len() + DATA.len() / 2;
        let mut actual = BtCursor::new([0u8; ACTUAL_LEN]);
        let read = sectored.read_into(&mut actual, ACTUAL_LEN).unwrap();
        assert_eq!(ACTUAL_LEN, read);
        assert_eq!([0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3], actual.into_inner());
    }

    /// Tests that a read asking for more bytes than the number available returns the number
    /// available.
    #[test]
    fn read_into_read_past_len() {
        const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
        let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), DATA.len())).unwrap();
        sectored.write(&DATA).unwrap();
        sectored.rewind().unwrap();
        const ACTUAL_LEN: usize = DATA.len() + 1;
        let mut actual = BtCursor::new([0u8; ACTUAL_LEN]);
        let read = sectored.read_into(&mut actual, ACTUAL_LEN).unwrap();
        assert_eq!(DATA.len(), read);
        assert_eq!([0, 1, 2, 3, 4, 5, 6, 7, 0], actual.into_inner());
    }

    #[test]
    fn write_from_full_cursor() {
        const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
        let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), DATA.len())).unwrap();
        let written = sectored
            .write_from(BtCursor::new(DATA), DATA.len())
            .unwrap();
        assert_eq!(DATA.len(), written);
        sectored.flush().unwrap();
        assert_eq!(&DATA, sectored.into_inner().into_inner().as_slice());
    }

    #[test]
    fn write_from_count_limits_bytes_read() {
        const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
        let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), DATA.len())).unwrap();
        let mut cursor = BtCursor::new(DATA);
        let written = sectored.write_from(&mut cursor, DATA.len() / 2).unwrap();
        assert_eq!(DATA.len() / 2, written);
        sectored.flush().unwrap();
        assert_eq!(
            &[0, 1, 2, 3, 0, 0, 0, 0],
            sectored.into_inner().into_inner().as_slice()
        );
        let mut remaining = Vec::new();
        cursor.read_to_end(&mut remaining).unwrap();
        assert_eq!(&[4, 5, 6, 7], remaining.as_slice());
    }

    #[test]
    fn write_from_write_spans_multiple_sectors() {
        const SECT_SZ: usize = 4;
        const DATA: [u8; SECT_SZ + 1] = [0, 1, 2, 3, 4];
        let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
        let written = sectored
            .write_from(BtCursor::new(DATA), DATA.len())
            .unwrap();
        assert_eq!(DATA.len(), written);
        sectored.rewind().unwrap();
        let mut actual = Vec::new();
        sectored.read_to_end(&mut actual).unwrap();
        assert_eq!(&[0, 1, 2, 3, 4], actual.as_slice());
    }

    #[test]
    fn try_seek_to_second_sector() {
        const SECT_SZ: usize = 4;
        const DATA_LEN: usize = 2 * SECT_SZ;
        const DATA: [u8; DATA_LEN] = integer_array::<DATA_LEN>(0);
        let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
        let written = sectored
            .write_from(BtCursor::new(DATA), DATA.len())
            .unwrap();
        assert_eq!(DATA.len(), written);
        sectored.rewind().unwrap();
        const OFFSET: u64 = SECT_SZ as u64 + 1;
        sectored.try_seek(SeekFrom::Start(OFFSET)).unwrap();
        let mut actual = BtCursor::new(Vec::new());
        sectored.read_into(&mut actual, SECT_SZ).unwrap();
        const EXPECTED_LEN: usize = SECT_SZ - 1;
        const EXPECTED: [u8; EXPECTED_LEN] = integer_array::<EXPECTED_LEN>(OFFSET as u8);
        assert_eq!(&EXPECTED, actual.into_inner().as_slice());
    }

    #[test]
    fn seek_past_end_is_error() {
        const SECT_SZ: usize = 4;
        let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
        let result = sectored.seek(SeekFrom::Start(1));
        let matched = if let Err(err) = result {
            io::ErrorKind::InvalidInput == err.kind()
        } else {
            false
        };
        assert!(matched);
    }

    #[test]
    fn seek_to_zero_when_empty() {
        const SECT_SZ: usize = 4;
        let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
        let pos = sectored.seek(SeekFrom::Start(0)).unwrap();
        assert_eq!(0, pos);
    }

    #[test]
    fn read_into_consumes_remaining_sector() {
        // SECT_SZ % 2 is assumed to be 0.
        const SECT_SZ: usize = 4;
        const DATA_LEN: usize = 2 * SECT_SZ;
        const DATA: [u8; DATA_LEN] = integer_array::<DATA_LEN>(0);
        let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
        sectored.write(&DATA).unwrap();
        const MID_FIRST: usize = SECT_SZ / 2;
        sectored.seek(SeekFrom::Start(MID_FIRST as u64)).unwrap();
        // Notice that the `count` argument plus the current `sectored.pos` equals `SECT_SZ`.
        // This will cause `pos / sect_sz` to increase by one, but without incrementing
        // `buf_start`. This creates a special case that `seek_impl` has to take into account.
        sectored
            .read_into(&mut BtCursor::new([0u8; MID_FIRST]), MID_FIRST)
            .unwrap();
        const MID_SECOND: u64 = 3 * SECT_SZ as u64 / 2;
        sectored.try_seek(SeekFrom::Start(MID_SECOND)).unwrap();
        const EXPECTED_LEN: usize = SECT_SZ / 2;
        const EXPECTED: [u8; EXPECTED_LEN] = integer_array::<EXPECTED_LEN>(MID_SECOND as u8);
        let mut actual = BtCursor::new(Vec::new());
        let nread = sectored.read_into(&mut actual, EXPECTED_LEN).unwrap();
        assert_eq!(EXPECTED_LEN, nread);
        assert_eq!(&EXPECTED, actual.into_inner().as_slice());
    }

    #[test]
    fn read_into_reads_nothing_when_at_end() {
        const SECT_SZ: usize = 8;
        let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
        sectored.write([1u8; 6].as_slice()).unwrap();
        let mut actual = Cursor::new(Vec::new());
        sectored.read_into(&mut actual, SECT_SZ).unwrap();
        assert_eq!(&[0u8; 0], actual.get_ref().as_slice());
    }

    #[test]
    fn zero_extend_less_than_sect_sz() {
        let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), 8)).unwrap();
        let written = sectored.write([1u8; 4].as_slice()).unwrap();
        assert_eq!(4, written);
        sectored.zero_extend(2).unwrap();
        sectored.rewind().unwrap();
        let mut actual = Cursor::new(Vec::new());
        sectored.read_into(&mut actual, 8).unwrap();
        assert_eq!(&[1, 1, 1, 1, 0, 0], actual.get_ref().as_slice());
    }

    #[test]
    fn zero_extend_multiple_sectors() {
        const SECT_SZ: usize = 8;
        let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
        let written = sectored.write([1u8; SECT_SZ / 2].as_slice()).unwrap();
        assert_eq!(SECT_SZ / 2, written);
        const EXPECTED_LEN: usize = 3 * SECT_SZ / 2;
        sectored.rewind().unwrap();
        // Note that zero_extend is called when the current position is 0. The current position
        // must not affect zero extension.
        sectored.zero_extend(EXPECTED_LEN as u64).unwrap();
        let mut actual = Cursor::new(Vec::new());
        sectored
            .read_into(&mut actual, EXPECTED_LEN + SECT_SZ / 2)
            .unwrap();
        let actual = actual.into_inner();
        assert_eq!(&[1, 1, 1, 1], &actual[..(SECT_SZ / 2)]);
        assert_eq!(&[0u8; EXPECTED_LEN], &actual[(SECT_SZ / 2)..]);
    }

    #[test]
    fn zero_extend_multiple_sectors_with_remainder() {
        const SECT_SZ: usize = 8;
        let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
        let written = sectored.write([1u8; SECT_SZ / 2].as_slice()).unwrap();
        assert_eq!(SECT_SZ / 2, written);
        // Notice that the total length of the inner stream will be 2 * SECT_SZ + 1.
        const EXPECTED_LEN: usize = 3 * SECT_SZ / 2 + 1;
        sectored.rewind().unwrap();
        sectored.zero_extend(EXPECTED_LEN as u64).unwrap();
        let mut actual = Cursor::new(Vec::new());
        sectored
            .read_into(&mut actual, EXPECTED_LEN + SECT_SZ / 2)
            .unwrap();
        let actual = actual.into_inner();
        assert_eq!(&[1, 1, 1, 1], &actual[..(SECT_SZ / 2)]);
        assert_eq!(&[0u8; EXPECTED_LEN], &actual[(SECT_SZ / 2)..]);
    }

    #[test]
    fn get_buf() {
        const SECT_SZ: usize = crate::SECTOR_SZ_DEFAULT;
        const DIVISOR: usize = 8;
        const READ_SZ: usize = SECT_SZ / DIVISOR;
        let mut sectored = SectoredBuf::new(SectoredCursor::new(Vec::new(), SECT_SZ)).unwrap();
        let mut expected = vec![0u8; READ_SZ];
        for index in 0..(DIVISOR as u8 + 1) {
            expected.fill(index + 1);
            sectored.write(&expected).unwrap();
        }
        sectored.rewind().unwrap();
        for index in 0..(DIVISOR as u8 + 1) {
            let offset = (READ_SZ * index as usize) as u64;
            sectored.try_seek(SeekFrom::Start(offset)).unwrap();
            let actual = sectored.get_buf(offset, READ_SZ as u64).unwrap();
            expected.fill(index + 1);
            assert!(actual == expected);
        }
    }
}