trailered.rs 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. // SPDX-License-Identifier: AGPL-3.0-or-later
  2. pub use private::Trailered;
  3. mod private {
  4. use std::{
  5. io::{self, BufReader, Seek, SeekFrom},
  6. marker::PhantomData,
  7. };
  8. use btserde::{read_from, write_to};
  9. use positioned_io::{ReadAt, Size, WriteAt};
  10. use serde::{de::DeserializeOwned, Serialize};
  11. use crate::{
  12. bterr, BoxInIoErr, Cursor, Decompose, FlushMeta, Result, Sectored, SizeExt, WriteInteg,
  13. };
  14. /// A struct which wraps a stream and which writes a trailing data structure to it when flushed.
  15. pub struct Trailered<T, D> {
  16. inner: T,
  17. body_len: u64,
  18. phantom: PhantomData<D>,
  19. write_buf: Vec<u8>,
  20. }
  21. impl<T: ReadAt + Size, D: DeserializeOwned> Trailered<T, D> {
  22. pub fn empty(inner: T) -> Trailered<T, D> {
  23. Trailered {
  24. inner,
  25. body_len: 0,
  26. phantom: PhantomData,
  27. write_buf: Vec::new(),
  28. }
  29. }
  30. /// Creates a new `Trailered<T>` containing the given `T`. This method requires that the given
  31. /// stream is either empty, or contains a valid serialization of `D` and the offset at which
  32. /// `D` is stored.
  33. pub fn new(inner: T) -> Result<(Trailered<T, D>, Option<D>)> {
  34. let end = inner.size_or_err()?;
  35. if 0 == end {
  36. return Ok((Self::empty(inner), None));
  37. }
  38. let mut reader = BufReader::new(Cursor::new(inner));
  39. let offset: i64 = std::mem::size_of::<i64>() as i64;
  40. if end < offset as u64 {
  41. return Err(bterr!("inner stream is non-empty but too small"));
  42. }
  43. reader.seek(SeekFrom::End(-offset))?;
  44. let offset: i64 = read_from(&mut reader)?;
  45. if end < offset.unsigned_abs() {
  46. return Err(bterr!("inner stream is non-empty but too small"));
  47. }
  48. let body_len = reader.seek(SeekFrom::End(offset))?;
  49. let trailer: D = read_from(&mut reader)?;
  50. let inner = reader.into_inner().into_inner();
  51. Ok((
  52. Trailered {
  53. inner,
  54. body_len,
  55. phantom: PhantomData,
  56. write_buf: Vec::new(),
  57. },
  58. Some(trailer),
  59. ))
  60. }
  61. }
  62. impl<T, D> Trailered<T, D> {
  63. fn update_body_len(&mut self, pos: u64, written: usize) -> usize {
  64. let new_pos = pos + written as u64;
  65. self.body_len = self.body_len.max(new_pos);
  66. written
  67. }
  68. }
  69. impl<T: ReadAt, D> ReadAt for Trailered<T, D> {
  70. fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result<usize> {
  71. if pos > self.body_len {
  72. return Err(bterr!("pos {pos} is past the end of body ({})", self.body_len).into());
  73. }
  74. let available_u64 = self.body_len - pos;
  75. let available: usize = available_u64.try_into().box_err()?;
  76. let limit = buf.len().min(available);
  77. self.inner.read_at(pos, &mut buf[..limit])
  78. }
  79. }
  80. impl<T: WriteAt, D: Serialize> Trailered<T, D> {
  81. pub fn write_at(&mut self, pos: u64, buf: &[u8]) -> io::Result<usize> {
  82. let written = self.inner.write_at(pos, buf)?;
  83. Ok(self.update_body_len(pos, written))
  84. }
  85. /// Writes the trailer followed by the offset relative to the end of the inner stream where
  86. /// the trailer was written. See the `Trailered::new` method for how this is read out of
  87. /// the inner stream.
  88. fn write_trailer(&mut self, trailer: &D) -> io::Result<()> {
  89. self.write_buf.clear();
  90. write_to(trailer, &mut self.write_buf)?;
  91. // This is the length of `write_buf` after writing an additional `i64` into it.
  92. let write_buf_len = (self.write_buf.len() + std::mem::size_of::<i64>()) as u64;
  93. let offset: i64 = write_buf_len.try_into().box_err()?;
  94. write_to(&(-offset), &mut self.write_buf)?;
  95. self.inner.write_all_at(self.body_len, &self.write_buf)?;
  96. Ok(())
  97. }
  98. pub fn flush(&mut self, trailer: &D) -> io::Result<()> {
  99. self.write_trailer(trailer)?;
  100. self.inner.flush()
  101. }
  102. }
  103. impl<T: WriteInteg + Size, D: Serialize> Trailered<T, D> {
  104. pub fn flush_integ(&mut self, trailer: &D, integrity: &[u8]) -> io::Result<()> {
  105. self.write_trailer(trailer)?;
  106. self.inner.flush_integ(integrity)
  107. }
  108. }
  109. impl<T, D> Decompose<T> for Trailered<T, D> {
  110. fn into_inner(self) -> T {
  111. self.inner
  112. }
  113. }
  114. impl<U, T: AsRef<U>, D> AsRef<U> for Trailered<T, D> {
  115. fn as_ref(&self) -> &U {
  116. self.inner.as_ref()
  117. }
  118. }
  119. impl<U, T: AsMut<U>, D> AsMut<U> for Trailered<T, D> {
  120. fn as_mut(&mut self) -> &mut U {
  121. self.inner.as_mut()
  122. }
  123. }
  124. impl<T: FlushMeta, D> Trailered<T, D> {
  125. pub fn flush_meta(&mut self) -> Result<()> {
  126. self.inner.flush_meta()
  127. }
  128. }
  129. impl<T: Sectored, D> Sectored for Trailered<T, D> {
  130. fn sector_sz(&self) -> usize {
  131. self.inner.sector_sz()
  132. }
  133. }
  134. impl<T, D> Size for Trailered<T, D> {
  135. fn size(&self) -> io::Result<Option<u64>> {
  136. Ok(Some(self.body_len))
  137. }
  138. }
  139. }
  140. #[cfg(test)]
  141. mod tests {
  142. use super::*;
  143. use positioned_io::ReadAt;
  144. use crate::{test_helpers::BtCursor as Cursor, Decompose};
  145. /// Tests that a new `Trailered<T>` can be created from an empty stream.
  146. #[test]
  147. fn trailered_new_empty() {
  148. let cursor = Cursor::new(Vec::new());
  149. let (_, trailer): (_, Option<String>) =
  150. Trailered::new(cursor).expect("Trailered::new failed");
  151. assert_eq!(None, trailer);
  152. }
  153. /// Tests that an error is returned when an attempt is made to create a `Trailered<T>` from a
  154. /// non-empty stream which is too short.
  155. #[test]
  156. fn trailered_new_inner_too_short_is_error() {
  157. let cursor = Cursor::new([0u8; 5]);
  158. let result = Trailered::<_, u128>::new(cursor);
  159. assert!(result.is_err())
  160. }
  161. /// Checks that the trailer is persisted to the inner stream.
  162. #[test]
  163. fn trailered_trailer_persisted() {
  164. const EXPECTED: &str = "Everyone deserves to be remembered,";
  165. let cursor = {
  166. let cursor = Cursor::new(Vec::new());
  167. let (mut trailered, trailer) =
  168. Trailered::<_, String>::new(cursor).expect("Trailered::new failed");
  169. assert!(trailer.is_none());
  170. trailered
  171. .flush(&EXPECTED.to_string())
  172. .expect("flush failed");
  173. trailered.into_inner()
  174. };
  175. let (_, trailer) = Trailered::<_, String>::new(cursor).expect("Trailered::new failed");
  176. assert_eq!(EXPECTED, trailer.unwrap());
  177. }
  178. #[test]
  179. fn trailered_written_data_persisted() {
  180. const EXPECTED: &[u8] = b"and every life has something to teach us.";
  181. let cursor = {
  182. let (mut trailered, _) = Trailered::<_, u8>::new(Cursor::new(Vec::new()))
  183. .expect("failed to create first trailered");
  184. trailered.write_at(0, EXPECTED).expect("write failed");
  185. trailered.flush(&1).expect("flush failed");
  186. trailered.into_inner()
  187. };
  188. let (trailered, _) =
  189. Trailered::<_, u8>::new(cursor).expect("failed to created second trailered");
  190. let mut actual = vec![0u8; EXPECTED.len()];
  191. trailered.read_at(0, &mut actual).expect("read failed");
  192. assert_eq!(EXPECTED, actual);
  193. }
  194. /// Tests that a read past the end of the body in a `Trailered<T>` is not allowed.
  195. #[test]
  196. fn trailered_read_limited_to_body_len() {
  197. const EXPECTED: &[u8] = &[1, 1, 1, 1, 1, 0, 0, 0];
  198. let (mut trailered, ..) =
  199. Trailered::new(Cursor::new(Vec::new())).expect("failed to create Trailered");
  200. trailered.write_at(0, &[1u8; 5]).expect("write failed");
  201. trailered.flush(&1u8).expect("flush failed");
  202. let mut actual = vec![0u8; EXPECTED.len()];
  203. trailered.read_at(0, &mut actual).expect("read failed");
  204. assert_eq!(EXPECTED, actual);
  205. }
  206. }