accessor.rs 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. use positioned_io::{ReadAt, Size, WriteAt};
  2. use std::io::{Read, Seek, SeekFrom, Write};
  3. use crate::{
  4. sectored_buf::SectoredBuf, BlockMeta, Cursor, Decompose, FlushMeta, MetaAccess, Positioned,
  5. ReadDual, Result, SecretStream, Sectored, Split, TryCompose, TrySeek, WriteDual,
  6. };
  7. pub use private::Accessor;
  8. mod private {
  9. use super::*;
  10. pub struct Accessor<T: Size> {
  11. inner: SectoredBuf<SecretStream<Cursor<T>>>,
  12. }
  13. impl<T: ReadAt + Sectored + AsRef<BlockMeta> + Size> Accessor<T> {
  14. pub fn new(inner: T) -> Result<Accessor<T>> {
  15. let meta: &BlockMeta = inner.as_ref();
  16. let key = meta.body.block_key()?.clone();
  17. let inner = SecretStream::new(key).try_compose(Cursor::new(inner))?;
  18. Ok(Self {
  19. inner: SectoredBuf::new().try_compose(inner)?,
  20. })
  21. }
  22. }
  23. impl<T: Size> Accessor<T> {
  24. pub fn get_ref(&self) -> &T {
  25. self.inner.get_ref().get_ref().get_ref()
  26. }
  27. pub fn get_mut(&mut self) -> &mut T {
  28. self.inner.get_mut().get_mut().get_mut()
  29. }
  30. }
  31. impl<T: Size + ReadAt + AsRef<BlockMeta>> Accessor<T> {
  32. pub fn get_buf<'a>(&'a self, offset: u64, size: u64) -> Result<&'a [u8]> {
  33. self.inner.get_buf(offset, size)
  34. }
  35. }
  36. impl<T: ReadAt + AsRef<BlockMeta> + Size> Read for Accessor<T> {
  37. fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
  38. self.inner.read(buf)
  39. }
  40. }
  41. impl<T: ReadAt + WriteAt + MetaAccess> Write for Accessor<T> {
  42. fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
  43. self.inner.write(buf)
  44. }
  45. fn flush(&mut self) -> std::io::Result<()> {
  46. self.inner.flush()
  47. }
  48. }
  49. impl<T: ReadAt + WriteAt + MetaAccess> Seek for Accessor<T> {
  50. fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
  51. self.inner.seek(pos)
  52. }
  53. }
  54. impl<U, T: AsRef<U> + Size> AsRef<U> for Accessor<T> {
  55. fn as_ref(&self) -> &U {
  56. self.inner.get_ref().as_ref()
  57. }
  58. }
  59. impl<U, T: AsMut<U> + Size> AsMut<U> for Accessor<T> {
  60. fn as_mut(&mut self) -> &mut U {
  61. self.inner.get_mut().as_mut()
  62. }
  63. }
  64. impl<T: Size> Decompose<T> for Accessor<T> {
  65. fn into_inner(self) -> T {
  66. self.inner.into_inner().into_inner().into_inner()
  67. }
  68. }
  69. impl<T: FlushMeta + Size> FlushMeta for Accessor<T> {
  70. fn flush_meta(&mut self) -> Result<()> {
  71. self.get_mut().flush_meta()
  72. }
  73. }
  74. impl<T: Size> Sectored for Accessor<T> {
  75. fn sector_sz(&self) -> usize {
  76. self.inner.sector_sz()
  77. }
  78. }
  79. impl<T: Size> Size for Accessor<T> {
  80. fn size(&self) -> std::io::Result<Option<u64>> {
  81. self.inner.get_ref().size()
  82. }
  83. }
  84. impl<T: ReadAt + WriteAt + MetaAccess> WriteDual for Accessor<T> {
  85. fn write_from<R: Read>(&mut self, read: R, count: usize) -> std::io::Result<usize> {
  86. self.inner.write_from(read, count)
  87. }
  88. }
  89. impl<T: ReadAt + AsRef<BlockMeta> + Size> ReadDual for Accessor<T> {
  90. fn read_into<W: Write>(&mut self, write: W, count: usize) -> std::io::Result<usize> {
  91. self.inner.read_into(write, count)
  92. }
  93. }
  94. impl<T: Size> Positioned for Accessor<T> {
  95. fn pos(&self) -> usize {
  96. self.inner.pos()
  97. }
  98. }
  99. impl<T: ReadAt + AsRef<BlockMeta> + Size> TrySeek for Accessor<T> {
  100. fn try_seek(&mut self, seek_from: SeekFrom) -> std::io::Result<()> {
  101. self.inner.try_seek(seek_from)
  102. }
  103. }
  104. impl<T: Size> Split<Accessor<&'static [u8]>, T> for Accessor<T> {
  105. fn split(self) -> (Accessor<&'static [u8]>, T) {
  106. let (sectored_buf, inner) = self.inner.split();
  107. let (secret_stream, inner) = inner.split();
  108. let (cursor, inner) = inner.split();
  109. let new_inner =
  110. SectoredBuf::combine(sectored_buf, SecretStream::combine(secret_stream, cursor));
  111. (Accessor { inner: new_inner }, inner)
  112. }
  113. fn combine(left: Accessor<&'static [u8]>, right: T) -> Self {
  114. let (sectored_buf, inner) = left.inner.split();
  115. let (secret_stream, inner) = inner.split();
  116. let (cursor, ..) = inner.split();
  117. let new_inner = SectoredBuf::combine(
  118. sectored_buf,
  119. SecretStream::combine(secret_stream, Cursor::combine(cursor, right)),
  120. );
  121. Accessor { inner: new_inner }
  122. }
  123. }
  124. }
  125. #[cfg(test)]
  126. mod test {
  127. use super::*;
  128. use crate::test_helpers::{make_block_with, node_creds};
  129. #[test]
  130. fn can_wrap_block_ref() {
  131. let block = make_block_with(node_creds())
  132. .into_inner()
  133. .into_inner()
  134. .into_inner();
  135. let mut accessor = Accessor::new(block).expect("failed to wrap block");
  136. const EXPECTED: &[u8] = &[1u8; 8];
  137. accessor.write_all(EXPECTED).expect("write failed");
  138. accessor.flush().expect("flush failed");
  139. accessor.rewind().expect("rewind failed");
  140. let block = accessor.into_inner();
  141. let mut wrapped = Accessor::new(&block).expect("failed to wrap block reference");
  142. let mut actual = [0u8; EXPECTED.len()];
  143. wrapped.read(&mut actual).expect("read failed");
  144. assert_eq!(EXPECTED, actual);
  145. }
  146. }