// runtime_tests.rs
  1. #![feature(impl_trait_in_assoc_type)]
  2. use btrun::*;
  3. use btlib::{
  4. crypto::{ConcreteCreds, CredStore, CredsPriv},
  5. log::BuilderExt,
  6. Result,
  7. };
  8. use btlib_tests::TEST_STORE;
  9. use btproto::protocol;
  10. use btserde::to_vec;
  11. use bttp::{BlockAddr, Transmitter};
  12. use ctor::ctor;
  13. use lazy_static::lazy_static;
  14. use log;
  15. use serde::{Deserialize, Serialize};
  16. use std::{
  17. future::{ready, Future, Ready},
  18. net::{IpAddr, Ipv4Addr},
  19. sync::{
  20. atomic::{AtomicU8, Ordering},
  21. Arc,
  22. },
  23. time::{Duration, Instant},
  24. };
  25. use tokio::{runtime::Builder, sync::mpsc};
  26. use uuid::Uuid;
  27. const RUNTIME_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
  28. lazy_static! {
  29. static ref RUNTIME_CREDS: Arc<ConcreteCreds> = TEST_STORE.node_creds().unwrap();
  30. }
  31. declare_runtime!(RUNTIME, RUNTIME_ADDR, RUNTIME_CREDS.clone());
  32. lazy_static! {
  33. /// A tokio async runtime.
  34. ///
  35. /// When the `#[tokio::test]` attribute is used on a test, a new current thread runtime
  36. /// is created for each test
  37. /// (source: https://docs.rs/tokio/latest/tokio/attr.test.html#current-thread-runtime).
  38. /// This creates a problem, because the first test thread to access the `RUNTIME` static
  39. /// will initialize its `Receiver` in its runtime, which will stop running at the end of
  40. /// the test. Hence subsequent tests will not be able to send remote messages to this
  41. /// `Runtime`.
  42. ///
  43. /// By creating a single async runtime which is used by all of the tests, we can avoid this
  44. /// problem.
  45. static ref ASYNC_RT: tokio::runtime::Runtime = Builder::new_current_thread()
  46. .enable_all()
  47. .build()
  48. .unwrap();
  49. }
  50. /// The log level to use when running tests.
  51. const LOG_LEVEL: &str = "warn";
  52. #[ctor]
  53. fn ctor() {
  54. std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL));
  55. env_logger::Builder::from_default_env().btformat().init();
  56. }
  57. #[derive(Serialize, Deserialize)]
  58. struct EchoMsg(String);
  59. impl CallMsg for EchoMsg {
  60. type Reply = EchoMsg;
  61. }
  62. async fn echo(
  63. _rt: &'static Runtime,
  64. mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>,
  65. _act_id: Uuid,
  66. ) {
  67. while let Some(envelope) = mailbox.recv().await {
  68. let (msg, replier, ..) = envelope.split();
  69. if let Some(replier) = replier {
  70. if let Err(_) = replier.send(msg) {
  71. panic!("failed to send reply");
  72. }
  73. }
  74. }
  75. }
  /// Calls an `echo` actor through `RUNTIME.call` and checks that the reply carries the
  /// string which was sent.
  #[test]
  fn local_call() {
      const EXPECTED: &str = "hello";
      ASYNC_RT.block_on(async {
          let echo_name = RUNTIME.activate(echo).await;
          let caller = ActorName::new(echo_name.path().clone(), Uuid::default());
          let request = EchoMsg(EXPECTED.into());
          let reply = RUNTIME
              .call(echo_name.clone(), caller, request)
              .await
              .unwrap();
          let EchoMsg(actual) = reply;
          assert_eq!(EXPECTED, actual);
          // Remove the actor again so it does not linger in the shared runtime.
          RUNTIME.take(&echo_name).await.unwrap();
      })
  }
  /// Calls an `echo` actor through a `Transmitter` addressed at `RUNTIME_ADDR` and checks that
  /// the reply carries the string which was sent.
  #[test]
  fn remote_call() {
      const EXPECTED: &str = "hello";
      ASYNC_RT.block_on(async {
          let actor_name = RUNTIME.activate(echo).await;

          // Build a transmitter which targets the address and bind path of `RUNTIME`.
          let transmitter = {
              let bind_path = RUNTIME_CREDS.bind_path().unwrap();
              let block_addr = BlockAddr::new(RUNTIME_ADDR, Arc::new(bind_path));
              Transmitter::new(Arc::new(block_addr), RUNTIME_CREDS.clone())
                  .await
                  .unwrap()
          };

          // The wire message borrows the serialized payload, so keep the buffer in a local.
          let payload = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap();
          let from = RUNTIME.actor_name(Uuid::default());
          let wire_msg = WireMsg::new(actor_name.clone(), from, &payload);

          let reply = transmitter
              .call(wire_msg, ReplyCallback::<EchoMsg>::new())
              .await
              .unwrap()
              .unwrap();
          let EchoMsg(actual) = reply;
          assert_eq!(EXPECTED, actual);
          RUNTIME.take(&actor_name).await.unwrap();
      });
  }
  115. /// Tests the `num_running` method.
  116. ///
  117. /// This test uses its own runtime and so can use the `#[tokio::test]` attribute.
  118. #[tokio::test]
  119. async fn num_running() {
  120. declare_runtime!(
  121. LOCAL_RT,
  122. // This needs to be different from the address where `RUNTIME` is listening.
  123. IpAddr::from([127, 0, 0, 2]),
  124. TEST_STORE.node_creds().unwrap()
  125. );
  126. assert_eq!(0, LOCAL_RT.num_running().await);
  127. let name = LOCAL_RT.activate(echo).await;
  128. assert_eq!(1, LOCAL_RT.num_running().await);
  129. LOCAL_RT.take(&name).await.unwrap();
  130. assert_eq!(0, LOCAL_RT.num_running().await);
  131. }
  132. mod ping_pong {
  133. use super::*;
  134. // The following code is a proof-of-concept for what types should be generated for a
  135. // simple ping-pong protocol:
  136. protocol! {
  137. named PingPongProtocol;
  138. let server = [Server];
  139. let client = [Client];
  140. Client -> End, >service(Server)!Ping;
  141. Server?Ping -> End, >Client!Ping::Reply;
  142. }
  143. //
  144. // In words, the protocol is described as follows.
  145. // 1. The ClientInit state receives the Activate message. It returns the SentPing state and a
  146. // Ping message to be sent to the Listening state.
  147. // 2. The ServerInit state receives the Activate message. It returns the Listening state.
  148. // 3. When the Listening state receives the Ping message it returns the End state and a
  149. // Ping::Reply message to be sent to the SentPing state.
  150. // 4. When the SentPing state receives the Ping::Reply message it returns the End state.
  151. //
  152. // The End state represents an end to the session described by the protocol. When an actor
  153. // transitions to the End state its function returns.
  154. // The generated actor implementation is the sender of the Activate message.
  155. // When a state is expecting a Reply message, an error occurs if the message is not received
  156. // in a timely manner.
  157. #[derive(Serialize, Deserialize)]
  158. pub struct Ping;
  159. impl CallMsg for Ping {
  160. type Reply = PingReply;
  161. }
  162. // I was tempted to name this "Pong", but the proc macro wouldn't think to do that.
  163. #[derive(Serialize, Deserialize)]
  164. pub struct PingReply;
  165. trait ClientInit2 {
  166. type AfterActivate: SentPing2;
  167. type HandleActivateFut: Future<Output = Result<(Self::AfterActivate, Ping)>>;
  168. fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
  169. }
  170. trait ServerInit2 {
  171. type AfterActivate: Listening2;
  172. type HandleActivateFut: Future<Output = Result<Self::AfterActivate>>;
  173. fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
  174. }
  175. trait Listening2 {
  176. type HandlePingFut: Future<Output = Result<(End, PingReply)>>;
  177. fn handle_ping(self, msg: Ping) -> Self::HandlePingFut;
  178. }
  179. trait SentPing2 {
  180. type HandleReplyFut: Future<Output = Result<End>>;
  181. fn handle_reply(self, msg: PingReply) -> Self::HandleReplyFut;
  182. }
  183. #[derive(Serialize, Deserialize)]
  184. enum PingProtocolMsg {
  185. Ping(Ping),
  186. PingReply(PingReply),
  187. }
  188. impl CallMsg for PingProtocolMsg {
  189. type Reply = PingProtocolMsg;
  190. }
  191. impl SendMsg for PingProtocolMsg {}
  192. struct ClientInitState;
  193. impl ClientInit2 for ClientInitState {
  194. type AfterActivate = ClientState;
  195. type HandleActivateFut = impl Future<Output = Result<(Self::AfterActivate, Ping)>>;
  196. fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
  197. ready(Ok((ClientState, Ping)))
  198. }
  199. }
  200. struct ClientState;
  201. impl SentPing2 for ClientState {
  202. type HandleReplyFut = Ready<Result<End>>;
  203. fn handle_reply(self, _msg: PingReply) -> Self::HandleReplyFut {
  204. ready(Ok(End))
  205. }
  206. }
  207. #[allow(dead_code)]
  208. enum PingClientState {
  209. Init(ClientInitState),
  210. SentPing(ClientState),
  211. End(End),
  212. }
  213. struct ServerInitState;
  214. struct ServerState;
  215. impl ServerInit2 for ServerInitState {
  216. type AfterActivate = ServerState;
  217. type HandleActivateFut = Ready<Result<Self::AfterActivate>>;
  218. fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
  219. ready(Ok(ServerState))
  220. }
  221. }
  222. impl Listening2 for ServerState {
  223. type HandlePingFut = impl Future<Output = Result<(End, PingReply)>>;
  224. fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
  225. ready(Ok((End, PingReply)))
  226. }
  227. }
  228. #[allow(dead_code)]
  229. enum PingServerState {
  230. ServerInit(ServerInitState),
  231. Listening(ServerState),
  232. End(End),
  233. }
  234. async fn ping_server(
  235. counter: Arc<AtomicU8>,
  236. rt: &'static Runtime,
  237. mut mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
  238. act_id: Uuid,
  239. ) {
  240. let mut state = {
  241. let init = ServerInitState;
  242. let state = init
  243. .handle_activate(Activate::new(rt, act_id))
  244. .await
  245. .unwrap();
  246. PingServerState::Listening(state)
  247. };
  248. while let Some(envelope) = mailbox.recv().await {
  249. let (msg, replier, _from) = envelope.split();
  250. match (state, msg) {
  251. (PingServerState::Listening(listening_state), PingProtocolMsg::Ping(msg)) => {
  252. let (new_state, reply) = listening_state.handle_ping(msg).await.unwrap();
  253. state = PingServerState::End(new_state);
  254. if let Err(_) = replier.unwrap().send(PingProtocolMsg::PingReply(reply)) {
  255. panic!("Failed to send Ping reply.");
  256. }
  257. }
  258. (_prev_state, _) => {
  259. panic!("Ping protocol violation.");
  260. // A real implementation should assign the previous state and log the error.
  261. // state = prev_state;
  262. }
  263. }
  264. if let PingServerState::End(_) = state {
  265. break;
  266. }
  267. }
  268. counter.fetch_sub(1, Ordering::SeqCst);
  269. }
  270. async fn ping_client(
  271. counter: Arc<AtomicU8>,
  272. server_name: ActorName,
  273. rt: &'static Runtime,
  274. _mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
  275. act_id: Uuid,
  276. ) {
  277. let init = ClientInitState;
  278. let (state, msg) = init
  279. .handle_activate(Activate::new(rt, act_id))
  280. .await
  281. .unwrap();
  282. let from = rt.actor_name(act_id);
  283. let reply = rt
  284. .call(server_name, from, PingProtocolMsg::Ping(msg))
  285. .await
  286. .unwrap();
  287. if let PingProtocolMsg::PingReply(msg) = reply {
  288. state.handle_reply(msg).await.unwrap();
  289. } else {
  290. panic!("Incorrect message type sent in reply to Ping.");
  291. }
  292. counter.fetch_sub(1, Ordering::SeqCst);
  293. }
  /// Runs one ping server and one ping client in `RUNTIME` and checks that both finish.
  ///
  /// Each actor decrements the shared counter when it returns, so the counter reaching zero
  /// means both ran to completion.
  #[test]
  fn ping_pong_test() {
      ASYNC_RT.block_on(async {
          let counter = Arc::new(AtomicU8::new(2));

          let server_counter = counter.clone();
          let server_name = RUNTIME
              .activate(move |rt, mailbox, act_id| {
                  ping_server(server_counter, rt, mailbox, act_id)
              })
              .await;

          let client_counter = counter.clone();
          let target = server_name.clone();
          let client_name = RUNTIME
              .activate(move |rt, mailbox, act_id| {
                  ping_client(client_counter, target, rt, mailbox, act_id)
              })
              .await;

          // Poll until both actors have finished or half a second has passed.
          let deadline = Instant::now() + Duration::from_millis(500);
          loop {
              if counter.load(Ordering::SeqCst) == 0 || Instant::now() >= deadline {
                  break;
              }
              tokio::time::sleep(Duration::from_millis(20)).await;
          }

          // Check that both tasks finished successfully and we didn't just time out.
          assert_eq!(0, counter.load(Ordering::SeqCst));

          // TODO: Should actors which return be removed from the runtime automatically?
          RUNTIME.take(&server_name).await.unwrap();
          RUNTIME.take(&client_name).await.unwrap();
      });
  }
  324. }
  325. mod travel_agency {
  326. use super::*;
  327. // Here's another protocol example. This is the Customer and Travel Agency protocol used as an
  328. // example in the survey paper "Behavioral Types in Programming Languages."
  329. // Note that the Choosing state can send messages at any time, not just in response to another
  330. // message because there is a transition from Choosing that doesn't use the receive operator
  331. // (`?`).
  332. protocol! {
  333. named TravelAgency;
  334. let agency = [Listening];
  335. let customer = [Choosing];
  336. Choosing -> Choosing, >service(Listening)!Query;
  337. Choosing -> Choosing, >service(Listening)!Accept;
  338. Choosing -> Choosing, >service(Listening)!Reject;
  339. Listening?Query -> Listening, >Choosing!Query::Reply;
  340. Choosing?Query::Reply -> Choosing;
  341. Listening?Accept -> End, >Choosing!Accept::Reply;
  342. Choosing?Accept::Reply -> End;
  343. Listening?Reject -> End, >Choosing!Reject::Reply;
  344. Choosing?Reject::Reply -> End;
  345. }
  346. #[derive(Serialize, Deserialize)]
  347. pub struct Query;
  348. impl CallMsg for Query {
  349. type Reply = ();
  350. }
  351. #[derive(Serialize, Deserialize)]
  352. pub struct Reject;
  353. impl CallMsg for Reject {
  354. type Reply = ();
  355. }
  356. #[derive(Serialize, Deserialize)]
  357. pub struct Accept;
  358. impl CallMsg for Accept {
  359. type Reply = ();
  360. }
  361. }
  362. #[allow(dead_code)]
  363. mod client_callback {
  364. use super::*;
  365. #[derive(Serialize, Deserialize)]
  366. pub struct Register;
  367. #[derive(Serialize, Deserialize)]
  368. pub struct Completed {
  369. value: usize,
  370. }
  371. protocol! {
  372. named ClientCallback;
  373. let server = [Listening];
  374. let worker = [Working];
  375. let client = [Unregistered, Registered];
  376. Unregistered -> Registered, >service(Listening)!Register[Registered];
  377. Listening?Register[Registered] -> Listening, Working[Registered];
  378. Working[Registered] -> End, >Registered!Completed;
  379. Registered?Completed -> End;
  380. }
  381. struct UnregisteredState {
  382. factor: usize,
  383. }
  384. impl Unregistered for UnregisteredState {
  385. type OnSendRegisterRegistered = RegisteredState;
  386. type OnSendRegisterFut = Ready<Result<Self::OnSendRegisterRegistered>>;
  387. fn on_send_register(self, _arg: &mut Register) -> Self::OnSendRegisterFut {
  388. ready(Ok(RegisteredState {
  389. factor: self.factor,
  390. result: None,
  391. }))
  392. }
  393. }
  394. struct RegisteredState {
  395. factor: usize,
  396. result: Option<usize>,
  397. }
  398. impl Registered for RegisteredState {
  399. type HandleCompletedFut = Ready<Result<End>>;
  400. fn handle_completed(mut self, arg: Completed) -> Self::HandleCompletedFut {
  401. self.result = Some(self.factor * arg.value);
  402. ready(Ok(End))
  403. }
  404. }
  405. struct ListeningState;
  406. impl Listening for ListeningState {
  407. type HandleRegisterListening = ListeningState;
  408. type HandleRegisterWorking = WorkingState;
  409. type HandleRegisterFut = Ready<Result<(ListeningState, WorkingState)>>;
  410. fn handle_register(self, _arg: Register) -> Self::HandleRegisterFut {
  411. ready(Ok((self, WorkingState)))
  412. }
  413. }
  414. struct WorkingState;
  415. impl Working for WorkingState {
  416. type OnSendCompletedFut = Ready<Result<(End, Completed)>>;
  417. fn on_send_completed(self) -> Self::OnSendCompletedFut {
  418. ready(Ok((End, Completed { value: 42 })))
  419. }
  420. }
  421. use ::tokio::sync::Mutex;
  422. enum ClientState<Init: Unregistered> {
  423. Unregistered(Init),
  424. Registered(Init::OnSendRegisterRegistered),
  425. End(End),
  426. }
  427. impl<Init: Unregistered> ClientState<Init> {
  428. pub fn name(&self) -> &'static str {
  429. match self {
  430. Self::Unregistered(_) => "Unregistered",
  431. Self::Registered(_) => "Registered",
  432. Self::End(_) => "End",
  433. }
  434. }
  435. }
  436. struct ClientHandle<Init: Unregistered> {
  437. runtime: &'static Runtime,
  438. state: Arc<Mutex<Option<ClientState<Init>>>>,
  439. name: ActorName,
  440. }
  441. impl<Init: Unregistered> ClientHandle<Init> {
  442. async fn send_register(&self, to: ServiceName, mut msg: Register) -> Result<()> {
  443. let mut guard = self.state.lock().await;
  444. let state = guard
  445. .take()
  446. .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
  447. let new_state = match state {
  448. ClientState::Unregistered(state) => {
  449. let new_state = state.on_send_register(&mut msg).await?;
  450. let msg = ClientCallbackMsgs::Register(msg);
  451. self.runtime
  452. .send_service(to, self.name.clone(), msg)
  453. .await?;
  454. // QUESTION: Should `on_send_register` be required to return the previous state
  455. // if it encounters an error?
  456. ClientState::Registered(new_state)
  457. }
  458. state => state,
  459. };
  460. *guard = Some(new_state);
  461. Ok(())
  462. }
  463. }
  464. async fn start_client<Init>(init: Init, runtime: &'static Runtime) -> ClientHandle<Init>
  465. where
  466. Init: 'static + Unregistered,
  467. {
  468. let state = Arc::new(Mutex::new(Some(ClientState::Unregistered(init))));
  469. let name = {
  470. let state = state.clone();
  471. runtime.activate(move |_, mut mailbox, _act_id| async move {
  472. while let Some(envelope) = mailbox.recv().await {
  473. let mut guard = state.lock().await;
  474. let state = guard.take()
  475. .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
  476. let (msg, _replier, _from) = envelope.split();
  477. let new_state = match (state, msg) {
  478. (ClientState::Registered(curr_state), ClientCallbackMsgs::Completed(msg)) => {
  479. match curr_state.handle_completed(msg).await {
  480. Ok(next) => ClientState::<Init>::End(next),
  481. Err(err) => {
  482. log::error!("Failed to handle 'Completed' message in 'Registered' state: {err}");
  483. panic!("We can't transition to a new state because we gave ownership away and that method failed!")
  484. }
  485. }
  486. }
  487. (state, msg) => {
  488. log::error!("Unexpected message '{}' in state '{}'.", msg.name(), state.name());
  489. state
  490. }
  491. };
  492. *guard = Some(new_state);
  493. }
  494. }).await
  495. };
  496. ClientHandle {
  497. runtime,
  498. state,
  499. name,
  500. }
  501. }
  502. async fn start_server<Init>(init: Init, runtime: &'static Runtime) -> ActorName
  503. where
  504. Init: 'static + Listening<HandleRegisterListening = Init>,
  505. {
  506. enum ServerState<S: Listening> {
  507. Listening(S),
  508. }
  509. impl<S: Listening> ServerState<S> {
  510. fn name(&self) -> &'static str {
  511. match self {
  512. Self::Listening(_) => "Listening",
  513. }
  514. }
  515. }
  516. runtime
  517. .activate(move |_, mut mailbox, _act_id| async move {
  518. let mut state = ServerState::Listening(init);
  519. while let Some(envelope) = mailbox.recv().await {
  520. let (msg, _replier, from) = envelope.split();
  521. let new_state = match (state, msg) {
  522. (ServerState::Listening(curr_state), ClientCallbackMsgs::Register(msg)) => {
  523. match curr_state.handle_register(msg).await {
  524. Ok((new_state, working_state)) => {
  525. start_worker(working_state, from, runtime).await;
  526. ServerState::Listening(new_state)
  527. }
  528. Err(err) => {
  529. log::error!("Failed to handle the Register message: {err}");
  530. todo!("Need to recover the previous state from err.")
  531. }
  532. }
  533. }
  534. (state, msg) => {
  535. log::error!(
  536. "Unexpected message '{}' in state '{}'.",
  537. msg.name(),
  538. state.name()
  539. );
  540. state
  541. }
  542. };
  543. state = new_state;
  544. }
  545. })
  546. .await
  547. }
  548. async fn start_worker<Init>(
  549. init: Init,
  550. owned: ActorName,
  551. runtime: &'static Runtime,
  552. ) -> ActorName
  553. where
  554. Init: 'static + Working,
  555. {
  556. enum WorkerState<S: Working> {
  557. Working(S),
  558. }
  559. runtime
  560. .activate::<ClientCallbackMsgs, _, _>(move |_, _, act_id| async move {
  561. let msg = match init.on_send_completed().await {
  562. Ok((End, msg)) => msg,
  563. Err(err) => {
  564. log::error!("Failed to send Completed message: {err}");
  565. return;
  566. }
  567. };
  568. let from = runtime.actor_name(act_id);
  569. let msg = ClientCallbackMsgs::Completed(msg);
  570. if let Err(err) = runtime.send(owned, from, msg).await {
  571. log::error!("Failed to send Completed message: {err}");
  572. }
  573. })
  574. .await
  575. }
  576. }