tests.rs 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. use btmsg::*;
  2. use btlib::{
  3. crypto::{ConcreteCreds, Creds},
  4. Epoch, Principal, Principaled, Result,
  5. };
  6. use ctor::ctor;
  7. use futures::stream::StreamExt;
  8. use lazy_static::lazy_static;
  9. use serde::{Deserialize, Serialize};
  10. use std::{
  11. net::{IpAddr, Ipv6Addr},
  12. sync::Arc,
  13. time::Duration,
  14. };
  15. use tokio::sync::mpsc;
  16. #[ctor]
  17. fn setup_logging() {
  18. env_logger::init();
  19. }
  20. lazy_static! {
  21. static ref ROOT_CREDS: ConcreteCreds = ConcreteCreds::generate().unwrap();
  22. static ref NODE_CREDS: ConcreteCreds = {
  23. let mut creds = ConcreteCreds::generate().unwrap();
  24. let root_creds = &ROOT_CREDS;
  25. let writecap = root_creds
  26. .issue_writecap(
  27. creds.principal(),
  28. vec![],
  29. Epoch::now() + Duration::from_secs(3600),
  30. )
  31. .unwrap();
  32. creds.set_writecap(writecap);
  33. creds
  34. };
  35. static ref ROOT_PRINCIPAL: Principal = ROOT_CREDS.principal();
  36. }
  37. #[derive(Serialize, Deserialize)]
  38. enum MsgError {
  39. Unknown,
  40. }
  41. #[derive(Deserialize)]
  42. enum BodyOwned {
  43. Ping,
  44. Success,
  45. Fail(MsgError),
  46. Read { offset: u64, size: u64 },
  47. Write { offset: u64, buf: Vec<u8> },
  48. }
  49. impl CallRx for BodyOwned {
  50. type Reply<'a> = BodyRef<'a>;
  51. }
  52. impl SendRx for BodyOwned {}
  53. #[derive(Serialize)]
  54. enum BodyRef<'a> {
  55. Ping,
  56. Success,
  57. Fail(MsgError),
  58. Read { offset: u64, size: u64 },
  59. Write { offset: u64, buf: &'a [u8] },
  60. }
  61. impl<'a> CallTx for BodyRef<'a> {
  62. type Reply = BodyOwned;
  63. }
  64. impl<'a> SendTx for BodyRef<'a> {}
  65. struct TestCase;
  66. impl TestCase {
  67. fn new() -> TestCase {
  68. Self
  69. }
  70. fn new_process_router(&self) -> (impl Router, Arc<BlockAddr>) {
  71. let ip_addr = IpAddr::V6(Ipv6Addr::LOCALHOST);
  72. let mut creds = ConcreteCreds::generate().unwrap();
  73. let writecap = NODE_CREDS
  74. .issue_writecap(
  75. creds.principal(),
  76. vec![],
  77. Epoch::now() + Duration::from_secs(3600),
  78. )
  79. .unwrap();
  80. let addr = Arc::new(BlockAddr::new(ip_addr, Arc::new(writecap.bind_path())));
  81. creds.set_writecap(writecap);
  82. (router(ip_addr, Arc::new(creds)).unwrap(), addr)
  83. }
  84. /// Returns a ([Sender], [Receiver]) pair for a process identified by the given integer.
  85. async fn new_process(&self) -> (impl Transmitter, impl Receiver<BodyOwned>) {
  86. let (router, addr) = self.new_process_router();
  87. let receiver = router.receiver::<BodyOwned>().await.unwrap();
  88. let sender = router.transmitter(addr).await.unwrap();
  89. (sender, receiver)
  90. }
  91. }
  92. #[tokio::test]
  93. async fn message_received_is_message_sent() {
  94. let case = TestCase::new();
  95. let (mut sender, mut receiver) = case.new_process().await;
  96. sender.send(BodyRef::Ping).await.unwrap();
  97. let actual = receiver.next().await.unwrap().unwrap();
  98. let matched = if let BodyOwned::Ping = actual.body() {
  99. true
  100. } else {
  101. false
  102. };
  103. assert!(matched);
  104. }
  105. #[tokio::test]
  106. async fn message_received_from_path_is_correct() {
  107. let case = TestCase::new();
  108. let (mut sender, mut receiver) = case.new_process().await;
  109. sender.send(BodyRef::Ping).await.unwrap();
  110. let actual = receiver.next().await.unwrap().unwrap();
  111. assert_eq!(receiver.addr().path(), actual.from().as_ref());
  112. }
  113. #[tokio::test]
  114. async fn ping_pong() {
  115. let case = TestCase::new();
  116. let (mut sender_one, mut receiver_one) = case.new_process().await;
  117. let (mut sender_two, mut receiver_two) = case.new_process().await;
  118. tokio::spawn(async move {
  119. let received = receiver_one.next().await.unwrap().unwrap();
  120. let reply_body = if let BodyOwned::Ping = received.body() {
  121. BodyRef::Success
  122. } else {
  123. BodyRef::Fail(MsgError::Unknown)
  124. };
  125. let fut = assert_send::<'_, Result<()>>(sender_two.send(reply_body));
  126. fut.await.unwrap();
  127. sender_two.finish().await.unwrap();
  128. });
  129. sender_one.send(BodyRef::Ping).await.unwrap();
  130. let reply = receiver_two.next().await.unwrap().unwrap();
  131. let matched = if let BodyOwned::Success = reply.body() {
  132. true
  133. } else {
  134. false
  135. };
  136. assert!(matched);
  137. assert_eq!(receiver_two.addr().path(), reply.from().as_ref());
  138. }
  139. #[tokio::test]
  140. async fn read_write() {
  141. let case = TestCase::new();
  142. let (mut sender_one, mut receiver_one) = case.new_process().await;
  143. let (mut sender_two, mut receiver_two) = case.new_process().await;
  144. let handle = tokio::spawn(async move {
  145. let data: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
  146. let received = receiver_one.next().await.unwrap().unwrap();
  147. let reply_body = if let BodyOwned::Read { offset, size } = received.body() {
  148. let offset: usize = (*offset).try_into().unwrap();
  149. let size: usize = (*size).try_into().unwrap();
  150. let end: usize = offset + size;
  151. BodyRef::Write {
  152. offset: offset as u64,
  153. buf: &data[offset..end],
  154. }
  155. } else {
  156. BodyRef::Fail(MsgError::Unknown)
  157. };
  158. let fut = assert_send::<'_, Result<()>>(sender_two.send(reply_body));
  159. fut.await.unwrap();
  160. sender_two.finish().await.unwrap();
  161. });
  162. sender_one
  163. .send(BodyRef::Read { offset: 2, size: 2 })
  164. .await
  165. .unwrap();
  166. handle.await.unwrap();
  167. let reply = receiver_two.next().await.unwrap().unwrap();
  168. if let BodyOwned::Write { offset, buf } = reply.body() {
  169. assert_eq!(2, *offset);
  170. assert_eq!([2, 3].as_slice(), buf.as_slice());
  171. } else {
  172. panic!("reply was not the right type");
  173. };
  174. }
  175. async fn file_server<T: Receiver<BodyOwned> + Unpin>(
  176. mut receiver: T,
  177. mut stop_rx: mpsc::Receiver<()>,
  178. ) {
  179. let mut file: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
  180. loop {
  181. let mut received = tokio::select! {
  182. Some(..) = stop_rx.recv() => return,
  183. Some(received) = receiver.next() => received.unwrap(),
  184. };
  185. let reply_body = match received.body() {
  186. BodyOwned::Read { offset, size } => {
  187. let offset: usize = (*offset).try_into().unwrap();
  188. let size: usize = (*size).try_into().unwrap();
  189. let end: usize = offset + size;
  190. BodyRef::Write {
  191. offset: offset as u64,
  192. buf: &file[offset..end],
  193. }
  194. }
  195. BodyOwned::Write { offset, ref buf } => {
  196. let offset: usize = (*offset).try_into().unwrap();
  197. (&mut file[offset..buf.len()]).copy_from_slice(buf);
  198. BodyRef::Success
  199. }
  200. _ => BodyRef::Fail(MsgError::Unknown),
  201. };
  202. received.reply(reply_body).await.unwrap();
  203. }
  204. }
  205. #[tokio::test]
  206. async fn reply_to_read() {
  207. let case = TestCase::new();
  208. let (mut sender, receiver) = case.new_process().await;
  209. let (stop_tx, stop_rx) = mpsc::channel::<()>(1);
  210. let handle = tokio::spawn(file_server(receiver, stop_rx));
  211. let reply = sender
  212. .call(BodyRef::Read { offset: 2, size: 2 })
  213. .await
  214. .unwrap();
  215. if let BodyOwned::Write { offset, buf } = reply {
  216. assert_eq!(2, offset);
  217. assert_eq!([2, 3].as_slice(), buf.as_slice());
  218. } else {
  219. panic!("reply was not the right type");
  220. };
  221. stop_tx.send(()).await.unwrap();
  222. handle.await.unwrap();
  223. }
  224. #[tokio::test]
  225. async fn call_twice() {
  226. let case = TestCase::new();
  227. let (mut sender, receiver) = case.new_process().await;
  228. let (stop_tx, stop_rx) = mpsc::channel::<()>(1);
  229. let handle = tokio::spawn(file_server(receiver, stop_rx));
  230. let reply = sender
  231. .call(BodyRef::Read { offset: 2, size: 2 })
  232. .await
  233. .unwrap();
  234. if let BodyOwned::Write { offset, buf } = reply {
  235. assert_eq!(2, offset);
  236. assert_eq!([2, 3].as_slice(), buf.as_slice());
  237. } else {
  238. panic!("reply was not the right type");
  239. };
  240. let reply = sender
  241. .call(BodyRef::Read { offset: 3, size: 5 })
  242. .await
  243. .unwrap();
  244. if let BodyOwned::Write { offset, buf } = reply {
  245. assert_eq!(3, offset);
  246. assert_eq!([3, 4, 5, 6, 7].as_slice(), buf.as_slice());
  247. } else {
  248. panic!("second reply was not the right type");
  249. }
  250. stop_tx.send(()).await.unwrap();
  251. handle.await.unwrap();
  252. }
  253. #[tokio::test]
  254. async fn replies_sent_out_of_order() {
  255. let case = TestCase::new();
  256. let (sender_one, mut receiver_one) = case.new_process().await;
  257. let (router, ..) = case.new_process_router();
  258. let sender_two = router
  259. .transmitter(Arc::new(sender_one.addr().clone()))
  260. .await
  261. .unwrap();
  262. let handle = tokio::spawn(async move {
  263. const EMPTY_SLICE: &[u8] = &[];
  264. fn reply(body: &BodyOwned) -> BodyRef<'static> {
  265. match body {
  266. BodyOwned::Write { offset, .. } => BodyRef::Write {
  267. offset: *offset,
  268. buf: EMPTY_SLICE,
  269. },
  270. _ => panic!("message was the wrong variant"),
  271. }
  272. }
  273. let mut received_one = receiver_one.next().await.unwrap().unwrap();
  274. let mut received_two = receiver_one.next().await.unwrap().unwrap();
  275. received_two
  276. .reply(reply(received_two.body()))
  277. .await
  278. .unwrap();
  279. received_one
  280. .reply(reply(received_one.body()))
  281. .await
  282. .unwrap();
  283. });
  284. async fn client(num: u64, mut tx: impl Transmitter) {
  285. let fut = assert_send::<'_, Result<BodyOwned>>(tx.call(BodyRef::Write {
  286. offset: num,
  287. buf: [].as_slice(),
  288. }));
  289. let reply = fut.await.unwrap();
  290. if let BodyOwned::Write { offset, .. } = reply {
  291. assert_eq!(num, offset);
  292. } else {
  293. panic!("reply was the wrong variant");
  294. }
  295. }
  296. let handle_one = tokio::spawn(client(1, sender_one));
  297. let handle_two = tokio::spawn(client(2, sender_two));
  298. handle.await.unwrap();
  299. handle_one.await.unwrap();
  300. handle_two.await.unwrap();
  301. }