runtime_tests.rs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. #![feature(impl_trait_in_assoc_type)]
  2. use btrun::*;
  3. use btlib::{
  4. crypto::{ConcreteCreds, CredStore, CredsPriv},
  5. log::BuilderExt,
  6. Result,
  7. };
  8. use btlib_tests::TEST_STORE;
  9. use btproto::protocol;
  10. use btserde::to_vec;
  11. use bttp::{BlockAddr, Transmitter};
  12. use ctor::ctor;
  13. use lazy_static::lazy_static;
  14. use serde::{Deserialize, Serialize};
  15. use std::{
  16. future::{ready, Future, Ready},
  17. net::{IpAddr, Ipv4Addr},
  18. sync::{
  19. atomic::{AtomicU8, Ordering},
  20. Arc,
  21. },
  22. time::{Duration, Instant},
  23. };
  24. use tokio::{runtime::Builder, sync::mpsc};
  25. use uuid::Uuid;
  26. const RUNTIME_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
  27. lazy_static! {
  28. static ref RUNTIME_CREDS: Arc<ConcreteCreds> = TEST_STORE.node_creds().unwrap();
  29. }
  30. declare_runtime!(RUNTIME, RUNTIME_ADDR, RUNTIME_CREDS.clone());
  31. lazy_static! {
  32. /// A tokio async runtime.
  33. ///
  34. /// When the `#[tokio::test]` attribute is used on a test, a new current thread runtime
  35. /// is created for each test
  36. /// (source: https://docs.rs/tokio/latest/tokio/attr.test.html#current-thread-runtime).
  37. /// This creates a problem, because the first test thread to access the `RUNTIME` static
  38. /// will initialize its `Receiver` in its runtime, which will stop running at the end of
  39. /// the test. Hence subsequent tests will not be able to send remote messages to this
  40. /// `Runtime`.
  41. ///
  42. /// By creating a single async runtime which is used by all of the tests, we can avoid this
  43. /// problem.
  44. static ref ASYNC_RT: tokio::runtime::Runtime = Builder::new_current_thread()
  45. .enable_all()
  46. .build()
  47. .unwrap();
  48. }
  49. /// The log level to use when running tests.
  50. const LOG_LEVEL: &str = "warn";
  51. #[ctor]
  52. fn ctor() {
  53. std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL));
  54. env_logger::Builder::from_default_env().btformat().init();
  55. }
  56. #[derive(Serialize, Deserialize)]
  57. struct EchoMsg(String);
  58. impl CallMsg for EchoMsg {
  59. type Reply = EchoMsg;
  60. }
  61. async fn echo(
  62. _rt: &'static Runtime,
  63. mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>,
  64. _act_id: Uuid,
  65. ) {
  66. while let Some(envelope) = mailbox.recv().await {
  67. let (msg, replier, ..) = envelope.split();
  68. if let Some(replier) = replier {
  69. if let Err(_) = replier.send(msg) {
  70. panic!("failed to send reply");
  71. }
  72. }
  73. }
  74. }
  75. #[test]
  76. fn local_call() {
  77. ASYNC_RT.block_on(async {
  78. const EXPECTED: &str = "hello";
  79. let name = RUNTIME.activate(echo).await;
  80. let from = ActorName::new(name.path().clone(), Uuid::default());
  81. let reply = RUNTIME
  82. .call(name.clone(), from, EchoMsg(EXPECTED.into()))
  83. .await
  84. .unwrap();
  85. assert_eq!(EXPECTED, reply.0);
  86. RUNTIME.take(&name).await.unwrap();
  87. })
  88. }
  89. #[test]
  90. fn remote_call() {
  91. ASYNC_RT.block_on(async {
  92. const EXPECTED: &str = "hello";
  93. let actor_name = RUNTIME.activate(echo).await;
  94. let bind_path = Arc::new(RUNTIME_CREDS.bind_path().unwrap());
  95. let block_addr = Arc::new(BlockAddr::new(RUNTIME_ADDR, bind_path));
  96. let transmitter = Transmitter::new(block_addr, RUNTIME_CREDS.clone())
  97. .await
  98. .unwrap();
  99. let buf = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap();
  100. let wire_msg = WireMsg::new(
  101. actor_name.clone(),
  102. RUNTIME.actor_name(Uuid::default()),
  103. &buf,
  104. );
  105. let reply = transmitter
  106. .call(wire_msg, ReplyCallback::<EchoMsg>::new())
  107. .await
  108. .unwrap()
  109. .unwrap();
  110. assert_eq!(EXPECTED, reply.0);
  111. RUNTIME.take(&actor_name).await.unwrap();
  112. });
  113. }
  114. /// Tests the `num_running` method.
  115. ///
  116. /// This test uses its own runtime and so can use the `#[tokio::test]` attribute.
  117. #[tokio::test]
  118. async fn num_running() {
  119. declare_runtime!(
  120. LOCAL_RT,
  121. // This needs to be different from the address where `RUNTIME` is listening.
  122. IpAddr::from([127, 0, 0, 2]),
  123. TEST_STORE.node_creds().unwrap()
  124. );
  125. assert_eq!(0, LOCAL_RT.num_running().await);
  126. let name = LOCAL_RT.activate(echo).await;
  127. assert_eq!(1, LOCAL_RT.num_running().await);
  128. LOCAL_RT.take(&name).await.unwrap();
  129. assert_eq!(0, LOCAL_RT.num_running().await);
  130. }
  131. mod ping_pong {
  132. use super::*;
  133. // The following code is a proof-of-concept for what types should be generated for a
  134. // simple ping-pong protocol:
  135. protocol! {
  136. named PingPongProtocol;
  137. let server = [Server];
  138. let client = [Client];
  139. Client -> End, >service(Server)!Ping;
  140. Server?Ping -> End, >Client!Ping::Reply;
  141. }
  142. //
  143. // In words, the protocol is described as follows.
  144. // 1. The ClientInit state receives the Activate message. It returns the SentPing state and a
  145. // Ping message to be sent to the Listening state.
  146. // 2. The ServerInit state receives the Activate message. It returns the Listening state.
  147. // 3. When the Listening state receives the Ping message it returns the End state and a
  148. // Ping::Reply message to be sent to the SentPing state.
  149. // 4. When the SentPing state receives the Ping::Reply message it returns the End state.
  150. //
  151. // The End state represents an end to the session described by the protocol. When an actor
  152. // transitions to the End state its function returns.
  153. // The generated actor implementation is the sender of the Activate message.
  154. // When a state is expecting a Reply message, an error occurs if the message is not received
  155. // in a timely manner.
  156. #[derive(Serialize, Deserialize)]
  157. pub struct Ping;
  158. impl CallMsg for Ping {
  159. type Reply = PingReply;
  160. }
  161. // I was tempted to name this "Pong", but the proc macro wouldn't think to do that.
  162. #[derive(Serialize, Deserialize)]
  163. pub struct PingReply;
  164. trait ClientInit2 {
  165. type AfterActivate: SentPing2;
  166. type HandleActivateFut: Future<Output = Result<(Self::AfterActivate, Ping)>>;
  167. fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
  168. }
  169. trait ServerInit2 {
  170. type AfterActivate: Listening2;
  171. type HandleActivateFut: Future<Output = Result<Self::AfterActivate>>;
  172. fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
  173. }
  174. trait Listening2 {
  175. type HandlePingFut: Future<Output = Result<(End, PingReply)>>;
  176. fn handle_ping(self, msg: Ping) -> Self::HandlePingFut;
  177. }
  178. trait SentPing2 {
  179. type HandleReplyFut: Future<Output = Result<End>>;
  180. fn handle_reply(self, msg: PingReply) -> Self::HandleReplyFut;
  181. }
  182. #[derive(Serialize, Deserialize)]
  183. enum PingProtocolMsg {
  184. Ping(Ping),
  185. PingReply(PingReply),
  186. }
  187. impl CallMsg for PingProtocolMsg {
  188. type Reply = PingProtocolMsg;
  189. }
  190. impl SendMsg for PingProtocolMsg {}
  191. struct ClientInitState;
  192. impl ClientInit2 for ClientInitState {
  193. type AfterActivate = ClientState;
  194. type HandleActivateFut = impl Future<Output = Result<(Self::AfterActivate, Ping)>>;
  195. fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
  196. ready(Ok((ClientState, Ping)))
  197. }
  198. }
  199. struct ClientState;
  200. impl SentPing2 for ClientState {
  201. type HandleReplyFut = Ready<Result<End>>;
  202. fn handle_reply(self, _msg: PingReply) -> Self::HandleReplyFut {
  203. ready(Ok(End))
  204. }
  205. }
  206. #[allow(dead_code)]
  207. enum PingClientState {
  208. Init(ClientInitState),
  209. SentPing(ClientState),
  210. End(End),
  211. }
  212. struct ServerInitState;
  213. struct ServerState;
  214. impl ServerInit2 for ServerInitState {
  215. type AfterActivate = ServerState;
  216. type HandleActivateFut = Ready<Result<Self::AfterActivate>>;
  217. fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
  218. ready(Ok(ServerState))
  219. }
  220. }
  221. impl Listening2 for ServerState {
  222. type HandlePingFut = impl Future<Output = Result<(End, PingReply)>>;
  223. fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
  224. ready(Ok((End, PingReply)))
  225. }
  226. }
  227. #[allow(dead_code)]
  228. enum PingServerState {
  229. ServerInit(ServerInitState),
  230. Listening(ServerState),
  231. End(End),
  232. }
  233. async fn ping_server(
  234. counter: Arc<AtomicU8>,
  235. rt: &'static Runtime,
  236. mut mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
  237. act_id: Uuid,
  238. ) {
  239. let mut state = {
  240. let init = ServerInitState;
  241. let state = init
  242. .handle_activate(Activate::new(rt, act_id))
  243. .await
  244. .unwrap();
  245. PingServerState::Listening(state)
  246. };
  247. while let Some(envelope) = mailbox.recv().await {
  248. let (msg, replier, _from) = envelope.split();
  249. match (state, msg) {
  250. (PingServerState::Listening(listening_state), PingProtocolMsg::Ping(msg)) => {
  251. let (new_state, reply) = listening_state.handle_ping(msg).await.unwrap();
  252. state = PingServerState::End(new_state);
  253. if let Err(_) = replier.unwrap().send(PingProtocolMsg::PingReply(reply)) {
  254. panic!("Failed to send Ping reply.");
  255. }
  256. }
  257. (_prev_state, _) => {
  258. panic!("Ping protocol violation.");
  259. // A real implementation should assign the previous state and log the error.
  260. // state = prev_state;
  261. }
  262. }
  263. if let PingServerState::End(_) = state {
  264. break;
  265. }
  266. }
  267. counter.fetch_sub(1, Ordering::SeqCst);
  268. }
  269. async fn ping_client(
  270. counter: Arc<AtomicU8>,
  271. server_name: ActorName,
  272. rt: &'static Runtime,
  273. _mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
  274. act_id: Uuid,
  275. ) {
  276. let init = ClientInitState;
  277. let (state, msg) = init
  278. .handle_activate(Activate::new(rt, act_id))
  279. .await
  280. .unwrap();
  281. let from = rt.actor_name(act_id);
  282. let reply = rt
  283. .call(server_name, from, PingProtocolMsg::Ping(msg))
  284. .await
  285. .unwrap();
  286. if let PingProtocolMsg::PingReply(msg) = reply {
  287. state.handle_reply(msg).await.unwrap();
  288. } else {
  289. panic!("Incorrect message type sent in reply to Ping.");
  290. }
  291. counter.fetch_sub(1, Ordering::SeqCst);
  292. }
  293. #[test]
  294. fn ping_pong_test() {
  295. ASYNC_RT.block_on(async {
  296. let counter = Arc::new(AtomicU8::new(2));
  297. let server_name = {
  298. let counter = counter.clone();
  299. RUNTIME
  300. .activate(move |rt, mailbox, act_id| ping_server(counter, rt, mailbox, act_id))
  301. .await
  302. };
  303. let client_name = {
  304. let server_name = server_name.clone();
  305. let counter = counter.clone();
  306. RUNTIME
  307. .activate(move |rt, mailbox, act_id| {
  308. ping_client(counter, server_name, rt, mailbox, act_id)
  309. })
  310. .await
  311. };
  312. let deadline = Instant::now() + Duration::from_millis(500);
  313. while counter.load(Ordering::SeqCst) > 0 && Instant::now() < deadline {
  314. tokio::time::sleep(Duration::from_millis(20)).await;
  315. }
  316. // Check that both tasks finished successfully and we didn't just timeout.
  317. assert_eq!(0, counter.load(Ordering::SeqCst));
  318. // TODO: Should actor which return be removed from the runtime automatically?
  319. RUNTIME.take(&server_name).await.unwrap();
  320. RUNTIME.take(&client_name).await.unwrap();
  321. });
  322. }
  323. }
  324. mod travel_agency {
  325. use super::*;
  326. // Here's another protocol example. This is the Customer and Travel Agency protocol used as an
  327. // example in the survey paper "Behavioral Types in Programming Languages."
  328. // Note that the Choosing state can send messages at any time, not just in response to another
  329. // message because there is a transition from Choosing that doesn't use the receive operator
  330. // (`?`).
  331. protocol! {
  332. named TravelAgency;
  333. let agency = [Listening];
  334. let customer = [Choosing];
  335. Choosing -> Choosing, >service(Listening)!Query;
  336. Choosing -> Choosing, >service(Listening)!Accept;
  337. Choosing -> Choosing, >service(Listening)!Reject;
  338. Listening?Query -> Listening, >Choosing!Query::Reply;
  339. Choosing?Query::Reply -> Choosing;
  340. Listening?Accept -> End, >Choosing!Accept::Reply;
  341. Choosing?Accept::Reply -> End;
  342. Listening?Reject -> End, >Choosing!Reject::Reply;
  343. Choosing?Reject::Reply -> End;
  344. }
  345. #[derive(Serialize, Deserialize)]
  346. pub struct Query;
  347. impl CallMsg for Query {
  348. type Reply = ();
  349. }
  350. #[derive(Serialize, Deserialize)]
  351. pub struct Reject;
  352. impl CallMsg for Reject {
  353. type Reply = ();
  354. }
  355. #[derive(Serialize, Deserialize)]
  356. pub struct Accept;
  357. impl CallMsg for Accept {
  358. type Reply = ();
  359. }
  360. }