runtime_tests.rs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. #![feature(impl_trait_in_assoc_type)]
  2. use btrun::*;
  3. use btlib::{
  4. crypto::{ConcreteCreds, CredStore, CredsPriv},
  5. log::BuilderExt,
  6. Result,
  7. };
  8. use btlib_tests::TEST_STORE;
  9. use btproto::protocol;
  10. use btserde::to_vec;
  11. use bttp::{BlockAddr, Transmitter};
  12. use ctor::ctor;
  13. use lazy_static::lazy_static;
  14. use serde::{Deserialize, Serialize};
  15. use std::{
  16. future::{ready, Future, Ready},
  17. net::{IpAddr, Ipv4Addr},
  18. sync::{
  19. atomic::{AtomicU8, Ordering},
  20. Arc,
  21. },
  22. time::{Duration, Instant},
  23. };
  24. use tokio::{runtime::Builder, sync::mpsc};
  25. use uuid::Uuid;
  26. const RUNTIME_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
  27. lazy_static! {
  28. static ref RUNTIME_CREDS: Arc<ConcreteCreds> = TEST_STORE.node_creds().unwrap();
  29. }
  30. declare_runtime!(RUNTIME, RUNTIME_ADDR, RUNTIME_CREDS.clone());
  31. lazy_static! {
  32. /// A tokio async runtime.
  33. ///
  34. /// When the `#[tokio::test]` attribute is used on a test, a new current thread runtime
  35. /// is created for each test
  36. /// (source: https://docs.rs/tokio/latest/tokio/attr.test.html#current-thread-runtime).
  37. /// This creates a problem, because the first test thread to access the `RUNTIME` static
  38. /// will initialize its `Receiver` in its runtime, which will stop running at the end of
  39. /// the test. Hence subsequent tests will not be able to send remote messages to this
  40. /// `Runtime`.
  41. ///
  42. /// By creating a single async runtime which is used by all of the tests, we can avoid this
  43. /// problem.
  44. static ref ASYNC_RT: tokio::runtime::Runtime = Builder::new_current_thread()
  45. .enable_all()
  46. .build()
  47. .unwrap();
  48. }
  49. /// The log level to use when running tests.
  50. const LOG_LEVEL: &str = "warn";
  51. #[ctor]
  52. fn ctor() {
  53. std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL));
  54. env_logger::Builder::from_default_env().btformat().init();
  55. }
  56. #[derive(Serialize, Deserialize)]
  57. struct EchoMsg(String);
  58. impl CallMsg for EchoMsg {
  59. type Reply = EchoMsg;
  60. }
  61. async fn echo(
  62. _rt: &'static Runtime,
  63. mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>,
  64. _act_id: Uuid,
  65. ) {
  66. while let Some(envelope) = mailbox.recv().await {
  67. let (msg, replier, ..) = envelope.split();
  68. if let Some(replier) = replier {
  69. if let Err(_) = replier.send(msg) {
  70. panic!("failed to send reply");
  71. }
  72. }
  73. }
  74. }
  75. #[test]
  76. fn local_call() {
  77. ASYNC_RT.block_on(async {
  78. const EXPECTED: &str = "hello";
  79. let name = RUNTIME.activate(echo).await;
  80. let from = ActorName::new(name.path().clone(), Uuid::default());
  81. let reply = RUNTIME
  82. .call(name.clone(), from, EchoMsg(EXPECTED.into()))
  83. .await
  84. .unwrap();
  85. assert_eq!(EXPECTED, reply.0);
  86. RUNTIME.take(&name).await.unwrap();
  87. })
  88. }
  89. #[test]
  90. fn remote_call() {
  91. ASYNC_RT.block_on(async {
  92. const EXPECTED: &str = "hello";
  93. let actor_name = RUNTIME.activate(echo).await;
  94. let bind_path = Arc::new(RUNTIME_CREDS.bind_path().unwrap());
  95. let block_addr = Arc::new(BlockAddr::new(RUNTIME_ADDR, bind_path));
  96. let transmitter = Transmitter::new(block_addr, RUNTIME_CREDS.clone())
  97. .await
  98. .unwrap();
  99. let buf = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap();
  100. let wire_msg = WireMsg::new(
  101. actor_name.clone(),
  102. RUNTIME.actor_name(Uuid::default()),
  103. &buf,
  104. );
  105. let reply = transmitter
  106. .call(wire_msg, ReplyCallback::<EchoMsg>::new())
  107. .await
  108. .unwrap()
  109. .unwrap();
  110. assert_eq!(EXPECTED, reply.0);
  111. RUNTIME.take(&actor_name).await.unwrap();
  112. });
  113. }
  114. /// Tests the `num_running` method.
  115. ///
  116. /// This test uses its own runtime and so can use the `#[tokio::test]` attribute.
  117. #[tokio::test]
  118. async fn num_running() {
  119. declare_runtime!(
  120. LOCAL_RT,
  121. // This needs to be different from the address where `RUNTIME` is listening.
  122. IpAddr::from([127, 0, 0, 2]),
  123. TEST_STORE.node_creds().unwrap()
  124. );
  125. assert_eq!(0, LOCAL_RT.num_running().await);
  126. let name = LOCAL_RT.activate(echo).await;
  127. assert_eq!(1, LOCAL_RT.num_running().await);
  128. LOCAL_RT.take(&name).await.unwrap();
  129. assert_eq!(0, LOCAL_RT.num_running().await);
  130. }
  131. mod ping_pong {
  132. use super::*;
  133. // The following code is a proof-of-concept for what types should be generated for a
  134. // simple ping-pong protocol:
  135. protocol! {
  136. named PingPongProtocol;
  137. let states = [
  138. ClientInit, SentPing,
  139. ServerInit, Listening,
  140. ];
  141. ClientInit?Activate -> SentPing, >service(Listening)!Ping;
  142. ServerInit?Activate -> Listening;
  143. Listening?Ping -> End, >SentPing!Ping::Reply;
  144. SentPing?Ping::Reply -> End;
  145. }
  146. //
  147. // In words, the protocol is described as follows.
  148. // 1. The ClientInit state receives the Activate message. It returns the SentPing state and a
  149. // Ping message to be sent to the Listening state.
  150. // 2. The ServerInit state receives the Activate message. It returns the Listening state.
  151. // 3. When the Listening state receives the Ping message it returns the End state and a
  152. // Ping::Reply message to be sent to the SentPing state.
  153. // 4. When the SentPing state receives the Ping::Reply message it returns the End state.
  154. //
  155. // The End state represents an end to the session described by the protocol. When an actor
  156. // transitions to the End state its function returns.
  157. // The generated actor implementation is the sender of the Activate message.
  158. // When a state is expecting a Reply message, an error occurs if the message is not received
  159. // in a timely manner.
  160. #[derive(Serialize, Deserialize)]
  161. pub struct Ping;
  162. impl CallMsg for Ping {
  163. type Reply = PingReply;
  164. }
  165. // I was tempted to name this "Pong", but the proc macro wouldn't think to do that.
  166. #[derive(Serialize, Deserialize)]
  167. pub struct PingReply;
  168. trait ClientInit2 {
  169. type AfterActivate: SentPing2;
  170. type HandleActivateFut: Future<Output = Result<(Self::AfterActivate, Ping)>>;
  171. fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
  172. }
  173. trait ServerInit2 {
  174. type AfterActivate: Listening2;
  175. type HandleActivateFut: Future<Output = Result<Self::AfterActivate>>;
  176. fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
  177. }
  178. trait Listening2 {
  179. type HandlePingFut: Future<Output = Result<(End, PingReply)>>;
  180. fn handle_ping(self, msg: Ping) -> Self::HandlePingFut;
  181. }
  182. trait SentPing2 {
  183. type HandleReplyFut: Future<Output = Result<End>>;
  184. fn handle_reply(self, msg: PingReply) -> Self::HandleReplyFut;
  185. }
  186. #[derive(Serialize, Deserialize)]
  187. enum PingProtocolMsg {
  188. Ping(Ping),
  189. PingReply(PingReply),
  190. }
  191. impl CallMsg for PingProtocolMsg {
  192. type Reply = PingProtocolMsg;
  193. }
  194. impl SendMsg for PingProtocolMsg {}
  195. struct ClientInitState;
  196. impl ClientInit2 for ClientInitState {
  197. type AfterActivate = ClientState;
  198. type HandleActivateFut = impl Future<Output = Result<(Self::AfterActivate, Ping)>>;
  199. fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
  200. ready(Ok((ClientState, Ping)))
  201. }
  202. }
  203. struct ClientState;
  204. impl SentPing2 for ClientState {
  205. type HandleReplyFut = Ready<Result<End>>;
  206. fn handle_reply(self, _msg: PingReply) -> Self::HandleReplyFut {
  207. ready(Ok(End))
  208. }
  209. }
  210. #[allow(dead_code)]
  211. enum PingClientState {
  212. Init(ClientInitState),
  213. SentPing(ClientState),
  214. End(End),
  215. }
  216. struct ServerInitState;
  217. struct ServerState;
  218. impl ServerInit2 for ServerInitState {
  219. type AfterActivate = ServerState;
  220. type HandleActivateFut = Ready<Result<Self::AfterActivate>>;
  221. fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
  222. ready(Ok(ServerState))
  223. }
  224. }
  225. impl Listening2 for ServerState {
  226. type HandlePingFut = impl Future<Output = Result<(End, PingReply)>>;
  227. fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
  228. ready(Ok((End, PingReply)))
  229. }
  230. }
  231. #[allow(dead_code)]
  232. enum PingServerState {
  233. ServerInit(ServerInitState),
  234. Listening(ServerState),
  235. End(End),
  236. }
  237. async fn ping_server(
  238. counter: Arc<AtomicU8>,
  239. rt: &'static Runtime,
  240. mut mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
  241. act_id: Uuid,
  242. ) {
  243. let mut state = {
  244. let init = ServerInitState;
  245. let state = init
  246. .handle_activate(Activate::new(rt, act_id))
  247. .await
  248. .unwrap();
  249. PingServerState::Listening(state)
  250. };
  251. while let Some(envelope) = mailbox.recv().await {
  252. let (msg, replier, _from) = envelope.split();
  253. match (state, msg) {
  254. (PingServerState::Listening(listening_state), PingProtocolMsg::Ping(msg)) => {
  255. let (new_state, reply) = listening_state.handle_ping(msg).await.unwrap();
  256. state = PingServerState::End(new_state);
  257. if let Err(_) = replier.unwrap().send(PingProtocolMsg::PingReply(reply)) {
  258. panic!("Failed to send Ping reply.");
  259. }
  260. }
  261. (_prev_state, _) => {
  262. panic!("Ping protocol violation.");
  263. // A real implementation should assign the previous state and log the error.
  264. // state = prev_state;
  265. }
  266. }
  267. if let PingServerState::End(_) = state {
  268. break;
  269. }
  270. }
  271. counter.fetch_sub(1, Ordering::SeqCst);
  272. }
  273. async fn ping_client(
  274. counter: Arc<AtomicU8>,
  275. server_name: ActorName,
  276. rt: &'static Runtime,
  277. _mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
  278. act_id: Uuid,
  279. ) {
  280. let init = ClientInitState;
  281. let (state, msg) = init
  282. .handle_activate(Activate::new(rt, act_id))
  283. .await
  284. .unwrap();
  285. let from = rt.actor_name(act_id);
  286. let reply = rt
  287. .call(server_name, from, PingProtocolMsg::Ping(msg))
  288. .await
  289. .unwrap();
  290. if let PingProtocolMsg::PingReply(msg) = reply {
  291. state.handle_reply(msg).await.unwrap();
  292. } else {
  293. panic!("Incorrect message type sent in reply to Ping.");
  294. }
  295. counter.fetch_sub(1, Ordering::SeqCst);
  296. }
  297. #[test]
  298. fn ping_pong_test() {
  299. ASYNC_RT.block_on(async {
  300. let counter = Arc::new(AtomicU8::new(2));
  301. let server_name = {
  302. let counter = counter.clone();
  303. RUNTIME
  304. .activate(move |rt, mailbox, act_id| ping_server(counter, rt, mailbox, act_id))
  305. .await
  306. };
  307. let client_name = {
  308. let server_name = server_name.clone();
  309. let counter = counter.clone();
  310. RUNTIME
  311. .activate(move |rt, mailbox, act_id| {
  312. ping_client(counter, server_name, rt, mailbox, act_id)
  313. })
  314. .await
  315. };
  316. let deadline = Instant::now() + Duration::from_millis(500);
  317. while counter.load(Ordering::SeqCst) > 0 && Instant::now() < deadline {
  318. tokio::time::sleep(Duration::from_millis(20)).await;
  319. }
  320. // Check that both tasks finished successfully and we didn't just timeout.
  321. assert_eq!(0, counter.load(Ordering::SeqCst));
  322. // TODO: Should actor which return be removed from the runtime automatically?
  323. RUNTIME.take(&server_name).await.unwrap();
  324. RUNTIME.take(&client_name).await.unwrap();
  325. });
  326. }
  327. }
  328. mod travel_agency {
  329. use super::*;
  330. // Here's another protocol example. This is the Customer and Travel Agency protocol used as an
  331. // example in the survey paper "Behavioral Types in Programming Languages."
  332. // Note that the Choosing state can send messages at any time, not just in response to another
  333. // message because there is a transition from Choosing that doesn't use the receive operator
  334. // (`?`).
  335. protocol! {
  336. named TravelAgency;
  337. let states = [
  338. AgencyInit, Listening,
  339. Choosing,
  340. ];
  341. AgencyInit?Activate -> Listening;
  342. Choosing -> Choosing, >service(Listening)!Query, service(Listening)!Accept, service(Listening)!Reject;
  343. Listening?Query -> Listening, >Choosing!Query::Reply;
  344. Choosing?Query::Reply -> Choosing;
  345. Listening?Accept -> End, >Choosing!Accept::Reply;
  346. Choosing?Accept::Reply -> End;
  347. Listening?Reject -> End, >Choosing!Reject::Reply;
  348. Choosing?Reject::Reply -> End;
  349. }
  350. #[derive(Serialize, Deserialize)]
  351. pub struct Query;
  352. impl CallMsg for Query {
  353. type Reply = ();
  354. }
  355. #[derive(Serialize, Deserialize)]
  356. pub struct Reject;
  357. impl CallMsg for Reject {
  358. type Reply = ();
  359. }
  360. #[derive(Serialize, Deserialize)]
  361. pub struct Accept;
  362. impl CallMsg for Accept {
  363. type Reply = ();
  364. }
  365. }