lib.rs

// The dead code warnings create too much noise during wire-framing.
// TODO: Delete this prior to release.
#![allow(dead_code)]

#[cfg(test)]
mod test_helpers;
#[cfg(test)]
mod serde_tests;

#[macro_use]
extern crate static_assertions;

use brotli::{CompressorWriter, Decompressor};
use btserde::{self, read_from, write_to};

mod crypto;
use crypto::{AsymKeyPub, Cryptotext, Hash, HashKind, Sign, Signature, SymKey};

use log::error;
use serde::{Deserialize, Serialize};
use serde_big_array::BigArray;
use std::{
    collections::HashMap,
    convert::{Infallible, TryFrom},
    fmt::{self, Display, Formatter},
    fs::{File, OpenOptions},
    hash::Hash as Hashable,
    io::{self, Read, Seek, SeekFrom, Write},
    ops::{Add, Sub},
    time::{Duration, SystemTime},
};

#[derive(Debug)]
enum Error {
    Io(std::io::Error),
    Serde(btserde::Error),
    Crypto(crypto::Error),
    IncorrectSize { expected: usize, actual: usize },
    Custom(Box<dyn std::fmt::Debug + Send + Sync>),
}

impl Error {
    fn custom<E: std::fmt::Debug + Send + Sync + 'static>(err: E) -> Error {
        Error::Custom(Box::new(err))
    }
}

impl Display for Error {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            Error::Io(err) => err.fmt(f),
            Error::Serde(err) => err.fmt(f),
            Error::Crypto(err) => err.fmt(f),
            Error::IncorrectSize { expected, actual } => {
                write!(f, "incorrect size {}, expected {}", actual, expected)
            }
            Error::Custom(err) => err.fmt(f),
        }
    }
}

impl std::error::Error for Error {}

impl From<std::io::Error> for Error {
    fn from(err: std::io::Error) -> Self {
        Error::Io(err)
    }
}

impl From<Error> for std::io::Error {
    fn from(err: Error) -> Self {
        io::Error::new(io::ErrorKind::Other, err)
    }
}

impl From<btserde::Error> for Error {
    fn from(err: btserde::Error) -> Self {
        Error::Serde(err)
    }
}

impl From<crypto::Error> for Error {
    fn from(err: crypto::Error) -> Self {
        Error::Crypto(err)
    }
}

impl From<std::num::TryFromIntError> for Error {
    fn from(err: std::num::TryFromIntError) -> Self {
        Error::custom(err)
    }
}

type Result<T> = std::result::Result<T, Error>;

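// Illustrative sketch, not part of the original source: the `From` impls above let `?`
// convert foreign error types into this crate's `Error`. The file name used here is
// hypothetical.
fn error_conversion_example() -> Result<Header> {
    // `std::io::Error` becomes `Error::Io`, and `btserde::Error` becomes `Error::Serde`.
    let mut file = File::open("example.blk")?;
    let header: Header = read_from(&mut file)?;
    Ok(header)
}
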
/// A Block tagged with its version number. When a block of a previous version is received over
/// the network or read from the filesystem, it is upgraded to the current version before being
/// processed.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
enum VersionedBlock<T> {
    V0(Block<T>),
}

const SECTOR_SZ_DEFAULT: usize = 4096;

/// A trait for streams which only allow reads and writes in fixed-size units called sectors.
trait Sectored {
    /// Returns the sector size for this stream.
    fn sector_sz(&self) -> usize;

    /// Returns an error if `actual` does not match this stream's sector size.
    fn assert_sector_sz(&self, actual: usize) -> Result<()> {
        let expected = self.sector_sz();
        if expected == actual {
            Ok(())
        } else {
            Err(Error::IncorrectSize { expected, actual })
        }
    }
}

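// Illustrative sketch, not part of the original source: the simplest possible `Sectored`
// implementor is a wrapper that reports a fixed sector size for its inner stream.
struct FixedSectored<T> {
    inner: T,
    sect_sz: usize,
}

impl<T> Sectored for FixedSectored<T> {
    fn sector_sz(&self) -> usize {
        self.sect_sz
    }
}
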
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
pub struct Header {
    path: Path,
    readcaps: HashMap<Principal, Cryptotext<SymKey>>,
    writecap: Writecap,
    merkle_root: Hash,
}

/// A container which binds together ciphertext along with the metadata needed to identify,
/// verify and decrypt it.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Block<T> {
    header: Header,
    sig: Signature,
    body: T,
}

impl<T> Block<T> {
    /// Replaces this block's body using a fallible composer.
    fn try_compose_body<E: Into<Error>, U: Decompose<T>, V: TryCompose<T, U, Error = E>>(
        self,
        new_body: V,
    ) -> Result<Block<U>> {
        Ok(Block {
            header: self.header,
            sig: self.sig,
            body: new_body.try_compose(self.body).map_err(|err| err.into())?,
        })
    }

    /// Replaces this block's body using an infallible composer.
    fn compose_body<U: Decompose<T>, V: Compose<T, U>>(self, new_body: V) -> Block<U> {
        Block {
            header: self.header,
            sig: self.sig,
            body: new_body.compose(self.body),
        }
    }
}

impl Block<File> {
    /// Opens the block file at the given path and verifies its header signature.
    fn new<P: AsRef<std::path::Path>>(path: P) -> Result<Block<File>> {
        let mut file = OpenOptions::new().read(true).write(true).open(path)?;
        let header: Header = read_from(&mut file)?;
        let sig: Signature = read_from(&mut file)?;
        crypto::verify_header(&header, &sig)?;
        Ok(Block {
            header,
            sig,
            body: file,
        })
    }
}

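// Usage sketch, not part of the original source: `Block::new` verifies the header
// signature before returning, after which the `Read` impl below can stream the body.
// The path used here is hypothetical.
fn read_block_body_example() -> Result<Vec<u8>> {
    let mut block = Block::new("/tmp/example.blk")?;
    let mut body = Vec::new();
    block.read_to_end(&mut body)?;
    Ok(body)
}
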
impl<T: Write> Write for Block<T> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.body.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.body.flush()
    }
}

impl<T: Read> Read for Block<T> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.body.read(buf)
    }
}

impl<T: Seek> Seek for Block<T> {
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        self.body.seek(pos)
    }
}

/// A type which can be decomposed into its inner value.
trait Decompose<T> {
    fn into_inner(self) -> T;
}

/// A fallible composition of an inner value into an outer one.
trait TryCompose<T, U: Decompose<T>> {
    type Error;
    fn try_compose(self, inner: T) -> std::result::Result<U, Self::Error>;
}

/// An infallible composition of an inner value into an outer one.
trait Compose<T, U> {
    fn compose(self, inner: T) -> U;
}

impl<T, U: Decompose<T>, S: TryCompose<T, U, Error = Infallible>> Compose<T, U> for S {
    fn compose(self, inner: T) -> U {
        let result = self.try_compose(inner);
        // Safety: `Infallible` has no values, so `result` must be `Ok`.
        unsafe { result.unwrap_unchecked() }
    }
}

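// Illustrative sketch, not part of the original source: any composer whose `try_compose`
// cannot fail (`Error = Infallible`) picks up `compose` for free from the blanket impl
// above. `Passthrough` and `Wrapped` are toy types for demonstration only.
struct Wrapped<T>(T);

impl<T> Decompose<T> for Wrapped<T> {
    fn into_inner(self) -> T {
        self.0
    }
}

struct Passthrough;

impl<T> TryCompose<T, Wrapped<T>> for Passthrough {
    type Error = Infallible;
    fn try_compose(self, inner: T) -> std::result::Result<Wrapped<T>, Infallible> {
        Ok(Wrapped(inner))
    }
}

fn compose_example() -> Wrapped<u8> {
    // No `unwrap` needed: `compose` is available because the error type is `Infallible`.
    Passthrough.compose(42u8)
}
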
/// Extensions to the `Read` trait.
trait ReadExt: Read {
    /// Reads repeatedly until one of the following occurs:
    /// 1. The given buffer is full.
    /// 2. A call to `read` returns 0.
    /// 3. A call to `read` returns an error.
    ///
    /// The number of bytes read is returned. If an error is returned, then no bytes were read.
    fn fill_buf(&mut self, mut dest: &mut [u8]) -> io::Result<usize> {
        let dest_len_start = dest.len();
        while !dest.is_empty() {
            let byte_ct = match self.read(dest) {
                Ok(byte_ct) => byte_ct,
                Err(err) => {
                    if dest_len_start == dest.len() {
                        return Err(err);
                    } else {
                        // We're not allowed to return an error if we've already read from self.
                        error!("an error occurred in fill_buf: {}", err);
                        break;
                    }
                }
            };
            if 0 == byte_ct {
                break;
            }
            dest = &mut dest[byte_ct..];
        }
        Ok(dest_len_start - dest.len())
    }
}

impl<T: Read> ReadExt for T {}

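// Usage sketch, not part of the original source: unlike a single call to `read`,
// `fill_buf` keeps reading until `dest` is full or the source is exhausted, so a short
// first read does not truncate the result.
fn fill_buf_example() -> io::Result<usize> {
    let data = [7u8; 16];
    let mut reader: &[u8] = &data;
    let mut dest = [0u8; 8];
    // Returns Ok(8): the slice holds more than enough bytes to fill `dest`.
    reader.fill_buf(&mut dest)
}
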
impl<T: Write> Decompose<T> for CompressorWriter<T> {
    fn into_inner(self) -> T {
        self.into_inner()
    }
}

impl<T: Read> Decompose<T> for Decompressor<T> {
    fn into_inner(self) -> T {
        self.into_inner()
    }
}

#[derive(Clone)]
struct BrotliParams {
    buf_sz: usize,
    quality: u32,
    window_sz: u32,
}

impl BrotliParams {
    fn new(buf_sz: usize, quality: u32, window_sz: u32) -> BrotliParams {
        BrotliParams {
            buf_sz,
            quality,
            window_sz,
        }
    }
}

impl<T: Write> TryCompose<T, CompressorWriter<T>> for BrotliParams {
    type Error = Error;
    fn try_compose(self, inner: T) -> Result<CompressorWriter<T>> {
        Ok(CompressorWriter::new(
            inner,
            self.buf_sz,
            self.quality,
            self.window_sz,
        ))
    }
}

impl<T: Read> TryCompose<T, Decompressor<T>> for BrotliParams {
    type Error = Error;
    fn try_compose(self, inner: T) -> Result<Decompressor<T>> {
        Ok(Decompressor::new(inner, self.buf_sz))
    }
}

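// Usage sketch, not part of the original source: the same `BrotliParams` value composes
// both a compressor and a decompressor. Dropping the `CompressorWriter` at the end of
// the inner scope finalizes the compressed stream before it is read back, the same
// pattern the brotli round-trip test below relies on.
fn brotli_round_trip_example(params: BrotliParams, payload: &[u8]) -> Result<Vec<u8>> {
    let mut buf = io::Cursor::new(Vec::new());
    {
        let mut writer: CompressorWriter<_> = params.clone().try_compose(&mut buf)?;
        writer.write_all(payload)?;
    }
    buf.seek(SeekFrom::Start(0))?;
    let mut reader: Decompressor<_> = params.try_compose(&mut buf)?;
    let mut output = Vec::new();
    reader.read_to_end(&mut output)?;
    Ok(output)
}
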
/// TODO: Remove this once the error_chain crate is integrated.
fn err_conv<T, E: std::error::Error + Send + Sync + 'static>(
    result: std::result::Result<T, E>,
) -> std::result::Result<T, io::Error> {
    result.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
}

/// A stream which buffers writes and reads such that the inner stream only sees reads and writes
/// of sector-length buffers.
struct SectoredBuf<T> {
    inner: T,
    buf: Vec<u8>,
    /// The offset into the inner stream to which the zero offset byte in `buf` corresponds.
    buf_start: usize,
    /// Indicates if the contents of `buf` have been written to, and so whether `buf` needs to be
    /// written back to `inner` before it is refilled.
    dirty: bool,
    /// The total number of bytes that have been written to the inner stream, including the
    /// reserved bytes at the beginning.
    len: usize,
    /// The current position of this stream, expressed as an offset into the inner stream.
    pos: usize,
}

impl SectoredBuf<()> {
    fn new() -> SectoredBuf<()> {
        SectoredBuf {
            inner: (),
            buf: Vec::new(),
            buf_start: 0,
            dirty: false,
            len: 0,
            pos: 0,
        }
    }
}

impl<T> SectoredBuf<T> {
    /// The number of bytes at the beginning of the inner stream which are reserved to store the
    /// length of data written. All offsets into the stored data must be shifted by this amount to
    /// be translated to an offset in the inner stream.
    const RESERVED: usize = std::mem::size_of::<usize>();

    /// Returns the position in the inner stream which the given position in this stream
    /// corresponds to.
    fn inner_pos(self_pos: u64) -> u64 {
        let offset: u64 = Self::RESERVED.try_into().unwrap();
        self_pos + offset
    }

    /// Returns the position in this stream which the given position in the inner stream
    /// corresponds to.
    fn self_pos(inner_pos: u64) -> u64 {
        let offset: u64 = Self::RESERVED.try_into().unwrap();
        inner_pos - offset
    }

    /// Returns the offset into the internal buffer that corresponds to the current position.
    fn buf_pos(&self) -> usize {
        self.pos - self.buf_start
    }

    /// Returns one more than the last index in the internal buffer which can be read.
    fn buf_end(&self) -> usize {
        let limit = self.len.min(self.buf_start + self.sector_sz());
        limit - self.buf_start
    }

    /// Returns the index of the sector which is currently loaded into the buffer.
    fn buf_sector_index(&self) -> usize {
        self.pos / self.sector_sz()
    }
}

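// Worked example, not part of the original source: positions in a `SectoredBuf` are
// shifted by the `RESERVED` length prefix, so stream position 0 lives at inner offset
// `RESERVED`, and `self_pos` undoes `inner_pos`.
fn reserved_offset_example() {
    let inner = SectoredBuf::<()>::inner_pos(0);
    debug_assert_eq!(SectoredBuf::<()>::RESERVED as u64, inner);
    debug_assert_eq!(0, SectoredBuf::<()>::self_pos(inner));
}
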
impl<T: Read + Seek> SectoredBuf<T> {
    /// Fills the internal buffer by reading from the inner stream at the current position
    /// and updates `self.buf_start` with the position read from.
    fn fill_internal_buf(&mut self) -> io::Result<usize> {
        self.buf_start = err_conv(self.inner.stream_position()?.try_into())?;
        let read_bytes = if self.buf_start < self.len {
            let read_bytes = self.inner.fill_buf(&mut self.buf)?;
            if read_bytes < self.buf.len() {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    format!(
                        "Failed to fill SectoredBuf.buf. Expected {} bytes, got {}.",
                        self.buf.len(),
                        read_bytes
                    ),
                ));
            }
            read_bytes
        } else {
            0
        };
        Ok(read_bytes)
    }
}

impl<T> Decompose<T> for SectoredBuf<T> {
    fn into_inner(self) -> T {
        self.inner
    }
}

impl<T: Sectored + Read + Seek> TryCompose<T, SectoredBuf<T>> for SectoredBuf<()> {
    type Error = Error;
    fn try_compose(self, inner: T) -> Result<SectoredBuf<T>> {
        let sect_sz = inner.sector_sz();
        if sect_sz < Self::RESERVED {
            return Err(Error::custom(format!(
                "a sector size of at least {} is required. Got {}",
                Self::RESERVED,
                sect_sz,
            )));
        }
        let mut sectored = SectoredBuf {
            inner,
            buf: self.buf,
            buf_start: 0,
            dirty: false,
            len: Self::RESERVED,
            pos: Self::RESERVED,
        };
        sectored.inner.seek(SeekFrom::Start(0))?;
        sectored.buf.resize(sect_sz, 0);
        let len_stored = match sectored.fill_internal_buf() {
            Ok(bytes_read) => bytes_read >= Self::RESERVED,
            Err(err) => {
                error!("SectoredBuf::fill_internal_buf returned an error: {}", err);
                false
            }
        };
        if len_stored {
            if let Ok(len) = read_from::<u64, _>(&mut sectored.buf.as_slice()) {
                sectored.len = len.try_into()?;
            }
        } else {
            write_to(&Self::RESERVED, &mut sectored.buf.as_mut_slice())?;
            sectored.dirty = true;
        }
        Ok(sectored)
    }
}

impl<T> Sectored for SectoredBuf<T> {
    fn sector_sz(&self) -> usize {
        self.buf.len()
    }
}

impl<T: Seek + Read + Write> Write for SectoredBuf<T> {
    fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
        let src_len_start = src.len();
        let mut dest = {
            let buf_pos = self.buf_pos();
            &mut self.buf[buf_pos..]
        };
        while !src.is_empty() {
            if dest.is_empty() {
                if let Err(err) = self.flush() {
                    error!("A call to SectoredBuf::flush returned an error: {}", err);
                    break;
                }
                dest = &mut self.buf[..];
            }
            let sz = src.len().min(dest.len());
            dest[..sz].copy_from_slice(&src[..sz]);
            dest = &mut dest[sz..];
            src = &src[sz..];
            self.dirty = sz > 0;
            self.pos += sz;
        }
        Ok(src_len_start - src.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        if !self.dirty {
            return Ok(());
        }
        // Write out the contents of the buffer.
        let sect_sz: u64 = err_conv(self.sector_sz().try_into())?;
        let inner_pos = self.inner.stream_position()?;
        let inner_pos_usize: usize = err_conv(inner_pos.try_into())?;
        let is_new_sector = self.pos > inner_pos_usize;
        let is_full = (self.buf.len() - self.buf_pos()) == 0;
        let seek_to = if is_new_sector {
            if is_full {
                inner_pos + sect_sz
            } else {
                inner_pos
            }
        } else {
            // The contents of the buffer were previously read from inner, so we write the
            // updated contents to the same offset.
            let sect_start: u64 = err_conv(self.buf_start.try_into())?;
            self.inner.seek(SeekFrom::Start(sect_start))?;
            if is_full {
                inner_pos
            } else {
                inner_pos - sect_sz
            }
        };
        self.inner.write_all(&self.buf)?;
        // Update the stored length.
        self.len = self.len.max(self.pos);
        self.inner.seek(SeekFrom::Start(0))?;
        self.fill_internal_buf()?;
        let len: u64 = err_conv(self.len.try_into())?;
        err_conv(write_to(&len, &mut self.buf.as_mut_slice()))?;
        self.inner.seek(SeekFrom::Start(0))?;
        self.inner.write_all(&self.buf)?;
        // Seek to the next position.
        self.inner.seek(SeekFrom::Start(seek_to))?;
        self.fill_internal_buf()?;
        self.dirty = false;
        Ok(())
    }
}

impl<T: Read + Seek> Read for SectoredBuf<T> {
    fn read(&mut self, mut dest: &mut [u8]) -> io::Result<usize> {
        if self.pos == self.len {
            return Ok(0);
        }
        let dest_len_start = dest.len();
        let mut src = {
            let start = self.buf_pos();
            let end = self.buf_end();
            &self.buf[start..end]
        };
        while !dest.is_empty() {
            if src.is_empty() {
                if self.pos >= self.len {
                    break;
                }
                let byte_ct = match self.fill_internal_buf() {
                    Ok(byte_ct) => byte_ct,
                    Err(err) => {
                        error!("SectoredBuf::fill_internal_buf returned an error: {}", err);
                        break;
                    }
                };
                if 0 == byte_ct {
                    break;
                }
                src = &self.buf[..byte_ct];
            }
            let sz = src.len().min(dest.len());
            dest[..sz].copy_from_slice(&src[..sz]);
            dest = &mut dest[sz..];
            src = &src[sz..];
            self.pos += sz;
        }
        Ok(dest_len_start - dest.len())
    }
}

impl<T: Seek + Read + Write> Seek for SectoredBuf<T> {
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        let inner_pos = self.inner.stream_position()?;
        let inner_pos_new = match pos {
            SeekFrom::Start(rel_start) => Self::inner_pos(rel_start),
            SeekFrom::Current(rel_curr) => {
                if rel_curr >= 0 {
                    inner_pos + rel_curr as u64
                } else {
                    // For a negative offset, subtract its magnitude. Casting a negative `i64`
                    // straight to `u64` would wrap around instead.
                    inner_pos - rel_curr.unsigned_abs()
                }
            }
            SeekFrom::End(_) => {
                return Err(io::Error::new(
                    io::ErrorKind::Unsupported,
                    "seeking relative to the end of the stream is not supported",
                ))
            }
        };
        let sect_sz = self.sector_sz();
        let sect_index = self.buf_sector_index();
        let sect_index_new = err_conv(TryInto::<usize>::try_into(inner_pos_new))? / sect_sz;
        let pos: u64 = err_conv(self.pos.try_into())?;
        if sect_index != sect_index_new || pos == inner_pos {
            self.flush()?;
            let seek_to: u64 = err_conv((sect_index_new * sect_sz).try_into())?;
            self.inner.seek(SeekFrom::Start(seek_to))?;
            self.fill_internal_buf()?;
        }
        self.pos = err_conv(inner_pos_new.try_into())?;
        Ok(Self::self_pos(inner_pos_new))
    }
}

/// An envelopment of a key, tagged with the principal for whom the key is meant.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Readcap {
    /// The principal this `Readcap` was issued to.
    issued_to: Principal,
    /// An encipherment of a block key using the public key of the principal.
    key: Cryptotext<SymKey>,
}

impl Readcap {
    fn new(issued_to: Hash, key: Cryptotext<SymKey>) -> Readcap {
        Readcap {
            issued_to: Principal(issued_to),
            key,
        }
    }
}

/// Verifies that a principal is authorized to write blocks in a tree.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
struct Writecap {
    /// The principal this `Writecap` was issued to.
    issued_to: Principal,
    /// The path where this writecap's validity begins.
    path: Path,
    /// The point in time after which this writecap is no longer valid.
    expires: Epoch,
    /// The public key used to sign this writecap.
    signing_key: AsymKeyPub<Sign>,
    /// A digital signature which covers all of the fields in the writecap except for `next`.
    signature: Signature,
    /// The next writecap in the chain leading back to the root.
    next: Option<Box<Writecap>>,
}

/// Fragments are created from blocks using erasure encoding and stored with other nodes in the
/// network to provide availability and redundancy of data.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Fragment {
    /// The path to the block this fragment is from.
    path: Path,
    /// The serial number of this fragment.
    serial: FragmentSerial,
    /// The actual data.
    body: Vec<u8>,
}

impl Fragment {
    /// Creates a new fragment with the given fields. If `path_str` cannot be parsed, a failed
    /// `Result` containing a `PathError` is returned.
    fn new(
        path_str: &str,
        serial_num: u32,
        body: Vec<u8>,
    ) -> std::result::Result<Fragment, PathError> {
        let path = Path::try_from(path_str)?;
        Ok(Fragment {
            path,
            serial: FragmentSerial(serial_num),
            body,
        })
    }
}

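// Usage sketch, not part of the original source: building a fragment from a path string.
// The leading component must be the textual form of a `Hash`, otherwise `Path::try_from`
// fails with `PathError::InvalidLeadingComponent`; `owner_hash_str` is assumed to hold
// such a value.
fn fragment_example(owner_hash_str: &str) -> std::result::Result<Fragment, PathError> {
    let path_str = format!("{}/apps/data", owner_hash_str);
    Fragment::new(path_str.as_str(), 0, vec![1, 2, 3])
}
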
/// The body of every non-leaf node in a tree contains this data structure.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Directory {
    /// The nodes that are attached to the tree at this block.
    nodes: Vec<Principal>,
    /// This block's descendants.
    children: HashMap<String, HashMap<FragmentSerial, FragmentRecord>>,
}

/// Keeps track of which principal is storing a fragment.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct FragmentRecord {
    /// The fragment serial number this record is for.
    serial: FragmentSerial,
    /// The principal who is storing this fragment.
    stored_by: Principal,
}

impl FragmentRecord {
    /// Creates a new `FragmentRecord` whose `serial` and `stored_by` fields are set to
    /// the given values.
    fn new(serial: u32, stored_by: Hash) -> FragmentRecord {
        FragmentRecord {
            serial: FragmentSerial(serial),
            stored_by: Principal(stored_by),
        }
    }
}

/// An identifier for a security principal, which is any entity that can be authenticated.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Hashable, Clone)]
struct Principal(Hash);

impl Principal {
    fn kind(&self) -> HashKind {
        HashKind::from(&self.0)
    }
}

/// Trait for types which are owned by a `Principal`.
trait Owned {
    /// Returns the `Principal` that owns `self`, using the given hash algorithm.
    fn owner_of_kind(&self, kind: HashKind) -> Principal;

    /// Returns the `Principal` that owns `self`, using the default hash algorithm.
    fn owner(&self) -> Principal {
        self.owner_of_kind(HashKind::default())
    }
}

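// Illustrative sketch, not part of the original source: a type that records its owner
// can implement `Owned` by returning a stored `Principal`. A real implementor would
// derive the principal from key material using `kind`; this toy ignores it.
struct OwnedToy {
    owner: Principal,
}

impl Owned for OwnedToy {
    fn owner_of_kind(&self, _kind: HashKind) -> Principal {
        self.owner.clone()
    }
}
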
/// An identifier for a block in a tree.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
struct Path {
    owner: Principal,
    components: Vec<String>,
}

impl Path {
    /// The character that is used to separate path components.
    const SEP: char = '/';

    /// The limit, in bytes, of a `Path`'s length.
    const BYTE_LIMIT: usize = 4096;

    /// Returns a result which, when successful, contains the index after the last character in
    /// the current path component.
    fn component_end<I: Iterator<Item = (usize, char)>>(
        start: usize,
        first: char,
        pairs: &mut I,
    ) -> std::result::Result<usize, PathError> {
        if first == Path::SEP {
            return Err(PathError::EmptyComponent);
        }
        let end;
        let mut last = start;
        loop {
            match pairs.next() {
                Some((index, Path::SEP)) => {
                    end = index;
                    break;
                }
                Some((index, _)) => last = index,
                None => {
                    end = last + 1;
                    break;
                }
            }
        }
        if end == start {
            Err(PathError::EmptyComponent)
        } else {
            Ok(end)
        }
    }

    /// Asserts that the number of bytes in the given string is no more than `Path::BYTE_LIMIT`.
    fn assert_not_too_long(string: &str) -> std::result::Result<(), PathError> {
        let len = string.len();
        if len > Path::BYTE_LIMIT {
            return Err(PathError::PathTooLong(len));
        }
        Ok(())
    }

    /// Returns true if `other` is a subpath of this `Path`.
    fn contains(&self, other: &Path) -> bool {
        if self.owner != other.owner {
            return false;
        }
        // This path must be no longer than the other path.
        if self.components.len() > other.components.len() {
            return false;
        }
        // Skip the component containing the owner.
        let self_iter = self.components.iter().skip(1);
        let other_iter = other.components.iter().skip(1);
        for pair in self_iter.zip(other_iter) {
            if pair.0 != pair.1 {
                return false;
            }
        }
        true
    }
}

impl<'s> TryFrom<&'s str> for Path {
    type Error = PathError;
    fn try_from(string: &'s str) -> std::result::Result<Path, PathError> {
        // Reject empty paths up front. This also keeps the trailing-slash check below from
        // underflowing.
        if string.is_empty() {
            return Err(PathError::Empty);
        }
        Path::assert_not_too_long(string)?;
        let mut pairs = string.char_indices();
        let mut components = Vec::new();
        let mut last_end = 0;
        while let Some((start, c)) = pairs.next() {
            let end = Path::component_end(start, c, &mut pairs)?;
            last_end = end;
            let slice = &string[start..end];
            components.push(slice.to_string());
        }
        // An empty component is added to the end to indicate if there was a trailing slash.
        if string.len() - 1 == last_end {
            components.push("".to_string());
        }
        let leading = components
            .first()
            .ok_or(PathError::InvalidLeadingComponent)?;
        let hash =
            Hash::try_from(leading.as_str()).map_err(|_| PathError::InvalidLeadingComponent)?;
        Ok(Path {
            owner: Principal(hash),
            components,
        })
    }
}

impl Display for Path {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if self.components.is_empty() {
            return Ok(());
        }
        let mut iter = self.components.iter();
        let first = iter.next().unwrap();
        write!(f, "{}", first)?;
        for component in iter {
            write!(f, "{}{}", Path::SEP, component)?;
        }
        Ok(())
    }
}

/// Errors which can occur when converting a string to a `Path`.
#[derive(Debug, PartialEq)]
enum PathError {
    /// Occurs when the number of bytes in a string is greater than `Path::BYTE_LIMIT`.
    PathTooLong(usize),
    /// Indicates that a path string was empty.
    Empty,
    /// Occurs when a component in a path string was empty.
    EmptyComponent,
    /// Occurs when the leading component of a path is not in the correct format.
    InvalidLeadingComponent,
}

impl Display for PathError {
    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
        match self {
            PathError::PathTooLong(length) => formatter.write_fmt(format_args!(
                "path contained {} bytes, which is over the {} byte limit",
                length,
                Path::BYTE_LIMIT
            )),
            PathError::Empty => formatter.write_str("path was empty"),
            PathError::EmptyComponent => formatter.write_str("component of path was empty"),
            PathError::InvalidLeadingComponent => {
                formatter.write_str("invalid leading path component")
            }
        }
    }
}

/// An instant in time represented by the number of seconds since January 1st 1970, 00:00:00 UTC.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct Epoch(u64);

impl Epoch {
    /// Returns the current epoch time.
    fn now() -> Epoch {
        let now = SystemTime::now();
        // If the system clock is before the unix epoch, just panic.
        let epoch = now.duration_since(SystemTime::UNIX_EPOCH).unwrap();
        Epoch(epoch.as_secs())
    }
}

impl Copy for Epoch {}

impl Add<Duration> for Epoch {
    type Output = Self;
    fn add(self, other: Duration) -> Self {
        Epoch(self.0 + other.as_secs())
    }
}

impl Sub<Duration> for Epoch {
    type Output = Self;
    fn sub(self, other: Duration) -> Self {
        Epoch(self.0 - other.as_secs())
    }
}

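// Usage sketch, not part of the original source: `Epoch` arithmetic with `Duration` is
// handy for computing expiration times, e.g. a `Writecap` that lasts one hour.
fn one_hour_expiry_example() -> Epoch {
    Epoch::now() + Duration::from_secs(3600)
}
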
/// The serial number of a block fragment.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Hashable)]
struct FragmentSerial(u32);

#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;
    use test_helpers::*;

    fn path_from_str_test_case(
        expected: std::result::Result<Path, PathError>,
        input: &str,
    ) -> std::result::Result<(), PathError> {
        let result = Path::try_from(input);
        assert_eq!(expected, result);
        Ok(())
    }

    #[test]
    fn path_from_str_multiple_components_ok() -> std::result::Result<(), PathError> {
        let expected = make_path(vec!["red", "green", "blue"]);
        let input = format!("{}/red/green/blue", expected.owner.0);
        path_from_str_test_case(Ok(expected), input.as_str())?;
        Ok(())
    }

    #[test]
    fn path_from_str_one_component_ok() -> std::result::Result<(), PathError> {
        let expected = make_path(vec![]);
        let input = expected.owner.0.to_string();
        path_from_str_test_case(Ok(expected), input.as_str())?;
        Ok(())
    }

    #[test]
    fn path_from_str_trailing_slash_ok() -> std::result::Result<(), PathError> {
        // Notice the empty component at the end of this path due to the trailing slash.
        let expected = make_path(vec!["orange", "banana", "shotgun", ""]);
        let input = format!("{}/orange/banana/shotgun/", expected.owner.0);
        path_from_str_test_case(Ok(expected), input.as_str())?;
        Ok(())
    }

    #[test]
    fn path_from_str_path_too_long_fail() -> std::result::Result<(), PathError> {
        let principal = make_principal();
        let input = format!("{}/{}", principal.0, "*".repeat(4097));
        let expected = Err(PathError::PathTooLong(input.len()));
        path_from_str_test_case(expected, input.as_str())?;
        Ok(())
    }

    #[test]
    fn path_from_str_multiple_slashes_fail() -> std::result::Result<(), PathError> {
        let expected = Err(PathError::EmptyComponent);
        let input = format!("{}//orange", make_principal().0);
        path_from_str_test_case(expected, input.as_str())?;
        Ok(())
    }

    #[test]
    fn path_from_str_leading_slash_fail() -> std::result::Result<(), PathError> {
        let expected = Err(PathError::EmptyComponent);
        let input = format!("/{}/orange/banana/shotgun", make_principal().0);
        path_from_str_test_case(expected, input.as_str())?;
        Ok(())
    }

    #[test]
    fn path_round_trip() -> std::result::Result<(), PathError> {
        let expected = make_path(vec!["interstitial", "inter-related", "intersections"]);
        let actual = Path::try_from(expected.to_string().as_str())?;
        assert_eq!(expected, actual);
        Ok(())
    }

    #[test]
    fn path_contains_true() {
        let larger = make_path(vec!["apps"]);
        let smaller = make_path(vec!["apps", "bohdi"]);
        assert!(larger.contains(&smaller));
    }

    #[test]
    fn path_contains_true_only_owner() {
        let larger = make_path(vec![]);
        let smaller = make_path(vec![]);
        assert!(larger.contains(&smaller));
    }

    #[test]
    fn path_contains_false_self_is_longer() {
        let first = make_path(vec!["apps", "bohdi"]);
        let second = make_path(vec!["apps"]);
        assert!(!first.contains(&second));
    }

    #[test]
    fn path_contains_false_same_owners() {
        let first = make_path(vec!["apps"]);
        let second = make_path(vec!["nodes"]);
        assert!(!first.contains(&second));
    }

    #[test]
    fn path_contains_false_different_owners() {
        let first = make_path(vec!["apps"]);
        let mut second = make_path(vec!["apps"]);
        second.owner = Principal(Hash::Sha2_256(PRINCIPAL2));
        assert!(!first.contains(&second));
    }

    #[test]
    fn brotli_compress_decompress() {
        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
        const SECT_CT: usize = 16;
        let params = BrotliParams::new(SECT_SZ, 8, 20);
        let mut memory = Cursor::new([0u8; SECT_SZ * SECT_CT]);
        {
            let write: CompressorWriter<_> = params
                .clone()
                .try_compose(&mut memory)
                .expect("compose for write failed");
            write_fill(write, SECT_SZ, SECT_CT);
        }
        memory.seek(SeekFrom::Start(0)).expect("seek failed");
        {
            let read: Decompressor<_> = params
                .try_compose(&mut memory)
                .expect("compose for read failed");
            read_check(read, SECT_SZ, SECT_CT);
        }
    }

    fn make_sectored_buf(sect_sz: usize, sect_ct: usize) -> SectoredBuf<SectoredCursor<Vec<u8>>> {
        SectoredBuf::new()
            .try_compose(SectoredCursor::new(vec![0u8; sect_sz * sect_ct], sect_sz))
            .expect("compose for sectored buffer failed")
    }

    #[test]
    fn sectored_buf_fill_inner() {
        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
        const SECT_CT: usize = 16;
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        let sect_sz = sectored.sector_sz();
        assert_eq!(0, sect_sz % 16);
        let chunk_sz = sect_sz / 16;
        let chunk_ct = SECT_CT * 16;
        write_fill(&mut sectored, chunk_sz, chunk_ct);
    }

    #[test]
    fn sectored_buf_write_read_sequential() {
        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
        const SECT_CT: usize = 16;
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        let sect_sz = sectored.sector_sz();
        assert_eq!(0, sect_sz % 16);
        let chunk_sz = sect_sz / 16;
        // We subtract one here so that the underlying buffer is not completely filled. This
        // exercises the length limiting capability of the sectored buffer.
        let chunk_ct = SECT_CT * 16 - 1;
        write_fill(&mut sectored, chunk_sz, chunk_ct);
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        read_check(&mut sectored, chunk_sz, chunk_ct);
    }

    #[test]
    fn sectored_buf_sect_sz_too_small_is_error() {
        const MIN: usize = SectoredBuf::<()>::RESERVED;
        let result = SectoredBuf::new().try_compose(SectoredCursor::new([0u8; MIN], MIN - 1));
        assert!(result.is_err());
    }

    #[test]
    fn sectored_buf_len_preserved() {
        const SECT_SZ: usize = SECTOR_SZ_DEFAULT;
        const SECT_CT: usize = 16;
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        let expected = vec![42u8; 12];
        // We need to ensure that writing expected will not fill up the buffer in sectored.
        assert!(expected.len() < sectored.sector_sz() - SectoredBuf::<()>::RESERVED);
        sectored.write_all(&expected).expect("write failed");
        sectored.flush().expect("flush failed");
        let inner = sectored.into_inner();
        let mut sectored = SectoredBuf::new()
            .try_compose(inner)
            .expect("failed to compose sectored buffer");
        let mut actual = vec![0u8; expected.len()];
        sectored
            .fill_buf(actual.as_mut_slice())
            .expect("failed to fill actual");
        assert_eq!(expected, actual);
    }

    #[test]
    fn sectored_buf_seek() {
        let sect_sz = 16usize;
        let sect_ct = 16usize;
        let cap = sect_sz * sect_ct - std::mem::size_of::<usize>();
        let source = {
            let mut source = Vec::with_capacity(cap);
            source.extend(
                std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None })
                    .take(cap),
            );
            source
        };
        let mut sectored = make_sectored_buf(sect_sz, sect_ct);
        sectored.write(&source).expect("write failed");
        let mut buf = [0u8; 1];
        let end = cap.try_into().expect("cap cannot fit into a u8");
        for pos in (0..end).rev() {
            sectored
                .seek(SeekFrom::Start(pos as u64))
                .expect("seek failed");
            sectored.read(&mut buf).expect("read failed");
            assert_eq!(pos, buf[0]);
        }
    }

    #[test]
    fn sectored_buf_write_read_random() {
        const SECT_SZ: usize = 16;
        const SECT_CT: usize = 16;
        const CAP: usize = SECT_SZ * SECT_CT - std::mem::size_of::<usize>();
        let source = {
            let mut expected = Vec::with_capacity(CAP);
            expected.extend(
                std::iter::successors(Some(0u8), |n| if *n <= 254 { Some(*n + 1) } else { None })
                    .take(CAP),
            );
            expected
        };
        let indices: Vec<(usize, usize)> = {
            let rando = Randomizer::new([3u8; Randomizer::HASH.len()]);
            let rando2 = Randomizer::new([5u8; Randomizer::HASH.len()]);
            rando
                .zip(rando2)
                .take(SECT_CT)
                .map(|(mut first, mut second)| {
                    first %= source.len();
                    second %= source.len();
                    let low = first.min(second);
                    let high = first.max(second);
                    (low, high)
                })
                .collect()
        };
        let mut sectored = make_sectored_buf(SECT_SZ, SECT_CT);
        sectored
            .write_all(&[0u8; CAP])
            .expect("failed to fill sectored");
        sectored.flush().expect("flush failed");
        for (low, high) in indices.iter() {
            sectored
                .seek(SeekFrom::Start(*low as u64))
                .expect("seek failed");
            let src = &source[*low..*high];
            sectored.write(src).expect("write failed");
        }
        sectored.flush().expect("flush failed");
        let mut buf = vec![0u8; CAP];
        for (low, high) in indices.iter() {
            sectored
                .seek(SeekFrom::Start(*low as u64))
                .expect("seek failed");
            let actual = &mut buf[*low..*high];
            sectored.fill_buf(actual).expect("read failed");
            let expected = &source[*low..*high];
            assert_eq!(expected, actual);
        }
    }

    #[test]
    fn sectored_buf_read_past_end() {
        const LEN: usize = 32;
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new([0u8; LEN], LEN))
            .expect("compose failed");
        const BUF_LEN: usize = LEN - SectoredBuf::<()>::RESERVED + 1;
        sectored.write(&[1u8; BUF_LEN - 1]).expect("write failed");
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        let mut buf = [0u8; BUF_LEN];
        // Note that buf is one byte longer than the available capacity in the cursor.
        sectored.read(&mut buf).expect("read failed");
        assert_eq!(&[1u8; BUF_LEN - 1], &buf[..(BUF_LEN - 1)]);
        assert_eq!(0u8, buf[BUF_LEN - 1]);
    }

    /// Tests that the data written in `try_compose` is actually written back to the underlying
    /// stream.
    #[test]
    fn sectored_buf_write_back() {
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new(vec![0u8; 24], 16))
            .expect("compose failed");
        let expected = [1u8; 8];
        sectored.write(&expected).expect("first write failed");
        sectored.write(&[2u8; 8]).expect("second write failed");
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        let mut actual = [0u8; 8];
        sectored.read(&mut actual).expect("read failed");
        assert_eq!(expected, actual);
    }

    #[test]
    fn sectored_buf_write_past_end() {
        const LEN: usize = 8;
        let mut sectored = SectoredBuf::new()
            .try_compose(SectoredCursor::new(vec![0u8; 0], LEN))
            .expect("compose failed");
        let expected = [1u8; LEN + 1];
        sectored.write(&expected).expect("write failed");
        sectored.seek(SeekFrom::Start(0)).expect("seek failed");
        let mut actual = [0u8; LEN + 1];
        sectored.read(&mut actual).expect("read failed");
        assert_eq!(expected, actual);
    }
}