use log::error;
use positioned_io::Size;
use std::io::{self, Read, Seek, SeekFrom, Write};

use crate::{
    bterr, suppress_err_if_non_zero, BlockError, BlockMeta, BoxInIoErr, Decompose, MetaAccess,
    Positioned, ReadDual, ReadExt, Result, Sectored, SeekFromExt, SizeExt, Split, TryCompose,
    TrySeek, WriteDual, EMPTY_SLICE,
};

pub use private::SectoredBuf;

mod private {
    use super::*;

    /// A stream which buffers writes and reads such that the inner stream only sees reads and
    /// writes of sector length buffers.
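    ///
    /// A minimal usage sketch (illustrative only, mirroring the patterns in this file's tests;
    /// `SectoredCursor` is the sectored test stream, but any `Sectored + Read + Write + Seek +
    /// MetaAccess` stream can serve as the inner stream):
    ///
    /// ```ignore
    /// let mut sectored = SectoredBuf::new()
    ///     .try_compose(SectoredCursor::new(vec![0u8; 4096], 4096))?;
    /// sectored.write_all(b"hello")?; // Buffered; the inner stream only sees whole sectors.
    /// sectored.flush()?;             // The dirty sector is written back to the inner stream.
    /// sectored.seek(SeekFrom::Start(0))?;
    /// let mut buf = [0u8; 5];
    /// sectored.read_exact(&mut buf)?;
    /// assert_eq!(b"hello", &buf);
    /// ```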
    pub struct SectoredBuf<T> {
        inner: T,
        buf: Vec<u8>,
        /// The offset into the inner stream which the zero offset byte in `buf` corresponds to.
        buf_start: usize,
        /// Indicates if the contents of `buf` have been written to, and so whether `buf` needs to
        /// be written back to `inner` before it is refilled.
        dirty: bool,
        /// The current position of this stream, expressed as an offset into the inner stream.
        pos: usize,
    }

    impl SectoredBuf<()> {
        pub fn new() -> SectoredBuf<()> {
            SectoredBuf {
                inner: (),
                pos: 0,
                buf: Vec::new(),
                buf_start: 0,
                dirty: false,
            }
        }
    }

    /// Returns the slice of the internal buffer which is ready to be read from.
    /// If all the bytes in the buffer have been consumed, then the buffer is refilled.
    /// The returned slice will be empty if and only if there are no additional bytes in the
    /// inner stream.
    macro_rules! readable_slice {
        ($self:expr) => {{
            let pos = $self.buf_pos();
            let end = $self.buf_end();
            if pos == end {
                match $self.fill_internal_buf() {
                    Ok(nread) => {
                        if nread > 0 {
                            Ok(&$self.buf[..$self.buf_end()])
                        } else {
                            Ok(&$self.buf[..0])
                        }
                    }
                    Err(err) => Err(err),
                }
            } else {
                Ok(&$self.buf[pos..end])
            }
        }};
    }

    impl<T> SectoredBuf<T> {
        /// Returns a reference to the inner stream.
        pub fn get_ref(&self) -> &T {
            &self.inner
        }

        /// Returns a mutable reference to the inner stream.
        pub fn get_mut(&mut self) -> &mut T {
            &mut self.inner
        }

        /// Returns the offset into the internal buffer that corresponds to the current position.
        fn buf_pos(&self) -> usize {
            let buf_pos = self.pos - self.buf_start;
            debug_assert!(buf_pos <= self.buf.len());
            buf_pos
        }
    }

    impl<T: AsRef<BlockMeta>> SectoredBuf<T> {
        fn len(&self) -> usize {
            self.inner
                .as_ref()
                .body
                .secrets()
                .unwrap()
                .size
                .try_into()
                .unwrap()
        }

        /// Returns one more than the last index in the internal buffer which can be read.
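        /// For illustration (assumed values): with `sector_sz() == 16`, `len() == 20`, and
        /// `buf_start == 16`, this returns `20.min(16 + 16) - 16 == 4`, i.e. only the first
        /// four bytes of the buffered sector hold readable data.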
        fn buf_end(&self) -> usize {
            let len = self.len();
            let sect_sz = self.sector_sz();
            let limit = len.min(self.buf_start + sect_sz);
            limit - self.buf_start
        }
    }

    impl<T: Read + Seek + AsRef<BlockMeta>> SectoredBuf<T> {
        /// Fills the internal buffer by reading from the inner stream at the current position
        /// and updates `self.buf_start` with the position read from.
        fn fill_internal_buf(&mut self) -> Result<usize> {
            self.buf_start = self.inner.stream_position()?.try_into().box_err()?;
            let read_bytes = if self.buf_start < self.len() {
                let read_bytes = self.inner.fill_buf(&mut self.buf)?;
                if read_bytes < self.buf.len() {
                    return Err(bterr!(BlockError::IncorrectSize {
                        expected: self.buf.len(),
                        actual: read_bytes,
                    }));
                }
                read_bytes
            } else {
                0
            };
            Ok(read_bytes)
        }
    }

    impl Default for SectoredBuf<()> {
        fn default() -> Self {
            Self::new()
        }
    }

    impl<T> Split<SectoredBuf<&'static [u8]>, T> for SectoredBuf<T> {
        fn split(self) -> (SectoredBuf<&'static [u8]>, T) {
            let new_self = SectoredBuf {
                inner: EMPTY_SLICE,
                buf: self.buf,
                buf_start: self.buf_start,
                dirty: self.dirty,
                pos: self.pos,
            };
            (new_self, self.inner)
        }

        fn combine(left: SectoredBuf<&'static [u8]>, right: T) -> Self {
            SectoredBuf {
                inner: right,
                buf: left.buf,
                buf_start: left.buf_start,
                dirty: left.dirty,
                pos: left.pos,
            }
        }
    }

    impl<T> Decompose<T> for SectoredBuf<T> {
        fn into_inner(self) -> T {
            self.inner
        }
    }

    impl<T: Sectored + Read + Seek + AsRef<BlockMeta>> TryCompose<T, SectoredBuf<T>>
        for SectoredBuf<()>
    {
        type Error = crate::Error;

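        /// Composes this buffer over `inner`: the internal buffer is sized to one sector of
        /// `inner`, the inner stream is rewound to its start, and the first sector is read in.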
        fn try_compose(self, inner: T) -> Result<SectoredBuf<T>> {
            let sect_sz = inner.sector_sz();
            let mut sectored = SectoredBuf {
                inner,
                buf: self.buf,
                buf_start: 0,
                dirty: false,
                pos: 0,
            };
            sectored.buf.resize(sect_sz, 0);
            sectored.inner.seek(SeekFrom::Start(0))?;
            sectored.fill_internal_buf()?;
            Ok(sectored)
        }
    }

    impl<T> Sectored for SectoredBuf<T> {
        fn sector_sz(&self) -> usize {
            self.buf.len()
        }
    }

    impl<T: Seek + Read + Write + MetaAccess> Write for SectoredBuf<T> {
        fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
            let src_len_start = src.len();
            let mut dest = {
                let buf_pos = self.buf_pos();
                &mut self.buf[buf_pos..]
            };
            while !src.is_empty() {
                if dest.is_empty() {
                    suppress_err_if_non_zero!(src_len_start - src.len(), self.flush());
                    dest = &mut self.buf[..];
                }
                let sz = src.len().min(dest.len());
                dest[..sz].copy_from_slice(&src[..sz]);
                dest = &mut dest[sz..];
                src = &src[sz..];
                self.dirty = sz > 0;
                self.pos += sz;
                self.inner.mut_meta_body().access_secrets(|secrets| {
                    secrets.size = secrets.size.max(self.pos as u64);
                    Ok(())
                })?;
            }
            Ok(src_len_start - src.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            if !self.dirty {
                return Ok(());
            }
            // Write out the contents of the buffer.
            let sect_sz: u64 = self.sector_sz().try_into().box_err()?;
            let inner_pos = self.inner.stream_position()?;
            let inner_pos_usize: usize = inner_pos.try_into().box_err()?;
            let is_new_sector = self.pos > inner_pos_usize;
            let is_full = (self.buf.len() - self.buf_pos()) == 0;
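            // The pair below selects where the inner stream should be positioned once the
            // buffer has been written out, and whether the internal buffer then needs to be
            // refilled from that position. Of the four (is_new_sector, is_full) combinations,
            // only rewriting a previously read sector that is not yet full skips the refill.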
            let (seek_to, fill_internal_buf) = if is_new_sector {
                if is_full {
                    (inner_pos + sect_sz, true)
                } else {
                    (inner_pos, true)
                }
            } else {
                // The contents of the buffer were previously read from inner, so we write the
                // updated contents to the same offset.
                let sect_start: u64 = self.buf_start.try_into().box_err()?;
                self.inner.seek(SeekFrom::Start(sect_start))?;
                if is_full {
                    (inner_pos, true)
                } else {
                    // This is the one case where we don't have to refill the internal buffer.
                    (inner_pos - sect_sz, false)
                }
            };
            self.inner.write_all(&self.buf)?;
            self.inner.flush()?;
            // Seek to the next position.
            self.inner.seek(SeekFrom::Start(seek_to))?;
            if fill_internal_buf {
                self.fill_internal_buf()?;
            }
            self.dirty = false;
            Ok(())
        }
    }

    impl<T: Read + Seek + AsRef<BlockMeta>> Read for SectoredBuf<T> {
        fn read(&mut self, mut dest: &mut [u8]) -> io::Result<usize> {
            if self.pos == self.len() {
                return Ok(0);
            }
            let dest_len_start = dest.len();
            let mut src = readable_slice!(self)?;
            while !dest.is_empty() {
                if src.is_empty() {
                    src = suppress_err_if_non_zero!(
                        dest_len_start - dest.len(),
                        readable_slice!(self)
                    );
                    // If `src` is still empty, then we've reached the end of the stream.
                    if src.is_empty() {
                        break;
                    }
                }
                let sz = src.len().min(dest.len());
                dest[..sz].copy_from_slice(&src[..sz]);
                dest = &mut dest[sz..];
                src = &src[sz..];
                self.pos += sz;
            }
            Ok(dest_len_start - dest.len())
        }
    }

    impl<T: Seek + Read + AsRef<BlockMeta>> SectoredBuf<T> {
        /// Seeks this stream to the given position.
        /// If a seek to a different sector is needed then `pre_seek` is called before this seek
        /// is performed. This can be used to flush buffered data, or to prevent the seek if a
        /// flush can't be performed.
        fn seek_impl<F: FnOnce(&mut Self) -> io::Result<()>>(
            &mut self,
            seek_from: SeekFrom,
            pre_seek: F,
        ) -> io::Result<u64> {
            let pos = self.pos as u64;
            let pos_new = seek_from.abs(|| Ok(pos), || self.size_or_err())?;
            let len = self.len();
            if pos_new > len as u64 {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("can't seek to {pos_new}, only {len} bytes total"),
                ));
            }
            let sect_sz = self.sector_sz() as u64;
            let sect_index = pos / sect_sz;
            let sect_index_new = pos_new / sect_sz;
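            // The second condition below covers the case where the buffered sector has been
            // consumed completely: `pos` then sits one full sector past `buf_start`, so the
            // buffer must be reloaded even though the computed sector indices are equal.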
            if sect_index != sect_index_new || sect_sz == pos - self.buf_start as u64 {
                pre_seek(self)?;
                let seek_to = sect_index_new * sect_sz;
                self.inner.seek(SeekFrom::Start(seek_to))?;
                self.fill_internal_buf()?;
            }
            self.pos = pos_new.try_into().box_err()?;
            Ok(pos_new)
        }
    }

    impl<T: Seek + Read + Write + MetaAccess> Seek for SectoredBuf<T> {
        fn seek(&mut self, seek_from: SeekFrom) -> io::Result<u64> {
            self.seek_impl(seek_from, |sich| sich.flush())
        }
    }

    impl<T: Read + Seek + AsRef<BlockMeta>> TrySeek for SectoredBuf<T> {
        fn try_seek(&mut self, seek_from: SeekFrom) -> io::Result<()> {
            self.seek_impl(seek_from, |sich| {
                if sich.dirty {
                    Err(io::Error::new(
                        io::ErrorKind::Unsupported,
                        "SectoredBuf::try_seek failed because it has unwritten data",
                    ))
                } else {
                    Ok(())
                }
            })?;
            Ok(())
        }
    }

    impl<U, T: AsRef<U>> AsRef<U> for SectoredBuf<T> {
        fn as_ref(&self) -> &U {
            self.inner.as_ref()
        }
    }

    impl<U, T: AsMut<U>> AsMut<U> for SectoredBuf<T> {
        fn as_mut(&mut self) -> &mut U {
            self.inner.as_mut()
        }
    }

    impl<T: AsRef<BlockMeta>> Size for SectoredBuf<T> {
        /// Returns the size of the block, which is the value stored in the block's metadata, not
        /// the length of the inner stream.
        fn size(&self) -> io::Result<Option<u64>> {
            Ok(Some(self.inner.as_ref().body.secrets()?.size))
        }
    }

    impl<T: Read + Write + Seek + MetaAccess> WriteDual for SectoredBuf<T> {
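        /// Writes up to `count` bytes taken from `read` into this stream at its current
        /// position, buffering them through the internal sector buffer, and returns the number
        /// of bytes actually written.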
        fn write_from<R: Read>(&mut self, mut read: R, mut count: usize) -> io::Result<usize> {
            let pos_start = self.pos;
            let mut dest = {
                let pos = self.buf_pos();
                &mut self.buf[pos..]
            };
            let dest_len = dest.len();
            dest = &mut dest[..dest_len.min(count)];
            while count > 0 {
                if dest.is_empty() {
                    suppress_err_if_non_zero!(self.pos - pos_start, self.flush());
                    dest = &mut self.buf[..];
                    let dest_len = dest.len();
                    dest = &mut dest[..dest_len.min(count)];
                }
                let nread = suppress_err_if_non_zero!(self.pos - pos_start, read.read(dest));
                if 0 == nread {
                    break;
                }
                self.dirty = true;
                dest = &mut dest[nread..];
                self.pos += nread;
                count -= nread;
                self.inner.mut_meta_body().access_secrets(|secrets| {
                    secrets.size = secrets.size.max(self.pos as u64);
                    Ok(())
                })?;
            }
            Ok(self.pos - pos_start)
        }
    }

    impl<T: Read + Seek + AsRef<BlockMeta>> ReadDual for SectoredBuf<T> {
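        /// Copies up to `count` bytes from the current position of this stream into `write`,
        /// returning the number of bytes actually copied.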
        fn read_into<W: Write>(&mut self, mut write: W, mut count: usize) -> io::Result<usize> {
            let pos_start = self.pos;
            let mut src = readable_slice!(self)?;
            src = &src[..src.len().min(count)];
            while count > 0 {
                if src.is_empty() {
                    src = suppress_err_if_non_zero!(self.pos - pos_start, readable_slice!(self));
                    src = &src[..src.len().min(count)];
                    // If `src` is still empty, then we've reached the end of the stream.
                    if src.is_empty() {
                        break;
                    }
                }
                let written = write.write(src)?;
                src = &src[written..];
                self.pos += written;
                count -= written;
            }
            Ok(self.pos - pos_start)
        }
    }

    impl<T> Positioned for SectoredBuf<T> {
        fn pos(&self) -> usize {
            self.pos
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helpers::{
        integer_array, read_check, write_fill, BtCursor, Randomizer, SectoredCursor,
        SECTOR_SZ_DEFAULT,
    };

    fn make_sectored_buf(sect_sz: usize, sect_ct: usize) -> SectoredBuf<SectoredCursor<Vec<u8>>> {
        SectoredBuf::new()
            .try_compose(SectoredCursor::new(vec![0u8; sect_sz * sect_ct], sect_sz))
            .expect("compose for sectored buffer failed")
    }

    #[test]
    fn sectored_buf_fill_inner() {
        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
        const SECT_CT: usize = 16;
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        let sect_sz = sectored.sector_sz();
        assert_eq!(0, sect_sz % 16);
        let chunk_sz = sect_sz / 16;
        let chunk_ct = SECT_CT * 16;
        write_fill(&mut sectored, chunk_sz, chunk_ct);
    }

    #[test]
    fn sectored_buf_write_read_sequential() {
        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
        const SECT_CT: usize = 16;
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        let sect_sz = sectored.sector_sz();
        assert_eq!(0, sect_sz % 16);
        let chunk_sz = sect_sz / 16;
        // We subtract one here so that the underlying buffer is not completely filled. This
        // exercises the length limiting capability of the sectored buffer.
        let chunk_ct = SECT_CT * 16 - 1;
        write_fill(&mut sectored, chunk_sz, chunk_ct);
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        read_check(&mut sectored, chunk_sz, chunk_ct);
    }

    #[test]
    fn sectored_buf_len_preserved() {
        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
        const SECT_CT: usize = 16;
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        let expected = vec![42u8; 12];
        // We need to ensure that writing expected will not fill up the buffer in sectored.
        assert!(expected.len() < sectored.sector_sz());
        sectored.write_all(&expected).expect("write failed");
        sectored.flush().expect("flush failed");
        let inner = sectored.into_inner();
        let mut sectored = SectoredBuf::new()
            .try_compose(inner)
            .expect("failed to compose sectored buffer");
        let mut actual = vec![0u8; expected.len()];
        sectored
            .fill_buf(actual.as_mut_slice())
            .expect("failed to fill actual");
        assert_eq!(expected, actual);
    }

    #[test]
    fn sectored_buf_seek() {
        let sect_sz = 16usize;
        let sect_ct = 16usize;
        let cap = sect_sz * sect_ct - std::mem::size_of::<usize>();
        let source = {
            let mut source = Vec::with_capacity(cap);
            source.extend(
                std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None })
                    .take(cap),
            );
            source
        };
        let mut sectored = make_sectored_buf(sect_sz, sect_ct);
        sectored.write(&source).expect("write failed");
        let mut buf = [0u8; 1];
        let end = cap.try_into().expect("cap cannot fit into a u8");
        for pos in (0..end).rev() {
            sectored
                .seek(SeekFrom::Start(pos as u64))
                .expect("seek failed");
            sectored.read(&mut buf).expect("read failed");
            assert_eq!(pos, buf[0]);
        }
    }

    /// Tests that data written can be read from the buffer without an intervening call to `flush`.
    #[test]
    fn sectored_buf_write_then_read() {
        const EXPECTED: &[u8] = b"alpha";
        let mut sectored = make_sectored_buf(4096, 1);
        sectored.write(EXPECTED).expect("write failed");
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        let mut actual = [0u8; EXPECTED.len()];
        sectored.read(&mut actual).expect("read failed");
        assert_eq!(EXPECTED, actual);
    }

    #[test]
    fn sectored_buf_write_read_random() {
        const SECT_SZ: usize = 16;
        const SECT_CT: usize = 16;
        const CAP: usize = SECT_SZ * SECT_CT - std::mem::size_of::<usize>();
        let source = {
            let mut expected = Vec::with_capacity(CAP);
            expected.extend(
                std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None })
                    .take(CAP),
            );
            expected
        };
        let indices: Vec<(usize, usize)> = {
            let rando = Randomizer::new([3u8; Randomizer::HASH.len()]);
            let rando2 = Randomizer::new([5u8; Randomizer::HASH.len()]);
            rando
                .zip(rando2)
                .take(SECT_CT)
                .map(|(mut first, mut second)| {
                    first %= source.len();
                    second %= source.len();
                    let low = first.min(second);
                    let high = first.max(second);
                    (low, high)
                })
                .collect()
        };
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        sectored
            .write_all(&[0u8; CAP])
            .expect("failed to fill sectored");
        sectored.flush().expect("flush failed");
        for (_k, (low, high)) in indices.iter().enumerate() {
            sectored
                .seek(SeekFrom::Start(*low as u64))
                .expect("seek failed");
            let src = &source[*low..*high];
            sectored.write(src).expect("write failed");
        }
        sectored.flush().expect("flush failed");
        let mut buf = vec![0u8; CAP];
        for (_k, (low, high)) in indices.iter().enumerate() {
            sectored
                .seek(SeekFrom::Start(*low as u64))
                .expect("seek failed");
            let actual = &mut buf[*low..*high];
            sectored.fill_buf(actual).expect("read failed");
            let expected = &source[*low..*high];
            assert_eq!(expected, actual);
        }
    }

    #[test]
    fn sectored_buf_read_past_end() {
        const LEN: usize = 32;
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new([0u8; LEN], LEN))
            .expect("compose failed");
        const BUF_LEN: usize = LEN + 1;
        sectored.write(&[1u8; BUF_LEN - 1]).expect("write failed");
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        let mut buf = [0u8; BUF_LEN];
        // Note that buf is one byte longer than the available capacity in the cursor.
        sectored.read(&mut buf).expect("read failed");
        assert_eq!(&[1u8; BUF_LEN - 1], &buf[..(BUF_LEN - 1)]);
        assert_eq!(0u8, buf[BUF_LEN - 1]);
    }

    /// Tests that data written into the buffer is actually written back to the underlying stream.
    #[test]
    fn sectored_buf_write_back() {
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new(vec![0u8; 24], 16))
            .expect("compose failed");
        let expected = [1u8; 8];
        sectored.write(&expected).expect("first write failed");
        sectored.write(&[2u8; 8]).expect("second write failed");
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        let mut actual = [0u8; 8];
        sectored.read(&mut actual).expect("read failed");
        assert_eq!(expected, actual);
    }

    #[test]
    fn sectored_buf_write_past_end() {
        const LEN: usize = 8;
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new(vec![0u8; 0], LEN))
  570. .expect("compos failed");
        let expected = [1u8; LEN + 1];
        sectored.write(&expected).expect("write failed");
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        let mut actual = [0u8; LEN + 1];
        sectored.read(&mut actual).expect("read failed");
        assert_eq!(expected, actual);
    }

    #[test]
    fn read_into_count_limits_read_bytes() {
        const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new(Vec::new(), DATA.len()))
            .unwrap();
        sectored.write(&DATA).unwrap();
        sectored.rewind().unwrap();
        let mut actual = BtCursor::new([0u8; DATA.len()]);
        let read = sectored.read_into(&mut actual, DATA.len() - 1).unwrap();
        assert_eq!(DATA.len() - 1, read);
        assert_eq!([0, 1, 2, 3, 4, 5, 6, 0], actual.into_inner());
    }

    #[test]
    fn read_into_read_spans_multiple_sectors() {
        const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new(Vec::new(), DATA.len()))
            .unwrap();
        sectored.write(&DATA).unwrap();
        sectored.write(&DATA).unwrap();
        sectored.rewind().unwrap();
        const ACTUAL_LEN: usize = DATA.len() + DATA.len() / 2;
        let mut actual = BtCursor::new([0u8; ACTUAL_LEN]);
        let read = sectored.read_into(&mut actual, ACTUAL_LEN).unwrap();
        assert_eq!(ACTUAL_LEN, read);
        assert_eq!([0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3], actual.into_inner());
    }

    /// Tests that a read asking for more bytes than the number available returns the number
    /// available.
    #[test]
    fn read_into_read_past_len() {
        const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new(Vec::new(), DATA.len()))
            .unwrap();
        sectored.write(&DATA).unwrap();
        sectored.rewind().unwrap();
        const ACTUAL_LEN: usize = DATA.len() + 1;
        let mut actual = BtCursor::new([0u8; ACTUAL_LEN]);
        let read = sectored.read_into(&mut actual, ACTUAL_LEN).unwrap();
        assert_eq!(DATA.len(), read);
        assert_eq!([0, 1, 2, 3, 4, 5, 6, 7, 0], actual.into_inner());
    }

    #[test]
    fn write_from_full_cursor() {
        const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new(Vec::new(), DATA.len()))
            .unwrap();
        let written = sectored
            .write_from(BtCursor::new(DATA), DATA.len())
            .unwrap();
        assert_eq!(DATA.len(), written);
        sectored.flush().unwrap();
        assert_eq!(&DATA, sectored.into_inner().into_inner().as_slice());
    }

    #[test]
    fn write_from_count_limits_bytes_read() {
        const DATA: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new(Vec::new(), DATA.len()))
            .unwrap();
        let mut cursor = BtCursor::new(DATA);
        let written = sectored.write_from(&mut cursor, DATA.len() / 2).unwrap();
        assert_eq!(DATA.len() / 2, written);
        sectored.flush().unwrap();
        assert_eq!(
            &[0, 1, 2, 3, 0, 0, 0, 0],
            sectored.into_inner().into_inner().as_slice()
        );
        let mut remaining = Vec::new();
        cursor.read_to_end(&mut remaining).unwrap();
        assert_eq!(&[4, 5, 6, 7], remaining.as_slice());
    }

    #[test]
    fn write_from_write_spans_multiple_sectors() {
        const SECT_SZ: usize = 4;
        const DATA: [u8; SECT_SZ + 1] = [0, 1, 2, 3, 4];
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new(Vec::new(), SECT_SZ))
            .unwrap();
        let written = sectored
            .write_from(BtCursor::new(DATA), DATA.len())
            .unwrap();
        assert_eq!(DATA.len(), written);
        sectored.rewind().unwrap();
        let mut actual = Vec::new();
        sectored.read_to_end(&mut actual).unwrap();
        assert_eq!(&[0, 1, 2, 3, 4], actual.as_slice());
    }

    #[test]
    fn try_seek_to_second_sector() {
        const SECT_SZ: usize = 4;
        const DATA_LEN: usize = 2 * SECT_SZ;
        const DATA: [u8; DATA_LEN] = integer_array::<DATA_LEN>(0);
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new(Vec::new(), SECT_SZ))
            .unwrap();
        let written = sectored
            .write_from(BtCursor::new(DATA), DATA.len())
            .unwrap();
        assert_eq!(DATA.len(), written);
        sectored.rewind().unwrap();
        const OFFSET: u64 = SECT_SZ as u64 + 1;
        sectored.try_seek(SeekFrom::Start(OFFSET)).unwrap();
        let mut actual = BtCursor::new(Vec::new());
        sectored.read_into(&mut actual, SECT_SZ).unwrap();
        const EXPECTED_LEN: usize = SECT_SZ - 1;
        const EXPECTED: [u8; EXPECTED_LEN] = integer_array::<EXPECTED_LEN>(OFFSET as u8);
        assert_eq!(&EXPECTED, actual.into_inner().as_slice());
    }

    #[test]
    fn seek_past_end_is_error() {
        const SECT_SZ: usize = 4;
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new(Vec::new(), SECT_SZ))
            .unwrap();
        let result = sectored.seek(SeekFrom::Start(1));
        let matched = if let Err(err) = result {
            io::ErrorKind::InvalidInput == err.kind()
        } else {
            false
        };
        assert!(matched);
    }

    #[test]
    fn seek_to_zero_when_empty() {
        const SECT_SZ: usize = 4;
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new(Vec::new(), SECT_SZ))
            .unwrap();
        let pos = sectored.seek(SeekFrom::Start(0)).unwrap();
        assert_eq!(0, pos);
    }

    #[test]
    fn read_into_consumes_remaining_sector() {
        // SECT_SZ % 2 is assumed to be 0.
        const SECT_SZ: usize = 4;
        const DATA_LEN: usize = 2 * SECT_SZ;
        const DATA: [u8; DATA_LEN] = integer_array::<DATA_LEN>(0);
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new(Vec::new(), SECT_SZ))
            .unwrap();
        sectored.write(&DATA).unwrap();
        const MID_FIRST: usize = SECT_SZ / 2;
        sectored.seek(SeekFrom::Start(MID_FIRST as u64)).unwrap();
        // Notice that the `count` argument plus the current `sectored.pos` equals `SECT_SZ`.
        // This will cause `pos / sect_sz` to increase by one, but without incrementing
        // `buf_start`. This creates a special case that `seek_impl` has to take into account.
        sectored
            .read_into(&mut BtCursor::new([0u8; MID_FIRST]), MID_FIRST)
            .unwrap();
        const MID_SECOND: u64 = 3 * SECT_SZ as u64 / 2;
        sectored.try_seek(SeekFrom::Start(MID_SECOND)).unwrap();
        const EXPECTED_LEN: usize = SECT_SZ / 2;
        const EXPECTED: [u8; EXPECTED_LEN] = integer_array::<EXPECTED_LEN>(MID_SECOND as u8);
        let mut actual = BtCursor::new(Vec::new());
        let nread = sectored.read_into(&mut actual, EXPECTED_LEN).unwrap();
        assert_eq!(EXPECTED_LEN, nread);
        assert_eq!(&EXPECTED, actual.into_inner().as_slice());
    }
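
    /// Illustrative sketch: checks that `Positioned::pos` and `Size::size` both reflect
    /// buffered writes before any explicit flush, using only APIs exercised elsewhere in
    /// this suite.
    #[test]
    fn pos_and_size_track_buffered_writes() {
        let mut sectored = make_sectored_buf(SECTOR_SZ_DEFAULT, 1);
        sectored.write(&[1u8; 5]).expect("write failed");
        assert_eq!(5, sectored.pos());
        assert_eq!(Some(5), sectored.size().expect("size failed"));
    }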
}