runtime_tests.rs 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673
  1. #![feature(impl_trait_in_assoc_type)]
  2. use btrun::model::*;
  3. use btrun::test_setup;
  4. use btrun::*;
  5. use btlib::Result;
  6. use btproto::protocol;
  7. use lazy_static::lazy_static;
  8. use log;
  9. use serde::{Deserialize, Serialize};
  10. use std::{
  11. future::{ready, Future, Ready},
  12. sync::{
  13. atomic::{AtomicU8, Ordering},
  14. Arc,
  15. },
  16. };
  17. test_setup!();
  18. mod ping_pong {
  19. use super::*;
  20. use btlib::bterr;
  21. // The following code is a proof-of-concept for what types should be generated for a
  22. // simple ping-pong protocol:
  23. protocol! {
  24. named PingProtocol;
  25. let server = [Server];
  26. let client = [Client];
  27. Client -> End, >service(Server)!Ping;
  28. Server?Ping -> End, >Client!Ping::Reply;
  29. }
  30. //
  31. // In words, the protocol is described as follows.
  32. // 1. When the Listening state receives the Ping message it returns the End state and a
  33. // Ping::Reply message to be sent to the SentPing state.
  34. // 2. When the SentPing state receives the Ping::Reply message it returns the End state.
  35. //
  36. // The End state represents an end to the session described by the protocol. When an actor
  37. // transitions to the End state its function returns.
  38. // When a state is expecting a Reply message, an error occurs if the message is not received
  39. // in a timely manner.
  40. enum PingClientState<T: Client> {
  41. Client(T),
  42. End(End),
  43. }
  44. impl<T: Client> PingClientState<T> {
  45. const fn name(&self) -> &'static str {
  46. match self {
  47. Self::Client(_) => "Client",
  48. Self::End(_) => "End",
  49. }
  50. }
  51. }
  52. struct ClientHandle<T: Client> {
  53. state: Option<PingClientState<T>>,
  54. runtime: &'static Runtime,
  55. }
  56. impl<T: Client> ClientHandle<T> {
  57. async fn send_ping(&mut self, mut msg: Ping, service: ServiceAddr) -> Result<PingReply> {
  58. let state = self
  59. .state
  60. .take()
  61. .ok_or_else(|| bterr!("State was not returned."))?;
  62. let (new_state, result) = match state {
  63. PingClientState::Client(state) => match state.on_send_ping(&mut msg).await {
  64. TransResult::Ok((new_state, _)) => {
  65. let new_state = PingClientState::End(new_state);
  66. let result = self
  67. .runtime
  68. .call_service(service, PingProtocolMsgs::Ping(msg))
  69. .await;
  70. (new_state, result)
  71. }
  72. TransResult::Abort { from, err } => {
  73. let new_state = PingClientState::Client(from);
  74. (new_state, Err(err))
  75. }
  76. TransResult::Fatal { err } => return Err(err),
  77. },
  78. state => {
  79. let result = Err(bterr!("Can't send Ping in state {}.", state.name()));
  80. (state, result)
  81. }
  82. };
  83. self.state = Some(new_state);
  84. let reply = result?;
  85. match reply {
  86. PingProtocolMsgs::PingReply(reply) => Ok(reply),
  87. msg => Err(bterr!(
  88. "Unexpected message type sent in reply: {}",
  89. msg.name()
  90. )),
  91. }
  92. }
  93. }
  94. async fn spawn_client<T: Client>(init: T, runtime: &'static Runtime) -> ClientHandle<T> {
  95. let state = Some(PingClientState::Client(init));
  96. ClientHandle { state, runtime }
  97. }
  98. async fn register_server<Init, F>(
  99. make_init: F,
  100. rt: &'static Runtime,
  101. id: ServiceId,
  102. ) -> Result<ServiceName>
  103. where
  104. Init: 'static + Server,
  105. F: 'static + Send + Sync + Clone + Fn() -> Init,
  106. {
  107. enum ServerState<S> {
  108. Server(S),
  109. End(End),
  110. }
  111. async fn server_loop<Init, F>(
  112. _runtime: &'static Runtime,
  113. make_init: F,
  114. mut mailbox: Mailbox<PingProtocolMsgs>,
  115. _act_id: ActorId,
  116. ) where
  117. Init: 'static + Server,
  118. F: 'static + Send + Sync + FnOnce() -> Init,
  119. {
  120. let mut state = ServerState::Server(make_init());
  121. while let Some(envelope) = mailbox.recv().await {
  122. let (msg, msg_kind) = envelope.split();
  123. state = match (state, msg) {
  124. (ServerState::Server(listening_state), PingProtocolMsgs::Ping(msg)) => {
  125. match listening_state.handle_ping(msg).await {
  126. TransResult::Ok((new_state, reply)) => match msg_kind {
  127. EnvelopeKind::Call { reply: replier } => {
  128. let replier =
  129. replier.expect("The reply has already been sent.");
  130. if let Err(_) = replier.send(PingProtocolMsgs::PingReply(reply))
  131. {
  132. panic!("Failed to send Ping reply.");
  133. }
  134. ServerState::End(new_state)
  135. }
  136. _ => panic!("'Ping' was expected to be a Call message."),
  137. },
  138. TransResult::Abort { from, err } => {
  139. log::warn!("Aborted transition from the {} while handling the {} message: {}", "Server", "Ping", err);
  140. ServerState::Server(from)
  141. }
  142. TransResult::Fatal { err } => {
  143. panic!("Fatal error while handling Ping message in Server state: {err}");
  144. }
  145. }
  146. }
  147. (state, _) => state,
  148. };
  149. if let ServerState::End(_) = state {
  150. break;
  151. }
  152. }
  153. }
  154. rt.register::<PingProtocolMsgs, _>(id, move |runtime| {
  155. let make_init = make_init.clone();
  156. let fut = async move {
  157. let actor_impl = runtime
  158. .spawn(move |_, mailbox, act_id| {
  159. server_loop(runtime, make_init, mailbox, act_id)
  160. })
  161. .await;
  162. Ok(actor_impl)
  163. };
  164. Box::pin(fut)
  165. })
  166. .await
  167. }
  168. #[derive(Serialize, Deserialize)]
  169. pub struct Ping;
  170. impl CallMsg for Ping {
  171. type Reply = PingReply;
  172. }
  173. #[derive(Serialize, Deserialize)]
  174. pub struct PingReply;
  175. struct ClientState {
  176. counter: Arc<AtomicU8>,
  177. }
  178. impl ClientState {
  179. fn new(counter: Arc<AtomicU8>) -> Self {
  180. counter.fetch_add(1, Ordering::SeqCst);
  181. Self { counter }
  182. }
  183. }
  184. impl Client for ClientState {
  185. fn actor_impl() -> String {
  186. "client".into()
  187. }
  188. type OnSendPingFut = impl Future<Output = TransResult<Self, (End, PingReply)>>;
  189. fn on_send_ping(self, _msg: &mut Ping) -> Self::OnSendPingFut {
  190. self.counter.fetch_sub(1, Ordering::SeqCst);
  191. ready(TransResult::Ok((End, PingReply)))
  192. }
  193. }
  194. struct ServerState {
  195. counter: Arc<AtomicU8>,
  196. }
  197. impl ServerState {
  198. fn new(counter: Arc<AtomicU8>) -> Self {
  199. counter.fetch_add(1, Ordering::SeqCst);
  200. Self { counter }
  201. }
  202. }
  203. impl Server for ServerState {
  204. fn actor_impl() -> String {
  205. "server".into()
  206. }
  207. type HandlePingFut = impl Future<Output = TransResult<Self, (End, PingReply)>>;
  208. fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
  209. self.counter.fetch_sub(1, Ordering::SeqCst);
  210. ready(TransResult::Ok((End, PingReply)))
  211. }
  212. }
  213. #[test]
  214. fn ping_pong_test() {
  215. ASYNC_RT.block_on(async {
  216. const SERVICE_ID: &str = "PingPongProtocolServer";
  217. let service_id = ServiceId::from(SERVICE_ID);
  218. let counter = Arc::new(AtomicU8::new(0));
  219. let service_name = {
  220. let service_counter = counter.clone();
  221. let make_init = move || {
  222. let server_counter = service_counter.clone();
  223. ServerState::new(server_counter)
  224. };
  225. register_server(make_init, &RUNTIME, service_id.clone())
  226. .await
  227. .unwrap()
  228. };
  229. let mut client_handle = spawn_client(ClientState::new(counter.clone()), &RUNTIME).await;
  230. let service_addr = ServiceAddr::new(service_name, true);
  231. client_handle.send_ping(Ping, service_addr).await.unwrap();
  232. assert_eq!(0, counter.load(Ordering::SeqCst));
  233. RUNTIME.deregister(&service_id, None).await.unwrap();
  234. });
  235. }
  236. }
  237. mod travel_agency {
  238. use super::*;
  239. // Here's another protocol example. This is the Customer and Travel Agency protocol used as an
  240. // example in the survey paper "Behavioral Types in Programming Languages."
  241. // Note that the Choosing state can send messages at any time, not just in response to another
  242. // message because there is a transition from Choosing that doesn't use the receive operator
  243. // (`?`).
  244. protocol! {
  245. named TravelAgency;
  246. let agency = [Listening];
  247. let customer = [Choosing];
  248. Choosing -> Choosing, >service(Listening)!Query;
  249. Choosing -> Choosing, >service(Listening)!Accept;
  250. Choosing -> Choosing, >service(Listening)!Reject;
  251. Listening?Query -> Listening, >Choosing!Query::Reply;
  252. Choosing?Query::Reply -> Choosing;
  253. Listening?Accept -> End, >Choosing!Accept::Reply;
  254. Choosing?Accept::Reply -> End;
  255. Listening?Reject -> End, >Choosing!Reject::Reply;
  256. Choosing?Reject::Reply -> End;
  257. }
  258. #[derive(Serialize, Deserialize)]
  259. pub struct Query;
  260. impl CallMsg for Query {
  261. type Reply = ();
  262. }
  263. #[derive(Serialize, Deserialize)]
  264. pub struct Reject;
  265. impl CallMsg for Reject {
  266. type Reply = ();
  267. }
  268. #[derive(Serialize, Deserialize)]
  269. pub struct Accept;
  270. impl CallMsg for Accept {
  271. type Reply = ();
  272. }
  273. }
  274. #[allow(dead_code)]
  275. mod client_callback {
  276. use super::*;
  277. use std::{panic::panic_any, time::Duration};
  278. use tokio::{sync::oneshot, time::timeout};
  279. #[derive(Serialize, Deserialize)]
  280. pub struct Register {
  281. factor: usize,
  282. }
  283. #[derive(Serialize, Deserialize)]
  284. pub struct Completed {
  285. value: usize,
  286. }
  287. protocol! {
  288. named ClientCallback;
  289. let server = [Listening];
  290. let worker = [Working];
  291. let client = [Unregistered, Registered];
  292. Unregistered -> Registered, >service(Listening)!Register[Registered];
  293. Listening?Register[Registered] -> Listening, Working[Registered];
  294. Working[Registered] -> End, >Registered!Completed;
  295. Registered?Completed -> End;
  296. }
  297. struct UnregisteredState {
  298. sender: oneshot::Sender<usize>,
  299. }
  300. impl Unregistered for UnregisteredState {
  301. fn actor_impl() -> String {
  302. "client".into()
  303. }
  304. type OnSendRegisterRegistered = RegisteredState;
  305. type OnSendRegisterFut = Ready<TransResult<Self, Self::OnSendRegisterRegistered>>;
  306. fn on_send_register(self, _arg: &mut Register) -> Self::OnSendRegisterFut {
  307. ready(TransResult::Ok(RegisteredState {
  308. sender: self.sender,
  309. }))
  310. }
  311. }
  312. struct RegisteredState {
  313. sender: oneshot::Sender<usize>,
  314. }
  315. impl Registered for RegisteredState {
  316. type HandleCompletedFut = Ready<TransResult<Self, End>>;
  317. fn handle_completed(self, arg: Completed) -> Self::HandleCompletedFut {
  318. self.sender.send(arg.value).unwrap();
  319. ready(TransResult::Ok(End))
  320. }
  321. }
  322. struct ListeningState {
  323. multiple: usize,
  324. }
  325. impl Listening for ListeningState {
  326. fn actor_impl() -> String {
  327. "server".into()
  328. }
  329. type HandleRegisterListening = ListeningState;
  330. type HandleRegisterWorking = WorkingState;
  331. type HandleRegisterFut = Ready<TransResult<Self, (ListeningState, WorkingState)>>;
  332. fn handle_register(self, arg: Register) -> Self::HandleRegisterFut {
  333. let multiple = self.multiple;
  334. ready(TransResult::Ok((
  335. self,
  336. WorkingState {
  337. factor: arg.factor,
  338. multiple,
  339. },
  340. )))
  341. }
  342. }
  343. struct WorkingState {
  344. factor: usize,
  345. multiple: usize,
  346. }
  347. impl Working for WorkingState {
  348. fn actor_impl() -> String {
  349. "worker".into()
  350. }
  351. type OnSendCompletedFut = Ready<TransResult<Self, (End, Completed)>>;
  352. fn on_send_completed(self) -> Self::OnSendCompletedFut {
  353. let value = self.multiple * self.factor;
  354. ready(TransResult::Ok((End, Completed { value })))
  355. }
  356. }
  357. use ::tokio::sync::Mutex;
  358. enum ClientState<Init: Unregistered> {
  359. Unregistered(Init),
  360. Registered(Init::OnSendRegisterRegistered),
  361. End(End),
  362. }
  363. impl<Init: Unregistered> ClientState<Init> {
  364. pub fn name(&self) -> &'static str {
  365. match self {
  366. Self::Unregistered(_) => "Unregistered",
  367. Self::Registered(_) => "Registered",
  368. Self::End(_) => "End",
  369. }
  370. }
  371. }
  372. struct ClientHandle<Init: Unregistered> {
  373. runtime: &'static Runtime,
  374. state: Arc<Mutex<Option<ClientState<Init>>>>,
  375. name: ActorName,
  376. }
  377. impl<Init: Unregistered> ClientHandle<Init> {
  378. async fn send_register(&self, to: ServiceAddr, mut msg: Register) -> Result<()> {
  379. let mut guard = self.state.lock().await;
  380. let state = guard
  381. .take()
  382. .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
  383. let new_state = match state {
  384. ClientState::Unregistered(state) => match state.on_send_register(&mut msg).await {
  385. TransResult::Ok(new_state) => {
  386. let msg = ClientCallbackMsgs::Register(msg);
  387. self.runtime
  388. .send_service(to, self.name.clone(), msg)
  389. .await?;
  390. ClientState::Registered(new_state)
  391. }
  392. TransResult::Abort { from, err } => {
  393. log::warn!(
  394. "Aborted transition from the {} state: {}",
  395. "Unregistered",
  396. err
  397. );
  398. ClientState::Unregistered(from)
  399. }
  400. TransResult::Fatal { err } => {
  401. return Err(err);
  402. }
  403. },
  404. state => state,
  405. };
  406. *guard = Some(new_state);
  407. Ok(())
  408. }
  409. }
  410. async fn spawn_client<Init>(init: Init, runtime: &'static Runtime) -> ClientHandle<Init>
  411. where
  412. Init: 'static + Unregistered,
  413. {
  414. let state = Arc::new(Mutex::new(Some(ClientState::Unregistered(init))));
  415. let name = {
  416. let state = state.clone();
  417. runtime.spawn(move |_, mut mailbox, _act_id| async move {
  418. while let Some(envelope) = mailbox.recv().await {
  419. let mut guard = state.lock().await;
  420. let state = guard.take()
  421. .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
  422. let (msg, _kind) = envelope.split();
  423. let new_state = match (state, msg) {
  424. (ClientState::Registered(curr_state), ClientCallbackMsgs::Completed(msg)) => {
  425. match curr_state.handle_completed(msg).await {
  426. TransResult::Ok(next) => ClientState::<Init>::End(next),
  427. TransResult::Abort { from, err } => {
  428. log::warn!("Aborted transition from the {} state while handling the {} message: {}", "Registered", "Completed", err);
  429. ClientState::Registered(from)
  430. }
  431. TransResult::Fatal { err } => {
  432. panic_any(ActorPanic {
  433. actor_impl: Init::actor_impl(),
  434. state: "Registered",
  435. message: "Completed",
  436. kind: TransKind::Receive,
  437. err
  438. });
  439. }
  440. }
  441. }
  442. (state, msg) => {
  443. log::error!("Unexpected message {} in state {}.", msg.name(), state.name());
  444. state
  445. }
  446. };
  447. *guard = Some(new_state);
  448. }
  449. }).await
  450. };
  451. ClientHandle {
  452. runtime,
  453. state,
  454. name,
  455. }
  456. }
  457. async fn register_server<Init, F>(
  458. make_init: F,
  459. runtime: &'static Runtime,
  460. service_id: ServiceId,
  461. ) -> Result<ServiceName>
  462. where
  463. Init: 'static + Listening<HandleRegisterListening = Init>,
  464. F: 'static + Send + Sync + Clone + Fn() -> Init,
  465. {
  466. enum ServerState<S: Listening> {
  467. Listening(S),
  468. }
  469. impl<S: Listening> ServerState<S> {
  470. fn name(&self) -> &'static str {
  471. match self {
  472. Self::Listening(_) => "Listening",
  473. }
  474. }
  475. }
  476. async fn server_loop<Init, F>(
  477. runtime: &'static Runtime,
  478. make_init: F,
  479. mut mailbox: Mailbox<ClientCallbackMsgs>,
  480. _act_id: ActorId,
  481. ) where
  482. Init: 'static + Listening<HandleRegisterListening = Init>,
  483. F: 'static + Send + Sync + Fn() -> Init,
  484. {
  485. let mut state = ServerState::Listening(make_init());
  486. while let Some(envelope) = mailbox.recv().await {
  487. let (msg, msg_kind) = envelope.split();
  488. let new_state = match (state, msg) {
  489. (ServerState::Listening(curr_state), ClientCallbackMsgs::Register(msg)) => {
  490. match curr_state.handle_register(msg).await {
  491. TransResult::Ok((new_state, working_state)) => {
  492. if let EnvelopeKind::Send { from, .. } = msg_kind {
  493. start_worker(working_state, from, runtime).await;
  494. } else {
  495. log::error!("Expected Register to be a Send message.");
  496. }
  497. ServerState::Listening(new_state)
  498. }
  499. TransResult::Abort { from, err } => {
  500. log::warn!("Aborted transition from the {} state while handling the {} message: {}", "Listening", "Register", err);
  501. ServerState::Listening(from)
  502. }
  503. TransResult::Fatal { err } => {
  504. panic_any(ActorPanic {
  505. actor_impl: Init::actor_impl(),
  506. state: "Listening",
  507. message: "Register",
  508. kind: TransKind::Receive,
  509. err,
  510. });
  511. }
  512. }
  513. }
  514. (state, msg) => {
  515. log::error!(
  516. "Unexpected message {} in state {}.",
  517. msg.name(),
  518. state.name()
  519. );
  520. state
  521. }
  522. };
  523. state = new_state;
  524. }
  525. }
  526. runtime
  527. .register::<ClientCallbackMsgs, _>(service_id, move |runtime: &'static Runtime| {
  528. let make_init = make_init.clone();
  529. let fut = async move {
  530. let make_init = make_init.clone();
  531. let actor_impl = runtime
  532. .spawn(move |_, mailbox, act_id| {
  533. server_loop(runtime, make_init, mailbox, act_id)
  534. })
  535. .await;
  536. Ok(actor_impl)
  537. };
  538. Box::pin(fut)
  539. })
  540. .await
  541. }
  542. async fn start_worker<Init>(
  543. init: Init,
  544. owned: ActorName,
  545. runtime: &'static Runtime,
  546. ) -> ActorName
  547. where
  548. Init: 'static + Working,
  549. {
  550. enum WorkerState<S: Working> {
  551. Working(S),
  552. }
  553. runtime
  554. .spawn::<ClientCallbackMsgs, _, _>(move |_, _, act_id| async move {
  555. let msg = match init.on_send_completed().await {
  556. TransResult::Ok((End, msg)) => msg,
  557. TransResult::Abort { err, .. } | TransResult::Fatal { err } => {
  558. panic_any(ActorPanic {
  559. actor_impl: Init::actor_impl(),
  560. state: "Working",
  561. message: "Completed",
  562. kind: TransKind::Send,
  563. err,
  564. })
  565. }
  566. };
  567. let from = runtime.actor_name(act_id);
  568. let msg = ClientCallbackMsgs::Completed(msg);
  569. runtime.send(owned, from, msg).await.unwrap_or_else(|err| {
  570. panic_any(ActorPanic {
  571. actor_impl: Init::actor_impl(),
  572. state: "Working",
  573. message: "Completed",
  574. kind: TransKind::Send,
  575. err,
  576. });
  577. });
  578. })
  579. .await
  580. }
  581. #[test]
  582. fn client_callback_protocol() {
  583. ASYNC_RT.block_on(async {
  584. const SERVICE_ID: &str = "ClientCallbackProtocolListening";
  585. let service_id = ServiceId::from(SERVICE_ID);
  586. let service_name = {
  587. let make_init = move || ListeningState { multiple: 2 };
  588. register_server(make_init, &RUNTIME, service_id.clone())
  589. .await
  590. .unwrap()
  591. };
  592. let (sender, receiver) = oneshot::channel();
  593. let client_handle = spawn_client(UnregisteredState { sender }, &RUNTIME).await;
  594. let service_addr = ServiceAddr::new(service_name, false);
  595. client_handle
  596. .send_register(service_addr, Register { factor: 21 })
  597. .await
  598. .unwrap();
  599. let value = timeout(Duration::from_millis(500), receiver)
  600. .await
  601. .unwrap()
  602. .unwrap();
  603. assert_eq!(42, value);
  604. });
  605. }
  606. }