runtime_tests.rs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. #![feature(impl_trait_in_assoc_type)]
  2. use btrun::*;
  3. use btlib::{
  4. crypto::{ConcreteCreds, CredStore, CredsPriv},
  5. log::BuilderExt,
  6. Result,
  7. };
  8. use btlib_tests::TEST_STORE;
  9. use btproto::protocol;
  10. use btserde::to_vec;
  11. use bttp::{BlockAddr, Transmitter};
  12. use ctor::ctor;
  13. use lazy_static::lazy_static;
  14. use serde::{Deserialize, Serialize};
  15. use std::{
  16. future::{ready, Future, Ready},
  17. net::{IpAddr, Ipv4Addr},
  18. sync::{
  19. atomic::{AtomicU8, Ordering},
  20. Arc,
  21. },
  22. time::{Duration, Instant},
  23. };
  24. use tokio::{runtime::Builder, sync::mpsc};
  25. use uuid::Uuid;
  26. const RUNTIME_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
  27. lazy_static! {
  28. static ref RUNTIME_CREDS: Arc<ConcreteCreds> = TEST_STORE.node_creds().unwrap();
  29. }
  30. declare_runtime!(RUNTIME, RUNTIME_ADDR, RUNTIME_CREDS.clone());
  31. lazy_static! {
  32. /// A tokio async runtime.
  33. ///
  34. /// When the `#[tokio::test]` attribute is used on a test, a new current thread runtime
  35. /// is created for each test
  36. /// (source: https://docs.rs/tokio/latest/tokio/attr.test.html#current-thread-runtime).
  37. /// This creates a problem, because the first test thread to access the `RUNTIME` static
  38. /// will initialize its `Receiver` in its runtime, which will stop running at the end of
  39. /// the test. Hence subsequent tests will not be able to send remote messages to this
  40. /// `Runtime`.
  41. ///
  42. /// By creating a single async runtime which is used by all of the tests, we can avoid this
  43. /// problem.
  44. static ref ASYNC_RT: tokio::runtime::Runtime = Builder::new_current_thread()
  45. .enable_all()
  46. .build()
  47. .unwrap();
  48. }
  49. /// The log level to use when running tests.
  50. const LOG_LEVEL: &str = "warn";
  51. #[ctor]
  52. fn ctor() {
  53. std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL));
  54. env_logger::Builder::from_default_env().btformat().init();
  55. }
  56. #[derive(Serialize, Deserialize)]
  57. struct EchoMsg(String);
  58. impl CallMsg for EchoMsg {
  59. type Reply = EchoMsg;
  60. }
  61. async fn echo(
  62. _rt: &'static Runtime,
  63. mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>,
  64. _act_id: Uuid,
  65. ) {
  66. while let Some(envelope) = mailbox.recv().await {
  67. let (msg, replier, ..) = envelope.split();
  68. if let Some(replier) = replier {
  69. if let Err(_) = replier.send(msg) {
  70. panic!("failed to send reply");
  71. }
  72. }
  73. }
  74. }
  75. #[test]
  76. fn local_call() {
  77. ASYNC_RT.block_on(async {
  78. const EXPECTED: &str = "hello";
  79. let name = RUNTIME.activate(echo).await;
  80. let from = ActorName::new(name.path().clone(), Uuid::default());
  81. let reply = RUNTIME
  82. .call(name.clone(), from, EchoMsg(EXPECTED.into()))
  83. .await
  84. .unwrap();
  85. assert_eq!(EXPECTED, reply.0);
  86. RUNTIME.take(&name).await.unwrap();
  87. })
  88. }
  89. #[test]
  90. fn remote_call() {
  91. ASYNC_RT.block_on(async {
  92. const EXPECTED: &str = "hello";
  93. let actor_name = RUNTIME.activate(echo).await;
  94. let bind_path = Arc::new(RUNTIME_CREDS.bind_path().unwrap());
  95. let block_addr = Arc::new(BlockAddr::new(RUNTIME_ADDR, bind_path));
  96. let transmitter = Transmitter::new(block_addr, RUNTIME_CREDS.clone())
  97. .await
  98. .unwrap();
  99. let buf = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap();
  100. let wire_msg = WireMsg::new(
  101. actor_name.clone(),
  102. RUNTIME.actor_name(Uuid::default()),
  103. &buf,
  104. );
  105. let reply = transmitter
  106. .call(wire_msg, ReplyCallback::<EchoMsg>::new())
  107. .await
  108. .unwrap()
  109. .unwrap();
  110. assert_eq!(EXPECTED, reply.0);
  111. RUNTIME.take(&actor_name).await.unwrap();
  112. });
  113. }
  114. /// Tests the `num_running` method.
  115. ///
  116. /// This test uses its own runtime and so can use the `#[tokio::test]` attribute.
  117. #[tokio::test]
  118. async fn num_running() {
  119. declare_runtime!(
  120. LOCAL_RT,
  121. // This needs to be different from the address where `RUNTIME` is listening.
  122. IpAddr::from([127, 0, 0, 2]),
  123. TEST_STORE.node_creds().unwrap()
  124. );
  125. assert_eq!(0, LOCAL_RT.num_running().await);
  126. let name = LOCAL_RT.activate(echo).await;
  127. assert_eq!(1, LOCAL_RT.num_running().await);
  128. LOCAL_RT.take(&name).await.unwrap();
  129. assert_eq!(0, LOCAL_RT.num_running().await);
  130. }
  131. mod ping_pong {
  132. use super::*;
  133. // The following code is a proof-of-concept for what types should be generated for a
  134. // simple ping-pong protocol:
  135. protocol! {
  136. named PingPongProtocol;
  137. let server = [Server];
  138. let client = [Client, Waiting];
  139. Client -> Waiting, >service(Server)!Ping;
  140. Server?Ping -> End, >Waiting!Ping::Reply;
  141. Waiting?Ping::Reply -> End;
  142. }
  143. //
  144. // In words, the protocol is described as follows.
  145. // 1. The ClientInit state receives the Activate message. It returns the SentPing state and a
  146. // Ping message to be sent to the Listening state.
  147. // 2. The ServerInit state receives the Activate message. It returns the Listening state.
  148. // 3. When the Listening state receives the Ping message it returns the End state and a
  149. // Ping::Reply message to be sent to the SentPing state.
  150. // 4. When the SentPing state receives the Ping::Reply message it returns the End state.
  151. //
  152. // The End state represents an end to the session described by the protocol. When an actor
  153. // transitions to the End state its function returns.
  154. // The generated actor implementation is the sender of the Activate message.
  155. // When a state is expecting a Reply message, an error occurs if the message is not received
  156. // in a timely manner.
  157. #[derive(Serialize, Deserialize)]
  158. pub struct Ping;
  159. impl CallMsg for Ping {
  160. type Reply = PingReply;
  161. }
  162. // I was tempted to name this "Pong", but the proc macro wouldn't think to do that.
  163. #[derive(Serialize, Deserialize)]
  164. pub struct PingReply;
  165. trait ClientInit2 {
  166. type AfterActivate: SentPing2;
  167. type HandleActivateFut: Future<Output = Result<(Self::AfterActivate, Ping)>>;
  168. fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
  169. }
  170. trait ServerInit2 {
  171. type AfterActivate: Listening2;
  172. type HandleActivateFut: Future<Output = Result<Self::AfterActivate>>;
  173. fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
  174. }
  175. trait Listening2 {
  176. type HandlePingFut: Future<Output = Result<(End, PingReply)>>;
  177. fn handle_ping(self, msg: Ping) -> Self::HandlePingFut;
  178. }
  179. trait SentPing2 {
  180. type HandleReplyFut: Future<Output = Result<End>>;
  181. fn handle_reply(self, msg: PingReply) -> Self::HandleReplyFut;
  182. }
  183. #[derive(Serialize, Deserialize)]
  184. enum PingProtocolMsg {
  185. Ping(Ping),
  186. PingReply(PingReply),
  187. }
  188. impl CallMsg for PingProtocolMsg {
  189. type Reply = PingProtocolMsg;
  190. }
  191. impl SendMsg for PingProtocolMsg {}
  192. struct ClientInitState;
  193. impl ClientInit2 for ClientInitState {
  194. type AfterActivate = ClientState;
  195. type HandleActivateFut = impl Future<Output = Result<(Self::AfterActivate, Ping)>>;
  196. fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
  197. ready(Ok((ClientState, Ping)))
  198. }
  199. }
  200. struct ClientState;
  201. impl SentPing2 for ClientState {
  202. type HandleReplyFut = Ready<Result<End>>;
  203. fn handle_reply(self, _msg: PingReply) -> Self::HandleReplyFut {
  204. ready(Ok(End))
  205. }
  206. }
  207. #[allow(dead_code)]
  208. enum PingClientState {
  209. Init(ClientInitState),
  210. SentPing(ClientState),
  211. End(End),
  212. }
  213. struct ServerInitState;
  214. struct ServerState;
  215. impl ServerInit2 for ServerInitState {
  216. type AfterActivate = ServerState;
  217. type HandleActivateFut = Ready<Result<Self::AfterActivate>>;
  218. fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
  219. ready(Ok(ServerState))
  220. }
  221. }
  222. impl Listening2 for ServerState {
  223. type HandlePingFut = impl Future<Output = Result<(End, PingReply)>>;
  224. fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
  225. ready(Ok((End, PingReply)))
  226. }
  227. }
  228. #[allow(dead_code)]
  229. enum PingServerState {
  230. ServerInit(ServerInitState),
  231. Listening(ServerState),
  232. End(End),
  233. }
  234. async fn ping_server(
  235. counter: Arc<AtomicU8>,
  236. rt: &'static Runtime,
  237. mut mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
  238. act_id: Uuid,
  239. ) {
  240. let mut state = {
  241. let init = ServerInitState;
  242. let state = init
  243. .handle_activate(Activate::new(rt, act_id))
  244. .await
  245. .unwrap();
  246. PingServerState::Listening(state)
  247. };
  248. while let Some(envelope) = mailbox.recv().await {
  249. let (msg, replier, _from) = envelope.split();
  250. match (state, msg) {
  251. (PingServerState::Listening(listening_state), PingProtocolMsg::Ping(msg)) => {
  252. let (new_state, reply) = listening_state.handle_ping(msg).await.unwrap();
  253. state = PingServerState::End(new_state);
  254. if let Err(_) = replier.unwrap().send(PingProtocolMsg::PingReply(reply)) {
  255. panic!("Failed to send Ping reply.");
  256. }
  257. }
  258. (_prev_state, _) => {
  259. panic!("Ping protocol violation.");
  260. // A real implementation should assign the previous state and log the error.
  261. // state = prev_state;
  262. }
  263. }
  264. if let PingServerState::End(_) = state {
  265. break;
  266. }
  267. }
  268. counter.fetch_sub(1, Ordering::SeqCst);
  269. }
  270. async fn ping_client(
  271. counter: Arc<AtomicU8>,
  272. server_name: ActorName,
  273. rt: &'static Runtime,
  274. _mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
  275. act_id: Uuid,
  276. ) {
  277. let init = ClientInitState;
  278. let (state, msg) = init
  279. .handle_activate(Activate::new(rt, act_id))
  280. .await
  281. .unwrap();
  282. let from = rt.actor_name(act_id);
  283. let reply = rt
  284. .call(server_name, from, PingProtocolMsg::Ping(msg))
  285. .await
  286. .unwrap();
  287. if let PingProtocolMsg::PingReply(msg) = reply {
  288. state.handle_reply(msg).await.unwrap();
  289. } else {
  290. panic!("Incorrect message type sent in reply to Ping.");
  291. }
  292. counter.fetch_sub(1, Ordering::SeqCst);
  293. }
  294. #[test]
  295. fn ping_pong_test() {
  296. ASYNC_RT.block_on(async {
  297. let counter = Arc::new(AtomicU8::new(2));
  298. let server_name = {
  299. let counter = counter.clone();
  300. RUNTIME
  301. .activate(move |rt, mailbox, act_id| ping_server(counter, rt, mailbox, act_id))
  302. .await
  303. };
  304. let client_name = {
  305. let server_name = server_name.clone();
  306. let counter = counter.clone();
  307. RUNTIME
  308. .activate(move |rt, mailbox, act_id| {
  309. ping_client(counter, server_name, rt, mailbox, act_id)
  310. })
  311. .await
  312. };
  313. let deadline = Instant::now() + Duration::from_millis(500);
  314. while counter.load(Ordering::SeqCst) > 0 && Instant::now() < deadline {
  315. tokio::time::sleep(Duration::from_millis(20)).await;
  316. }
  317. // Check that both tasks finished successfully and we didn't just timeout.
  318. assert_eq!(0, counter.load(Ordering::SeqCst));
  319. // TODO: Should actor which return be removed from the runtime automatically?
  320. RUNTIME.take(&server_name).await.unwrap();
  321. RUNTIME.take(&client_name).await.unwrap();
  322. });
  323. }
  324. }
  325. mod travel_agency {
  326. use super::*;
  327. // Here's another protocol example. This is the Customer and Travel Agency protocol used as an
  328. // example in the survey paper "Behavioral Types in Programming Languages."
  329. // Note that the Choosing state can send messages at any time, not just in response to another
  330. // message because there is a transition from Choosing that doesn't use the receive operator
  331. // (`?`).
  332. protocol! {
  333. named TravelAgency;
  334. let agency = [Listening];
  335. let customer = [Choosing];
  336. Choosing -> Choosing, >service(Listening)!Query;
  337. Choosing -> Choosing, >service(Listening)!Accept;
  338. Choosing -> Choosing, >service(Listening)!Reject;
  339. Listening?Query -> Listening, >Choosing!Query::Reply;
  340. Choosing?Query::Reply -> Choosing;
  341. Listening?Accept -> End, >Choosing!Accept::Reply;
  342. Choosing?Accept::Reply -> End;
  343. Listening?Reject -> End, >Choosing!Reject::Reply;
  344. Choosing?Reject::Reply -> End;
  345. }
  346. #[derive(Serialize, Deserialize)]
  347. pub struct Query;
  348. impl CallMsg for Query {
  349. type Reply = ();
  350. }
  351. #[derive(Serialize, Deserialize)]
  352. pub struct Reject;
  353. impl CallMsg for Reject {
  354. type Reply = ();
  355. }
  356. #[derive(Serialize, Deserialize)]
  357. pub struct Accept;
  358. impl CallMsg for Accept {
  359. type Reply = ();
  360. }
  361. }