sectored_buf.rs

pub use private::SectoredBuf;

mod private {
    use log::{error, warn};
    use std::io::{self, Read, Seek, SeekFrom, Write};

    use crate::{
        bterr, Block, BlockError, BlockMeta, BoxInIoErr, Decompose, MetaAccess, ReadExt, Result,
        Sectored, TryCompose,
    };

    /// A stream which buffers reads and writes so that the inner stream only sees reads and
    /// writes of sector-length buffers.
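    ///
    /// # Example
    ///
    /// A minimal usage sketch, assuming an inner stream like the `SectoredCursor` test helper
    /// used in the tests below:
    ///
    /// ```ignore
    /// let mut sectored = SectoredBuf::new()
    ///     .try_compose(SectoredCursor::new(vec![0u8; 4096], 4096))?;
    /// sectored.write_all(b"hello")?;
    /// sectored.flush()?;
    /// sectored.seek(SeekFrom::Start(0))?;
    /// let mut greeting = [0u8; 5];
    /// sectored.read_exact(&mut greeting)?;
    /// ```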
    pub struct SectoredBuf<T> {
        inner: T,
        buf: Vec<u8>,
        /// The offset into the inner stream to which the zero-offset byte in `buf` corresponds.
        buf_start: usize,
        /// Indicates whether the contents of `buf` have been written to, and so whether `buf`
        /// needs to be written back to `inner` before it is refilled.
        dirty: bool,
        /// The current position of this stream, expressed as an offset into the inner stream.
        pos: usize,
    }

    impl SectoredBuf<()> {
        pub fn new() -> SectoredBuf<()> {
            SectoredBuf {
                inner: (),
                buf: Vec::new(),
                buf_start: 0,
                dirty: false,
                pos: 0,
            }
        }
    }

    impl<T> SectoredBuf<T> {
        /// Returns the offset into the internal buffer that corresponds to the current position.
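        /// For instance, with `buf_start == 4096` and `pos == 5000`, this returns `904`.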
        fn buf_pos(&self) -> usize {
            self.pos - self.buf_start
        }

        /// Returns the index of the sector which contains the given position in the inner stream.
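        /// With a 4096-byte sector size, `sector_index(4095)` is `0` and `sector_index(4096)`
        /// is `1`.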
        fn sector_index(&self, pos: u64) -> u64 {
            pos / (self.sector_sz() as u64)
        }
    }

    impl<T: MetaAccess> SectoredBuf<T> {
        /// Returns the length of the stream, as recorded in the secrets of the inner block's
        /// metadata.
        fn len(&self) -> usize {
            self.inner
                .meta_body()
                .secrets()
                .unwrap()
                .size
                .try_into()
                .unwrap()
        }

        /// Returns one more than the last index in the internal buffer which can be read.
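        /// For example, with a stream length of 5000, a sector size of 4096, and `buf_start`
        /// at 4096, only bytes `0..904` of the buffer are readable, so this returns `904`.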
        fn buf_end(&self) -> usize {
            let limit = self.len().min(self.buf_start + self.sector_sz());
            limit - self.buf_start
        }
    }

    impl<T: Read + Seek + MetaAccess> SectoredBuf<T> {
        /// Fills the internal buffer by reading from the inner stream at the current position
        /// and updates `self.buf_start` with the position read from.
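        /// Returns the number of bytes read into the buffer, which is zero once `buf_start`
        /// is at or past the end of the stream.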
        fn fill_internal_buf(&mut self) -> Result<usize> {
            self.buf_start = self.inner.stream_position()?.try_into().box_err()?;
            let read_bytes = if self.buf_start < self.len() {
                let read_bytes = self.inner.fill_buf(&mut self.buf)?;
                if read_bytes < self.buf.len() {
                    return Err(bterr!(BlockError::IncorrectSize {
                        expected: self.buf.len(),
                        actual: read_bytes,
                    }));
                }
                read_bytes
            } else {
                0
            };
            Ok(read_bytes)
        }
    }
    impl Default for SectoredBuf<()> {
        fn default() -> Self {
            Self::new()
        }
    }

    impl<T> Decompose<T> for SectoredBuf<T> {
        fn into_inner(self) -> T {
            self.inner
        }
    }

    impl<T: Sectored + Read + Seek + MetaAccess> TryCompose<T, SectoredBuf<T>> for SectoredBuf<()> {
        type Error = crate::Error;
        fn try_compose(self, inner: T) -> Result<SectoredBuf<T>> {
            let sect_sz = inner.sector_sz();
            let mut sectored = SectoredBuf {
                inner,
                buf: self.buf,
                buf_start: 0,
                dirty: false,
                pos: 0,
            };
            sectored.buf.resize(sect_sz, 0);
            sectored.inner.seek(SeekFrom::Start(0))?;
            sectored.fill_internal_buf()?;
            Ok(sectored)
        }
    }

    impl<T> Sectored for SectoredBuf<T> {
        fn sector_sz(&self) -> usize {
            self.buf.len()
        }
    }
    impl<T: Seek + Read + Write + MetaAccess> Write for SectoredBuf<T> {
        fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
            let src_len_start = src.len();
            let mut dest = {
                let buf_pos = self.buf_pos();
                &mut self.buf[buf_pos..]
            };
            while !src.is_empty() {
                if dest.is_empty() {
                    if let Err(err) = self.flush() {
                        error!("A call to SectoredBuf::flush returned an error: {}", err);
                        break;
                    }
                    dest = &mut self.buf[..];
                }
                let sz = src.len().min(dest.len());
                dest[..sz].copy_from_slice(&src[..sz]);
                dest = &mut dest[sz..];
                src = &src[sz..];
                self.dirty = sz > 0;
                self.pos += sz;
                self.inner.mut_meta_body().access_secrets(|secrets| {
                    secrets.size = secrets.size.max(self.pos as u64);
                    Ok(())
                })?;
            }
            Ok(src_len_start - src.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            if !self.dirty {
                return Ok(());
            }
            // Write out the contents of the buffer.
            let sect_sz: u64 = self.sector_sz().try_into().box_err()?;
            let inner_pos = self.inner.stream_position()?;
            let inner_pos_usize: usize = inner_pos.try_into().box_err()?;
            let is_new_sector = self.pos > inner_pos_usize;
            let is_full = (self.buf.len() - self.buf_pos()) == 0;
            let (seek_to, fill_internal_buf) = if is_new_sector {
                if is_full {
                    (inner_pos + sect_sz, true)
                } else {
                    (inner_pos, true)
                }
            } else {
                // The contents of the buffer were previously read from inner, so we write the
                // updated contents to the same offset.
                let sect_start: u64 = self.buf_start.try_into().box_err()?;
                self.inner.seek(SeekFrom::Start(sect_start))?;
                if is_full {
                    (inner_pos, true)
                } else {
                    // This is the one case where we don't have to refill the internal buffer.
                    (inner_pos - sect_sz, false)
                }
            };
            self.inner.write_all(&self.buf)?;
            self.inner.flush()?;
            // Seek to the next position.
            self.inner.seek(SeekFrom::Start(seek_to))?;
            if fill_internal_buf {
                self.fill_internal_buf()?;
            }
            self.dirty = false;
            Ok(())
        }
    }
    impl<T: Read + Seek + MetaAccess> Read for SectoredBuf<T> {
        fn read(&mut self, mut dest: &mut [u8]) -> io::Result<usize> {
            if self.pos == self.len() {
                return Ok(0);
            }
            let dest_len_start = dest.len();
            let mut src = {
                let start = self.buf_pos();
                let end = self.buf_end();
                &self.buf[start..end]
            };
            while !dest.is_empty() {
                if src.is_empty() {
                    if self.pos >= self.len() {
                        break;
                    }
                    let byte_ct = match self.fill_internal_buf() {
                        Ok(byte_ct) => byte_ct,
                        Err(err) => {
                            warn!("SectoredBuf::fill_internal_buf returned an error: {}", err);
                            break;
                        }
                    };
                    if 0 == byte_ct {
                        break;
                    }
                    src = &self.buf[..byte_ct];
                }
                let sz = src.len().min(dest.len());
                dest[..sz].copy_from_slice(&src[..sz]);
                dest = &mut dest[sz..];
                src = &src[sz..];
                self.pos += sz;
            }
            Ok(dest_len_start - dest.len())
        }
    }
    impl<T: Seek + Read + Write + MetaAccess> Seek for SectoredBuf<T> {
        fn seek(&mut self, seek_from: SeekFrom) -> io::Result<u64> {
            let inner_pos = self.inner.stream_position()?;
            let inner_pos_new = match seek_from {
                SeekFrom::Start(rel_start) => rel_start,
                SeekFrom::Current(rel_curr) => {
                    if rel_curr >= 0 {
                        inner_pos + rel_curr as u64
                    } else {
                        // Subtract the magnitude of the (negative) offset. Casting a negative
                        // i64 to u64 would produce a huge value and overflow the subtraction.
                        inner_pos - rel_curr.unsigned_abs()
                    }
                }
                SeekFrom::End(_) => {
                    return Err(io::Error::new(
                        io::ErrorKind::Unsupported,
                        "seeking relative to the end of the stream is not supported",
                    ))
                }
            };
            let sect_index = self.sector_index(inner_pos);
            let sect_index_new = self.sector_index(inner_pos_new);
            let pos: u64 = self.pos.try_into().box_err()?;
            if sect_index != sect_index_new || pos == inner_pos {
                self.flush()?;
                let sect_sz: u64 = self.sector_sz().try_into().box_err()?;
                let seek_to = sect_index_new * sect_sz;
                self.inner.seek(SeekFrom::Start(seek_to))?;
                self.fill_internal_buf()?;
            }
            self.pos = inner_pos_new.try_into().box_err()?;
            Ok(inner_pos_new)
        }
    }
    impl<T: AsRef<BlockMeta>> AsRef<BlockMeta> for SectoredBuf<T> {
        fn as_ref(&self) -> &BlockMeta {
            self.inner.as_ref()
        }
    }

    impl<T: AsMut<BlockMeta>> AsMut<BlockMeta> for SectoredBuf<T> {
        fn as_mut(&mut self) -> &mut BlockMeta {
            self.inner.as_mut()
        }
    }

    impl<T: MetaAccess> MetaAccess for SectoredBuf<T> {}

    impl<T: Block> Block for SectoredBuf<T> {
        fn flush_meta(&mut self) -> Result<()> {
            self.inner.flush_meta()
        }
    }
}
#[cfg(test)]
mod tests {
    use std::io::{Read, Seek, SeekFrom, Write};

    use crate::{
        test_helpers::{read_check, write_fill, Randomizer, SectoredCursor},
        Decompose, ReadExt, Sectored, TryCompose, SECTOR_SZ_DEFAULT,
    };

    use super::*;

    fn make_sectored_buf(sect_sz: usize, sect_ct: usize) -> SectoredBuf<SectoredCursor<Vec<u8>>> {
        SectoredBuf::new()
            .try_compose(SectoredCursor::new(vec![0u8; sect_sz * sect_ct], sect_sz))
            .expect("compose for sectored buffer failed")
    }
    #[test]
    fn sectored_buf_fill_inner() {
        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
        const SECT_CT: usize = 16;
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        let sect_sz = sectored.sector_sz();
        assert_eq!(0, sect_sz % 16);
        let chunk_sz = sect_sz / 16;
        let chunk_ct = SECT_CT * 16;
        write_fill(&mut sectored, chunk_sz, chunk_ct);
    }

    #[test]
    fn sectored_buf_write_read_sequential() {
        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
        const SECT_CT: usize = 16;
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        let sect_sz = sectored.sector_sz();
        assert_eq!(0, sect_sz % 16);
        let chunk_sz = sect_sz / 16;
        // We subtract one here so that the underlying buffer is not completely filled. This
        // exercises the length limiting capability of the sectored buffer.
        let chunk_ct = SECT_CT * 16 - 1;
        write_fill(&mut sectored, chunk_sz, chunk_ct);
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        read_check(&mut sectored, chunk_sz, chunk_ct);
    }
    #[test]
    fn sectored_buf_len_preserved() {
        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
        const SECT_CT: usize = 16;
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        let expected = vec![42u8; 12];
        // We need to ensure that writing expected will not fill up the buffer in sectored.
        assert!(expected.len() < sectored.sector_sz());
        sectored.write_all(&expected).expect("write failed");
        sectored.flush().expect("flush failed");
        let inner = sectored.into_inner();
        let mut sectored = SectoredBuf::new()
            .try_compose(inner)
            .expect("failed to compose sectored buffer");
        let mut actual = vec![0u8; expected.len()];
        sectored
            .fill_buf(actual.as_mut_slice())
            .expect("failed to fill actual");
        assert_eq!(expected, actual);
    }
    #[test]
    fn sectored_buf_seek() {
        let sect_sz = 16usize;
        let sect_ct = 16usize;
        let cap = sect_sz * sect_ct - std::mem::size_of::<usize>();
        let source = {
            let mut source = Vec::with_capacity(cap);
            source.extend(
                std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None })
                    .take(cap),
            );
            source
        };
        let mut sectored = make_sectored_buf(sect_sz, sect_ct);
        sectored.write(&source).expect("write failed");
        let mut buf = [0u8; 1];
        let end: u8 = cap.try_into().expect("cap cannot fit into a u8");
        for pos in (0..end).rev() {
            sectored
                .seek(SeekFrom::Start(pos as u64))
                .expect("seek failed");
            sectored.read(&mut buf).expect("read failed");
            assert_eq!(pos, buf[0]);
        }
    }

    /// Tests that data written can be read from the buffer without an intervening call to `flush`.
    #[test]
    fn sectored_buf_write_then_read() {
        const EXPECTED: &[u8] = b"alpha";
        let mut sectored = make_sectored_buf(4096, 1);
        sectored.write(EXPECTED).expect("write failed");
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        let mut actual = [0u8; EXPECTED.len()];
        sectored.read(&mut actual).expect("read failed");
        assert_eq!(EXPECTED, actual);
    }
    #[test]
    fn sectored_buf_write_read_random() {
        const SECT_SZ: usize = 16;
        const SECT_CT: usize = 16;
        const CAP: usize = SECT_SZ * SECT_CT - std::mem::size_of::<usize>();
        let source = {
            let mut expected = Vec::with_capacity(CAP);
            expected.extend(
                std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None })
                    .take(CAP),
            );
            expected
        };
        let indices: Vec<(usize, usize)> = {
            let rando = Randomizer::new([3u8; Randomizer::HASH.len()]);
            let rando2 = Randomizer::new([5u8; Randomizer::HASH.len()]);
            rando
                .zip(rando2)
                .take(SECT_CT)
                .map(|(mut first, mut second)| {
                    first %= source.len();
                    second %= source.len();
                    let low = first.min(second);
                    let high = first.max(second);
                    (low, high)
                })
                .collect()
        };
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        sectored
            .write_all(&[0u8; CAP])
            .expect("failed to fill sectored");
        sectored.flush().expect("flush failed");
        for (low, high) in indices.iter() {
            sectored
                .seek(SeekFrom::Start(*low as u64))
                .expect("seek failed");
            let src = &source[*low..*high];
            sectored.write(src).expect("write failed");
        }
        sectored.flush().expect("flush failed");
        let mut buf = vec![0u8; CAP];
        for (low, high) in indices.iter() {
            sectored
                .seek(SeekFrom::Start(*low as u64))
                .expect("seek failed");
            let actual = &mut buf[*low..*high];
            sectored.fill_buf(actual).expect("read failed");
            let expected = &source[*low..*high];
            assert_eq!(expected, actual);
        }
    }
    #[test]
    fn sectored_buf_read_past_end() {
        const LEN: usize = 32;
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new([0u8; LEN], LEN))
            .expect("compose failed");
        const BUF_LEN: usize = LEN + 1;
        sectored.write(&[1u8; BUF_LEN - 1]).expect("write failed");
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        let mut buf = [0u8; BUF_LEN];
        // Note that buf is one byte longer than the available capacity in the cursor.
        sectored.read(&mut buf).expect("read failed");
        assert_eq!(&[1u8; BUF_LEN - 1], &buf[..(BUF_LEN - 1)]);
        assert_eq!(0u8, buf[BUF_LEN - 1]);
    }
    /// Tests that data written into the buffer is written back to the underlying stream when a
    /// subsequent seek forces a flush.
    #[test]
    fn sectored_buf_write_back() {
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new(vec![0u8; 24], 16))
            .expect("compose failed");
        let expected = [1u8; 8];
        sectored.write(&expected).expect("first write failed");
        sectored.write(&[2u8; 8]).expect("second write failed");
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        let mut actual = [0u8; 8];
        sectored.read(&mut actual).expect("read failed");
        assert_eq!(expected, actual);
    }

    #[test]
    fn sectored_buf_write_past_end() {
        const LEN: usize = 8;
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new(vec![0u8; 0], LEN))
            .expect("compose failed");
        let expected = [1u8; LEN + 1];
        sectored.write(&expected).expect("write failed");
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        let mut actual = [0u8; LEN + 1];
        sectored.read(&mut actual).expect("read failed");
        assert_eq!(expected, actual);
    }
}