// runtime_tests.rs (13 KB)
  1. #![feature(impl_trait_in_assoc_type)]
  2. use btrun::*;
  3. use btlib::{
  4. Result,
  5. crypto::{ConcreteCreds, CredStore, CredsPriv},
  6. log::BuilderExt,
  7. };
  8. use btlib_tests::TEST_STORE;
  9. use btproto::protocol;
  10. use btserde::to_vec;
  11. use bttp::{BlockAddr, Transmitter};
  12. use ctor::ctor;
  13. use lazy_static::lazy_static;
  14. use std::{
  15. net::{IpAddr, Ipv4Addr},
  16. sync::{
  17. Arc,
  18. atomic::{AtomicU8, Ordering}
  19. },
  20. time::{Duration, Instant},
  21. future::{Future, ready, Ready},
  22. };
  23. use tokio::{
  24. runtime::Builder,
  25. sync::mpsc,
  26. };
  27. use serde::{Serialize, Deserialize};
  28. use uuid::Uuid;
  /// Loopback address (127.0.0.1) passed to `declare_runtime!` for the shared `RUNTIME`.
  const RUNTIME_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);
  30. lazy_static! {
  31. static ref RUNTIME_CREDS: Arc<ConcreteCreds> = TEST_STORE.node_creds().unwrap();
  32. }
  // Declares the `RUNTIME` static shared by the tests in this file, passing it
  // `RUNTIME_ADDR` and a clone of `RUNTIME_CREDS`.
  declare_runtime!(RUNTIME, RUNTIME_ADDR, RUNTIME_CREDS.clone());
  34. lazy_static! {
  35. /// A tokio async runtime.
  36. ///
  37. /// When the `#[tokio::test]` attribute is used on a test, a new current thread runtime
  38. /// is created for each test
  39. /// (source: https://docs.rs/tokio/latest/tokio/attr.test.html#current-thread-runtime).
  40. /// This creates a problem, because the first test thread to access the `RUNTIME` static
  41. /// will initialize its `Receiver` in its runtime, which will stop running at the end of
  42. /// the test. Hence subsequent tests will not be able to send remote messages to this
  43. /// `Runtime`.
  44. ///
  45. /// By creating a single async runtime which is used by all of the tests, we can avoid this
  46. /// problem.
  47. static ref ASYNC_RT: tokio::runtime::Runtime = Builder::new_current_thread()
  48. .enable_all()
  49. .build()
  50. .unwrap();
  51. }
  52. /// The log level to use when running tests.
  53. const LOG_LEVEL: &str = "warn";
  54. #[ctor]
  55. fn ctor() {
  56. std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL));
  57. env_logger::Builder::from_default_env().btformat().init();
  58. }
  59. #[derive(Serialize, Deserialize)]
  60. struct EchoMsg(String);
  61. impl CallMsg for EchoMsg {
  62. type Reply = EchoMsg;
  63. }
  64. async fn echo(
  65. _rt: &'static Runtime,
  66. mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>,
  67. _act_id: Uuid,
  68. ) {
  69. while let Some(envelope) = mailbox.recv().await {
  70. let (msg, replier, ..) = envelope.split();
  71. if let Some(replier) = replier {
  72. if let Err(_) = replier.send(msg) {
  73. panic!("failed to send reply");
  74. }
  75. }
  76. }
  77. }
  /// Calls an `echo` actor through `RUNTIME.call` and checks that the reply carries the
  /// same text that was sent.
  #[test]
  fn local_call() {
      const EXPECTED: &str = "hello";

      ASYNC_RT.block_on(async {
          let echo_name = RUNTIME.activate(echo).await;
          let caller = ActorName::new(echo_name.path().clone(), Uuid::default());
          let request = EchoMsg(EXPECTED.into());

          let response = RUNTIME
              .call(echo_name.clone(), caller, request)
              .await
              .unwrap();
          assert_eq!(EXPECTED, response.0);

          // Remove the actor again so it does not linger in the shared runtime.
          RUNTIME.take(&echo_name).await.unwrap();
      })
  }
  /// Calls an `echo` actor through a `Transmitter` addressed at `RUNTIME_ADDR` and checks
  /// that the reply carries the same text that was sent.
  #[test]
  fn remote_call() {
      const EXPECTED: &str = "hello";

      ASYNC_RT.block_on(async {
          let echo_name = RUNTIME.activate(echo).await;

          // Build a transmitter aimed at the address and bind path of `RUNTIME`.
          let bind_path = RUNTIME_CREDS.bind_path().unwrap();
          let runtime_addr = BlockAddr::new(RUNTIME_ADDR, Arc::new(bind_path));
          let transmitter = Transmitter::new(Arc::new(runtime_addr), RUNTIME_CREDS.clone())
              .await
              .unwrap();

          // Serialize the message by hand and wrap it in a wire message.
          let payload = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap();
          let sender = RUNTIME.actor_name(Uuid::default());
          let wire_msg = WireMsg::new(echo_name.clone(), sender, &payload);

          let response = transmitter
              .call(wire_msg, ReplyCallback::<EchoMsg>::new())
              .await
              .unwrap()
              .unwrap();
          assert_eq!(EXPECTED, response.0);

          RUNTIME.take(&echo_name).await.unwrap();
      });
  }
  117. /// Tests the `num_running` method.
  118. ///
  119. /// This test uses its own runtime and so can use the `#[tokio::test]` attribute.
  120. #[tokio::test]
  121. async fn num_running() {
  122. declare_runtime!(
  123. LOCAL_RT,
  124. // This needs to be different from the address where `RUNTIME` is listening.
  125. IpAddr::from([127, 0, 0, 2]),
  126. TEST_STORE.node_creds().unwrap()
  127. );
  128. assert_eq!(0, LOCAL_RT.num_running().await);
  129. let name = LOCAL_RT.activate(echo).await;
  130. assert_eq!(1, LOCAL_RT.num_running().await);
  131. LOCAL_RT.take(&name).await.unwrap();
  132. assert_eq!(0, LOCAL_RT.num_running().await);
  133. }
  134. mod ping_pong {
  135. use super::*;
  // The following code is a proof-of-concept for the types which should be generated for a
  // simple ping-pong protocol:
  protocol! {
      let name = PingPongProtocol;
      let states = [
          ClientInit, SentPing,
          ServerInit, Listening,
      ];
      ClientInit?Activate -> SentPing, >service(Listening)!Ping;
      ServerInit?Activate -> Listening;
      Listening?Ping -> End, >SentPing!Ping::Reply;
      SentPing?Ping::Reply -> End;
  }
  //
  // In words, the protocol is described as follows.
  // 1. The ClientInit state receives the Activate message. It returns the SentPing state and
  //    a Ping message to be sent to the Listening state.
  // 2. The ServerInit state receives the Activate message. It returns the Listening state.
  // 3. When the Listening state receives the Ping message, it returns the End state and a
  //    Ping::Reply message to be sent to the SentPing state.
  // 4. When the SentPing state receives the Ping::Reply message, it returns the End state.
  //
  // The End state represents the end of the session described by the protocol. When an actor
  // transitions to the End state, its function returns.
  // The generated actor implementation is the sender of the Activate message.
  // When a state is expecting a Reply message, an error occurs if the message is not received
  // in a timely manner.
  /// The `Ping` message of the ping-pong protocol; it is answered with a [`PingReply`].
  #[derive(Serialize, Deserialize)]
  pub struct Ping;
  impl CallMsg for Ping {
      type Reply = PingReply;
  }
  // I was tempted to name this "Pong", but the proc macro wouldn't think to do that.
  /// The reply to a [`Ping`] (written `Ping::Reply` in the protocol definition).
  #[derive(Serialize, Deserialize)]
  pub struct PingReply;
  // NOTE(review): the `2` suffix on these trait names presumably avoids a clash with items
  // generated by `protocol!` — confirm.

  /// Client state which is waiting for the `Activate` message.
  trait ClientInit2 {
      /// The state entered after `Activate` has been handled.
      type AfterActivate: SentPing2;
      /// Future returned by `handle_activate`; resolves to the next state together with the
      /// `Ping` to send.
      type HandleActivateFut: Future<Output = Result<(Self::AfterActivate, Ping)>>;
      /// Consumes this state and handles the `Activate` message.
      fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
  }
  /// Server state which is waiting for the `Activate` message.
  trait ServerInit2 {
      /// The state entered after `Activate` has been handled.
      type AfterActivate: Listening2;
      /// Future returned by `handle_activate`; resolves to the next state.
      type HandleActivateFut: Future<Output = Result<Self::AfterActivate>>;
      /// Consumes this state and handles the `Activate` message.
      fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
  }
  /// Server state which is waiting for a `Ping`.
  trait Listening2 {
      /// Future returned by `handle_ping`; resolves to the `End` state together with the
      /// `PingReply` to send back.
      type HandlePingFut: Future<Output = Result<(End, PingReply)>>;
      /// Consumes this state and handles a `Ping` message.
      fn handle_ping(self, msg: Ping) -> Self::HandlePingFut;
  }
  /// Client state which has sent a `Ping` and is waiting for its reply.
  trait SentPing2 {
      /// Future returned by `handle_reply`; resolves to the `End` state.
      type HandleReplyFut: Future<Output = Result<End>>;
      /// Consumes this state and handles the `PingReply` message.
      fn handle_reply(self, msg: PingReply) -> Self::HandleReplyFut;
  }
  189. #[derive(Serialize, Deserialize)]
  190. enum PingProtocolMsg {
  191. Ping(Ping),
  192. PingReply(PingReply),
  193. }
  194. impl CallMsg for PingProtocolMsg {
  195. type Reply = PingProtocolMsg;
  196. }
  197. impl SendMsg for PingProtocolMsg {}
  198. struct ClientInitState;
  199. impl ClientInit2 for ClientInitState {
  200. type AfterActivate = ClientState;
  201. type HandleActivateFut = impl Future<Output = Result<(Self::AfterActivate, Ping)>>;
  202. fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
  203. ready(Ok((ClientState, Ping)))
  204. }
  205. }
  206. struct ClientState;
  207. impl SentPing2 for ClientState {
  208. type HandleReplyFut = Ready<Result<End>>;
  209. fn handle_reply(self, _msg: PingReply) -> Self::HandleReplyFut {
  210. ready(Ok(End))
  211. }
  212. }
  213. #[allow(dead_code)]
  214. enum PingClientState {
  215. Init(ClientInitState),
  216. SentPing(ClientState),
  217. End(End),
  218. }
  219. struct ServerInitState;
  220. struct ServerState;
  221. impl ServerInit2 for ServerInitState {
  222. type AfterActivate = ServerState;
  223. type HandleActivateFut = Ready<Result<Self::AfterActivate>>;
  224. fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
  225. ready(Ok(ServerState))
  226. }
  227. }
  228. impl Listening2 for ServerState {
  229. type HandlePingFut = impl Future<Output = Result<(End, PingReply)>>;
  230. fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
  231. ready(Ok((End, PingReply)))
  232. }
  233. }
  234. #[allow(dead_code)]
  235. enum PingServerState {
  236. ServerInit(ServerInitState),
  237. Listening(ServerState),
  238. End(End),
  239. }
  240. async fn ping_server(
  241. counter: Arc<AtomicU8>,
  242. rt: &'static Runtime,
  243. mut mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
  244. act_id: Uuid,
  245. ) {
  246. let mut state = {
  247. let init = ServerInitState;
  248. let state = init.handle_activate(Activate::new(rt, act_id)).await.unwrap();
  249. PingServerState::Listening(state)
  250. };
  251. while let Some(envelope) = mailbox.recv().await {
  252. let (msg, replier, _from) = envelope.split();
  253. match (state, msg) {
  254. (PingServerState::Listening(listening_state), PingProtocolMsg::Ping(msg)) => {
  255. let (new_state, reply) = listening_state.handle_ping(msg).await.unwrap();
  256. state = PingServerState::End(new_state);
  257. if let Err(_) = replier.unwrap().send(PingProtocolMsg::PingReply(reply)) {
  258. panic!("Failed to send Ping reply.");
  259. }
  260. }
  261. (_prev_state, _) => {
  262. panic!("Ping protocol violation.");
  263. // A real implementation should assign the previous state and log the error.
  264. // state = prev_state;
  265. }
  266. }
  267. if let PingServerState::End(_) = state {
  268. break;
  269. }
  270. }
  271. counter.fetch_sub(1, Ordering::SeqCst);
  272. }
  273. async fn ping_client(
  274. counter: Arc<AtomicU8>,
  275. server_name: ActorName,
  276. rt: &'static Runtime,
  277. _mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
  278. act_id: Uuid,
  279. ) {
  280. let init = ClientInitState;
  281. let (state, msg) = init.handle_activate(Activate::new(rt, act_id)).await.unwrap();
  282. let from = rt.actor_name(act_id);
  283. let reply = rt
  284. .call(server_name, from, PingProtocolMsg::Ping(msg))
  285. .await
  286. .unwrap();
  287. if let PingProtocolMsg::PingReply(msg) = reply {
  288. state.handle_reply(msg).await.unwrap();
  289. } else {
  290. panic!("Incorrect message type sent in reply to Ping.");
  291. }
  292. counter.fetch_sub(1, Ordering::SeqCst);
  293. }
  /// Activates a ping server and a ping client on `RUNTIME` and checks that both actor
  /// bodies run to completion within 500 milliseconds.
  #[test]
  fn ping_pong_test() {
      ASYNC_RT.block_on(async {
          // Each of the two actors decrements this once when it finishes.
          let remaining = Arc::new(AtomicU8::new(2));

          let server_counter = Arc::clone(&remaining);
          let server_name = RUNTIME
              .activate(move |rt, mailbox, act_id| {
                  ping_server(server_counter, rt, mailbox, act_id)
              })
              .await;

          let ping_target = server_name.clone();
          let client_counter = Arc::clone(&remaining);
          let client_name = RUNTIME
              .activate(move |rt, mailbox, act_id| {
                  ping_client(client_counter, ping_target, rt, mailbox, act_id)
              })
              .await;

          // Poll until both actors are done or the deadline passes.
          let deadline = Instant::now() + Duration::from_millis(500);
          loop {
              let finished = remaining.load(Ordering::SeqCst) == 0;
              if finished || Instant::now() >= deadline {
                  break;
              }
              tokio::time::sleep(Duration::from_millis(20)).await;
          }
          // Check that both tasks finished successfully and we didn't just time out.
          assert_eq!(0, remaining.load(Ordering::SeqCst));

          // TODO: Should actors which return be removed from the runtime automatically?
          RUNTIME.take(&server_name).await.unwrap();
          RUNTIME.take(&client_name).await.unwrap();
      });
  }
  324. }
  325. mod travel_agency {
  326. use super::*;
  // Here's another protocol example. This is the Customer and Travel Agency protocol used as
  // an example in the survey paper "Behavioral Types in Programming Languages."
  // Note that the Choosing state can send messages at any time, not just in response to
  // another message, because there is a transition from Choosing that doesn't use the
  // receive operator (`?`).
  protocol! {
      let name = TravelAgency;
      let states = [
          AgencyInit, Listening,
          Choosing,
      ];
      AgencyInit?Activate -> Listening;
      Choosing -> Choosing, >service(Listening)!Query, service(Listening)!Accept, service(Listening)!Reject;
      Listening?Query -> Listening, >Choosing!Query::Reply;
      Choosing?Query::Reply -> Choosing;
      Listening?Accept -> End, >Choosing!Accept::Reply;
      Choosing?Accept::Reply -> End;
      Listening?Reject -> End, >Choosing!Reject::Reply;
      Choosing?Reject::Reply -> End;
  }
  /// The `Query` message of the travel agency protocol, sent from the `Choosing` state to
  /// the `Listening` service. Its reply carries no data.
  #[derive(Serialize, Deserialize)]
  pub struct Query;
  impl CallMsg for Query {
      type Reply = ();
  }
  /// The `Reject` message of the travel agency protocol; once it and its reply have been
  /// handled, both sides transition to `End`. Its reply carries no data.
  #[derive(Serialize, Deserialize)]
  pub struct Reject;
  impl CallMsg for Reject {
      type Reply = ();
  }
  /// The `Accept` message of the travel agency protocol; once it and its reply have been
  /// handled, both sides transition to `End`. Its reply carries no data.
  #[derive(Serialize, Deserialize)]
  pub struct Accept;
  impl CallMsg for Accept {
      type Reply = ();
  }
  362. }