tests.rs 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. use btmsg::*;
  2. use btlib::{
  3. crypto::{ConcreteCreds, Creds, CredsPriv},
  4. BlockPath, Epoch, Principal, Principaled,
  5. };
  6. use core::future::Future;
  7. use ctor::ctor;
  8. use lazy_static::lazy_static;
  9. use serde::{Deserialize, Serialize};
  10. use std::{
  11. net::{IpAddr, Ipv6Addr},
  12. sync::{Arc, Mutex},
  13. time::Duration,
  14. };
  15. use tokio::sync::mpsc::{self, Sender};
  16. #[ctor]
  17. fn setup_logging() {
  18. use env_logger::Env;
  19. let env = Env::default().default_filter_or("ERROR");
  20. env_logger::init_from_env(env);
  21. }
  22. lazy_static! {
  23. static ref ROOT_CREDS: ConcreteCreds = ConcreteCreds::generate().unwrap();
  24. static ref NODE_CREDS: ConcreteCreds = {
  25. let mut creds = ConcreteCreds::generate().unwrap();
  26. let root_creds = &ROOT_CREDS;
  27. let writecap = root_creds
  28. .issue_writecap(
  29. creds.principal(),
  30. vec![],
  31. Epoch::now() + Duration::from_secs(3600),
  32. )
  33. .unwrap();
  34. creds.set_writecap(writecap);
  35. creds
  36. };
  37. static ref ROOT_PRINCIPAL: Principal = ROOT_CREDS.principal();
  38. }
  39. #[derive(Debug, Serialize, Deserialize)]
  40. enum Reply {
  41. Success,
  42. Fail,
  43. ReadReply { offset: u64, buf: Vec<u8> },
  44. }
  45. #[derive(Serialize, Deserialize)]
  46. enum Msg<'a> {
  47. Ping,
  48. Success,
  49. Fail,
  50. Read { offset: u64, size: u64 },
  51. Write { offset: u64, buf: &'a [u8] },
  52. }
  53. impl<'a> CallMsg<'a> for Msg<'a> {
  54. type Reply = Reply;
  55. }
  56. impl<'a> SendMsg<'a> for Msg<'a> {}
  57. trait TestFunc<S: 'static + Send, Fut: Send + Future>:
  58. Send + Sync + Fn(MsgReceived<Msg<'_>>, Sender<S>) -> Fut
  59. {
  60. }
  61. impl<
  62. S: 'static + Send,
  63. Fut: Send + Future,
  64. T: Send + Sync + Fn(MsgReceived<Msg<'_>>, Sender<S>) -> Fut,
  65. > TestFunc<S, Fut> for T
  66. {
  67. }
  68. struct Delegate<S, Fut> {
  69. func: Arc<dyn TestFunc<S, Fut>>,
  70. sender: Sender<S>,
  71. }
  72. impl<S, Fut> Clone for Delegate<S, Fut> {
  73. fn clone(&self) -> Self {
  74. Self {
  75. func: self.func.clone(),
  76. sender: self.sender.clone(),
  77. }
  78. }
  79. }
  80. impl<S: 'static + Send, Fut: Send + Future> Delegate<S, Fut> {
  81. fn new<F: 'static + TestFunc<S, Fut>>(sender: Sender<S>, func: F) -> Self {
  82. Self {
  83. func: Arc::new(func),
  84. sender,
  85. }
  86. }
  87. }
  88. impl<S: 'static + Send, Fut: Send + Future> MsgCallback for Delegate<S, Fut> {
  89. type Arg<'de> = Msg<'de> where Self: 'de;
  90. type Return = Fut::Output;
  91. type CallFut<'s> = Fut where Fut: 's;
  92. fn call<'de>(&'de self, arg: MsgReceived<Self::Arg<'de>>) -> Self::CallFut<'de> {
  93. (self.func)(arg, self.sender.clone())
  94. }
  95. }
  96. fn proc_creds() -> impl Creds {
  97. let mut creds = ConcreteCreds::generate().unwrap();
  98. let writecap = NODE_CREDS
  99. .issue_writecap(
  100. creds.principal(),
  101. vec![],
  102. Epoch::now() + Duration::from_secs(3600),
  103. )
  104. .unwrap();
  105. creds.set_writecap(writecap);
  106. creds
  107. }
  108. fn proc_rx<F: 'static + MsgCallback>(callback: F) -> (impl Receiver, Arc<BlockAddr>) {
  109. let ip_addr = IpAddr::V6(Ipv6Addr::LOCALHOST);
  110. let creds = proc_creds();
  111. let writecap = creds.writecap().unwrap();
  112. let addr = Arc::new(BlockAddr::new(ip_addr, Arc::new(writecap.bind_path())));
  113. (receiver(ip_addr, Arc::new(creds), callback).unwrap(), addr)
  114. }
  115. async fn proc_tx_rx<F: 'static + MsgCallback>(func: F) -> (impl Transmitter, impl Receiver) {
  116. let (receiver, addr) = proc_rx(func);
  117. let sender = receiver.transmitter(addr).await.unwrap();
  118. (sender, receiver)
  119. }
  120. async fn file_server() -> (impl Transmitter, impl Receiver) {
  121. let (sender, _) = mpsc::channel::<()>(1);
  122. let file = Arc::new(Mutex::new([0, 1, 2, 3, 4, 5, 6, 7]));
  123. proc_tx_rx(Delegate::new(
  124. sender,
  125. move |mut received: MsgReceived<Msg<'_>>, _| {
  126. let mut guard = file.lock().unwrap();
  127. let reply_body = match received.body() {
  128. Msg::Read { offset, size } => {
  129. let offset: usize = (*offset).try_into().unwrap();
  130. let size: usize = (*size).try_into().unwrap();
  131. let end: usize = offset + size;
  132. let mut buf = Vec::with_capacity(end - offset);
  133. buf.extend_from_slice(&guard[offset..end]);
  134. Reply::ReadReply {
  135. offset: offset as u64,
  136. buf,
  137. }
  138. }
  139. Msg::Write { offset, ref buf } => {
  140. let offset: usize = (*offset).try_into().unwrap();
  141. let end: usize = offset + buf.len();
  142. (&mut guard[offset..end]).copy_from_slice(buf);
  143. Reply::Success
  144. }
  145. _ => Reply::Fail,
  146. };
  147. let replier = received.take_replier().unwrap();
  148. async move { replier.reply(reply_body).await }
  149. },
  150. ))
  151. .await
  152. }
  153. async fn timeout<F: Future>(future: F) -> F::Output {
  154. tokio::time::timeout(Duration::from_millis(1000), future)
  155. .await
  156. .unwrap()
  157. }
  /// Awaits the next value from the channel `$channel` through `timeout` and unwraps the result
  /// of `recv()`. Must be invoked from an `async` context.
  macro_rules! recv {
      ($channel:expr) => {
          timeout($channel.recv()).await.unwrap()
      };
  }
  163. #[tokio::test]
  164. async fn message_received_is_message_sent() {
  165. let (sender, mut passed) = mpsc::channel(1);
  166. let (mut sender, _receiver) = proc_tx_rx(Delegate::new(
  167. sender,
  168. |msg: MsgReceived<Msg<'_>>, sender: Sender<bool>| {
  169. let passed = if let Msg::Ping = msg.body() {
  170. true
  171. } else {
  172. false
  173. };
  174. let sender = sender.clone();
  175. async move {
  176. sender.send(passed).await.unwrap();
  177. }
  178. },
  179. ))
  180. .await;
  181. sender.send(Msg::Ping).await.unwrap();
  182. assert!(recv!(passed));
  183. }
  184. #[tokio::test]
  185. async fn message_received_from_path_is_correct() {
  186. let (sender, mut path) = mpsc::channel(1);
  187. let (mut sender, receiver) = proc_tx_rx(Delegate::new(
  188. sender,
  189. |msg: MsgReceived<Msg<'_>>, sender: Sender<Arc<BlockPath>>| {
  190. let path = msg.from().clone();
  191. let sender = sender.clone();
  192. async move {
  193. sender.send(path).await.unwrap();
  194. }
  195. },
  196. ))
  197. .await;
  198. sender.send(Msg::Ping).await.unwrap();
  199. assert_eq!(receiver.addr().path(), recv!(path).as_ref());
  200. }
  201. #[tokio::test]
  202. async fn reply_to_read() {
  203. let (mut sender, _receiver) = file_server().await;
  204. let reply = sender
  205. .call(Msg::Read { offset: 2, size: 2 })
  206. .await
  207. .unwrap();
  208. if let Reply::ReadReply { offset, buf } = reply {
  209. assert_eq!(2, offset);
  210. assert_eq!([2, 3].as_slice(), buf.as_slice());
  211. } else {
  212. panic!("reply was not the right type");
  213. };
  214. }
  215. #[tokio::test]
  216. async fn call_twice() {
  217. let (mut sender, _receiver) = file_server().await;
  218. let reply = sender
  219. .call(Msg::Write {
  220. offset: 1,
  221. buf: &[1, 1],
  222. })
  223. .await
  224. .unwrap();
  225. if let Reply::Success = reply {
  226. ()
  227. } else {
  228. panic!("reply was not the right type");
  229. };
  230. let reply = sender
  231. .call(Msg::Read { offset: 1, size: 2 })
  232. .await
  233. .unwrap();
  234. if let Reply::ReadReply { offset, buf } = reply {
  235. assert_eq!(1, offset);
  236. assert_eq!([1, 1].as_slice(), buf.as_slice());
  237. } else {
  238. panic!("second reply was not the right type");
  239. }
  240. }
  241. #[tokio::test]
  242. async fn separate_transmitter() {
  243. let (_senderx, receiver) = file_server().await;
  244. let creds = proc_creds();
  245. let mut transmitter = transmitter(receiver.addr().clone(), Arc::new(creds))
  246. .await
  247. .unwrap();
  248. let reply = transmitter
  249. .call(Msg::Write {
  250. offset: 5,
  251. buf: &[7, 7, 7],
  252. })
  253. .await
  254. .unwrap();
  255. let matched = if let Reply::Success = reply {
  256. true
  257. } else {
  258. false
  259. };
  260. assert!(matched);
  261. }