runtime_tests.rs 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838
  1. #![feature(impl_trait_in_assoc_type)]
  2. use btrun::model::*;
  3. use btrun::test_setup;
  4. use btrun::*;
  5. use btlib::Result;
  6. use btproto::protocol;
  7. use log;
  8. use once_cell::sync::Lazy;
  9. use serde::{Deserialize, Serialize};
  10. use std::{
  11. future::{ready, Future, Ready},
  12. sync::{
  13. atomic::{AtomicU8, Ordering},
  14. Arc,
  15. },
  16. };
  17. test_setup!();
  18. mod ping_pong {
  19. use super::*;
  20. use btlib::bterr;
  21. // The following code is a proof-of-concept for what types should be generated for a
  22. // simple ping-pong protocol:
  23. protocol! {
  24. named PingProtocol;
  25. let server = [Server];
  26. let client = [Client];
  27. Client -> End, >service(Server)!Ping;
  28. Server?Ping -> End, >Client!Ping::Reply;
  29. }
  //
  // In words, the protocol is described as follows.
  // 1. When the Server state receives the Ping message it returns the End state and a
  //    Ping::Reply message to be sent to the Client state.
  // 2. When the Client state receives the Ping::Reply message it returns the End state.
  //
  // The End state represents the end of the session described by the protocol. When an actor
  // transitions to the End state, its function returns.
  // When a state is expecting a Reply message, an error occurs if the message is not received
  // in a timely manner.
  40. enum PingClientState<T: Client> {
  41. Client(T),
  42. End(End),
  43. }
  44. impl<T: Client> PingClientState<T> {
  45. const fn name(&self) -> &'static str {
  46. match self {
  47. Self::Client(_) => "Client",
  48. Self::End(_) => "End",
  49. }
  50. }
  51. }
  52. struct ClientHandleManual<T: Client> {
  53. state: Option<PingClientState<T>>,
  54. client_name: ActorName,
  55. runtime: &'static Runtime,
  56. }
  57. impl<T: Client> ClientHandleManual<T> {
  58. async fn send_ping(
  59. mut self,
  60. msg: Ping,
  61. service: ServiceAddr,
  62. ) -> TransResult<Self, (ClientHandleManual<T>, T::OnSendPingReturn)> {
  63. let state = if let Some(state) = self.state.take() {
  64. state
  65. } else {
  66. return TransResult::Abort {
  67. from: self,
  68. err: bterr!("The shared state was not returned."),
  69. };
  70. };
  71. match state {
  72. PingClientState::Client(state) => {
  73. let result = self
  74. .runtime
  75. .call_service(
  76. service,
  77. self.client_name.clone(),
  78. PingProtocolMsgs::Ping(msg),
  79. )
  80. .await;
  81. let reply_enum = match result {
  82. Ok(reply_enum) => reply_enum,
  83. Err(err) => {
  84. self.state = Some(PingClientState::Client(state));
  85. return TransResult::Abort { from: self, err };
  86. }
  87. };
  88. if let PingProtocolMsgs::PingReply(reply) = reply_enum {
  89. match state.on_send_ping(reply).await {
  90. TransResult::Ok((new_state, return_var)) => {
  91. self.state = Some(PingClientState::End(new_state));
  92. TransResult::Ok((self, return_var))
  93. }
  94. TransResult::Abort { from, err } => {
  95. self.state = Some(PingClientState::Client(from));
  96. TransResult::Abort { from: self, err }
  97. }
  98. TransResult::Fatal { err } => return TransResult::Fatal { err },
  99. }
  100. } else {
  101. TransResult::Abort {
  102. from: self,
  103. err: bterr!("Unexpected reply type."),
  104. }
  105. }
  106. }
  107. state => {
  108. let err = bterr!("Can't send Ping in state {}.", state.name());
  109. self.state = Some(state);
  110. TransResult::Abort { from: self, err }
  111. }
  112. }
  113. }
  114. }
  115. async fn spawn_client_manual<T: Client>(
  116. init: T,
  117. runtime: &'static Runtime,
  118. ) -> ClientHandleManual<T> {
  119. let state = Some(PingClientState::Client(init));
  120. let client_name = runtime.spawn(None, do_nothing_actor).await.unwrap();
  121. ClientHandleManual {
  122. state,
  123. client_name,
  124. runtime,
  125. }
  126. }
  127. async fn register_server_manual<Init, F>(
  128. make_init: F,
  129. rt: &'static Runtime,
  130. id: ServiceId,
  131. ) -> Result<ServiceName>
  132. where
  133. Init: 'static + Server,
  134. F: 'static + Send + Sync + Clone + Fn() -> Init,
  135. {
  136. enum ServerState<S> {
  137. Server(S),
  138. End(End),
  139. }
  140. impl<S> Named for ServerState<S> {
  141. fn name(&self) -> Arc<String> {
  142. static SERVER_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("Server".into()));
  143. static END_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("End".into()));
  144. match self {
  145. Self::Server(_) => SERVER_NAME.clone(),
  146. Self::End(_) => END_NAME.clone(),
  147. }
  148. }
  149. }
  150. async fn server_loop<Init, F>(
  151. _runtime: &'static Runtime,
  152. make_init: F,
  153. mut mailbox: Mailbox<PingProtocolMsgs>,
  154. actor_id: ActorId,
  155. ) -> ActorResult
  156. where
  157. Init: 'static + Server,
  158. F: 'static + Send + Sync + FnOnce() -> Init,
  159. {
  160. let mut state = ServerState::Server(make_init());
  161. while let Some(envelope) = mailbox.recv().await {
  162. state = match envelope {
  163. Envelope::Call {
  164. msg,
  165. reply: replier,
  166. ..
  167. } => match (state, msg) {
  168. (ServerState::Server(listening_state), PingProtocolMsgs::Ping(msg)) => {
  169. match listening_state.handle_ping(msg).await {
  170. TransResult::Ok((new_state, reply)) => {
  171. let replier = replier
  172. .ok_or_else(|| bterr!("Reply has already been sent."))
  173. .unwrap();
  174. if let Err(_) = replier.send(PingProtocolMsgs::PingReply(reply))
  175. {
  176. return Err(ActorError::new(
  177. bterr!("Failed to send Ping reply."),
  178. ActorErrorPayload {
  179. actor_id,
  180. actor_impl: Init::actor_impl(),
  181. state: Init::state_name(),
  182. message: PingProtocolMsgKinds::Ping.name(),
  183. kind: TransKind::Receive,
  184. },
  185. ));
  186. }
  187. ServerState::End(new_state)
  188. }
  189. TransResult::Abort { from, err } => {
  190. log::warn!("Aborted transition from the {} while handling the {} message: {}", "Server", "Ping", err);
  191. ServerState::Server(from)
  192. }
  193. TransResult::Fatal { err } => {
  194. return Err(ActorError::new(
  195. err,
  196. ActorErrorPayload {
  197. actor_id,
  198. actor_impl: Init::actor_impl(),
  199. state: Init::state_name(),
  200. message: PingProtocolMsgKinds::Ping.name(),
  201. kind: TransKind::Receive,
  202. },
  203. ));
  204. }
  205. }
  206. }
  207. (state, _) => state,
  208. },
  209. envelope => {
  210. return Err(ActorError::new(
  211. bterr!("Unexpected envelope type: {}", envelope.name()),
  212. ActorErrorPayload {
  213. actor_id,
  214. actor_impl: Init::actor_impl(),
  215. state: state.name(),
  216. message: envelope.msg_name(),
  217. kind: TransKind::Receive,
  218. },
  219. ))
  220. }
  221. };
  222. if let ServerState::End(_) = state {
  223. break;
  224. }
  225. }
  226. Ok(actor_id)
  227. }
  228. rt.register::<PingProtocolMsgs, _>(id, move |runtime| {
  229. let make_init = make_init.clone();
  230. let fut = async move {
  231. let actor_impl = runtime
  232. .spawn(None, move |mailbox, act_id, runtime| {
  233. server_loop(runtime, make_init, mailbox, act_id)
  234. })
  235. .await
  236. .unwrap();
  237. Ok(actor_impl)
  238. };
  239. Box::pin(fut)
  240. })
  241. .await
  242. }
  243. #[derive(Serialize, Deserialize)]
  244. pub struct Ping;
  245. impl CallMsg for Ping {
  246. type Reply = PingReply;
  247. }
  248. #[derive(Serialize, Deserialize)]
  249. pub struct PingReply;
  /// Client state for the ping-pong test. It holds a counter shared with the test body.
  struct ClientImpl {
      counter: Arc<AtomicU8>,
  }

  impl ClientImpl {
      /// Creates the client state and increments `counter` by one.
      fn new(counter: Arc<AtomicU8>) -> Self {
          let client = Self { counter };
          client.counter.fetch_add(1, Ordering::SeqCst);
          client
      }
  }
  259. impl Client for ClientImpl {
  260. actor_name!("ping_client");
  261. type OnSendPingReturn = ();
  262. type OnSendPingFut = impl Future<Output = TransResult<Self, (End, ())>>;
  263. fn on_send_ping(self, _msg: PingReply) -> Self::OnSendPingFut {
  264. self.counter.fetch_sub(1, Ordering::SeqCst);
  265. ready(TransResult::Ok((End, ())))
  266. }
  267. }
  /// Server state for the ping-pong test. It holds a counter shared with the test body.
  struct ServerState {
      counter: Arc<AtomicU8>,
  }

  impl ServerState {
      /// Creates the server state and increments `counter` by one.
      fn new(counter: Arc<AtomicU8>) -> Self {
          let server = Self { counter };
          server.counter.fetch_add(1, Ordering::SeqCst);
          server
      }
  }
  277. impl Server for ServerState {
  278. actor_name!("ping_server");
  279. type HandlePingFut = impl Future<Output = TransResult<Self, (End, PingReply)>>;
  280. fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
  281. self.counter.fetch_sub(1, Ordering::SeqCst);
  282. ready(TransResult::Ok((End, PingReply)))
  283. }
  284. }
  /// Runs one Ping exchange between a manually spawned client and a manually registered server.
  ///
  /// Both state constructors increment the shared counter and both handlers decrement it, so
  /// the counter must be back at zero once `send_ping` has completed.
  #[test]
  fn ping_pong_test() {
      const SERVICE_ID: &str = "PingPongProtocolServer";

      ASYNC_RT.block_on(async {
          let counter = Arc::new(AtomicU8::new(0));

          let service_counter = counter.clone();
          let make_init = move || ServerState::new(service_counter.clone());
          let service_name =
              register_server_manual(make_init, &RUNTIME, ServiceId::from(SERVICE_ID))
                  .await
                  .unwrap();

          let client_state = ClientImpl::new(counter.clone());
          let client_handle = spawn_client_manual(client_state, &RUNTIME).await;

          let service_addr = ServiceAddr::new(service_name, true);
          client_handle.send_ping(Ping, service_addr).await.unwrap();

          assert_eq!(0, counter.load(Ordering::SeqCst));
      });
  }
  308. }
  309. mod travel_agency {
  310. use super::*;
  311. // Here's another protocol example. This is the Customer and Travel Agency protocol used as an
  312. // example in the survey paper "Behavioral Types in Programming Languages."
  313. // Note that the Choosing state can send messages at any time, not just in response to another
  314. // message because there is a transition from Choosing that doesn't use the receive operator
  315. // (`?`).
  316. protocol! {
  317. named TravelAgency;
  318. let agency = [Listening];
  319. let customer = [Choosing];
  320. Choosing -> Choosing, >service(Listening)!Query;
  321. Choosing -> Choosing, >service(Listening)!Accept;
  322. Choosing -> Choosing, >service(Listening)!Reject;
  323. Listening?Query -> Listening, >Choosing!Query::Reply;
  324. Listening?Accept -> End, >Choosing!Accept::Reply;
  325. Listening?Reject -> End, >Choosing!Reject::Reply;
  326. }
  327. #[derive(Serialize, Deserialize)]
  328. pub struct Query;
  329. impl CallMsg for Query {
  330. type Reply = ();
  331. }
  332. #[derive(Serialize, Deserialize)]
  333. pub struct Reject;
  334. impl CallMsg for Reject {
  335. type Reply = ();
  336. }
  337. #[derive(Serialize, Deserialize)]
  338. pub struct Accept;
  339. impl CallMsg for Accept {
  340. type Reply = ();
  341. }
  342. }
  343. #[allow(dead_code)]
  344. mod client_callback {
  345. use super::*;
  346. use btlib::bterr;
  347. use once_cell::sync::Lazy;
  348. use std::{marker::PhantomData, panic::panic_any, time::Duration};
  349. use tokio::{sync::oneshot, time::timeout};
  350. #[derive(Serialize, Deserialize)]
  351. pub struct Register {
  352. factor: usize,
  353. }
  354. #[derive(Serialize, Deserialize)]
  355. pub struct Completed {
  356. value: usize,
  357. }
  358. protocol! {
  359. named ClientCallback;
  360. let server = [Listening];
  361. let worker = [Working];
  362. let client = [Unregistered, Registered];
  363. Unregistered -> Registered, >service(Listening)!Register[Registered];
  364. Listening?Register[Registered] -> Listening, Working[Registered];
  365. Working[Registered] -> End, >Registered!Completed;
  366. Registered?Completed -> End;
  367. }
  368. struct UnregisteredState {
  369. sender: oneshot::Sender<usize>,
  370. }
  371. impl Unregistered for UnregisteredState {
  372. actor_name!("callback_client");
  373. type OnSendRegisterReturn = ();
  374. type OnSendRegisterRegistered = RegisteredState;
  375. type OnSendRegisterFut = Ready<TransResult<Self, (Self::OnSendRegisterRegistered, ())>>;
  376. fn on_send_register(self) -> Self::OnSendRegisterFut {
  377. ready(TransResult::Ok((
  378. RegisteredState {
  379. sender: self.sender,
  380. },
  381. (),
  382. )))
  383. }
  384. }
  385. struct RegisteredState {
  386. sender: oneshot::Sender<usize>,
  387. }
  388. impl Registered for RegisteredState {
  389. type HandleCompletedFut = Ready<TransResult<Self, End>>;
  390. fn handle_completed(self, arg: Completed) -> Self::HandleCompletedFut {
  391. self.sender.send(arg.value).unwrap();
  392. ready(TransResult::Ok(End))
  393. }
  394. }
  395. struct ListeningState {
  396. multiple: usize,
  397. }
  398. impl Listening for ListeningState {
  399. actor_name!("callback_server");
  400. type HandleRegisterListening = ListeningState;
  401. type HandleRegisterWorking = WorkingState;
  402. type HandleRegisterFut = Ready<TransResult<Self, (ListeningState, WorkingState)>>;
  403. fn handle_register(self, arg: Register) -> Self::HandleRegisterFut {
  404. let multiple = self.multiple;
  405. ready(TransResult::Ok((
  406. self,
  407. WorkingState {
  408. factor: arg.factor,
  409. multiple,
  410. },
  411. )))
  412. }
  413. }
  414. struct WorkingState {
  415. factor: usize,
  416. multiple: usize,
  417. }
  418. impl Working for WorkingState {
  419. actor_name!("callback_worker");
  420. type OnSendCompletedFut = Ready<TransResult<Self, (End, Completed)>>;
  421. fn on_send_completed(self) -> Self::OnSendCompletedFut {
  422. let value = self.multiple * self.factor;
  423. ready(TransResult::Ok((End, Completed { value })))
  424. }
  425. }
  426. use ::tokio::sync::Mutex;
  427. enum ClientStateManual<Init: Unregistered> {
  428. Unregistered(Init),
  429. Registered(Init::OnSendRegisterRegistered),
  430. End(End),
  431. }
  432. impl<Init: Unregistered> Named for ClientStateManual<Init> {
  433. fn name(&self) -> Arc<String> {
  434. static UNREGISTERED_NAME: Lazy<Arc<String>> =
  435. Lazy::new(|| Arc::new("Unregistered".into()));
  436. static REGISTERED_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("Registered".into()));
  437. static END_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("End".into()));
  438. match self {
  439. Self::Unregistered(_) => UNREGISTERED_NAME.clone(),
  440. Self::Registered(_) => REGISTERED_NAME.clone(),
  441. Self::End(_) => END_NAME.clone(),
  442. }
  443. }
  444. }
  445. struct ClientHandleManual<Init: Unregistered, State> {
  446. runtime: &'static Runtime,
  447. state: Arc<Mutex<Option<ClientStateManual<Init>>>>,
  448. name: ActorName,
  449. type_state: PhantomData<State>,
  450. }
  451. impl<Init: Unregistered, State> ClientHandleManual<Init, State> {
  452. fn new_type<NewState>(self) -> ClientHandleManual<Init, NewState> {
  453. ClientHandleManual {
  454. runtime: self.runtime,
  455. state: self.state,
  456. name: self.name,
  457. type_state: PhantomData,
  458. }
  459. }
  460. }
  461. impl<
  462. Init: Unregistered,
  463. State: Unregistered<OnSendRegisterRegistered = NewState>,
  464. NewState: Registered,
  465. > ClientHandleManual<Init, State>
  466. {
  467. async fn send_register(
  468. self,
  469. to: ServiceAddr,
  470. msg: Register,
  471. ) -> TransResult<
  472. Self,
  473. (
  474. ClientHandleManual<Init, NewState>,
  475. Init::OnSendRegisterReturn,
  476. ),
  477. > {
  478. let mut guard = self.state.lock().await;
  479. let state = guard
  480. .take()
  481. .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
  482. match state {
  483. ClientStateManual::Unregistered(state) => match state.on_send_register().await {
  484. TransResult::Ok((new_state, return_var)) => {
  485. let msg = ClientCallbackMsgs::Register(msg);
  486. let result = self.runtime.send_service(to, self.name.clone(), msg).await;
  487. if let Err(err) = result {
  488. return TransResult::Fatal { err };
  489. }
  490. *guard = Some(ClientStateManual::Registered(new_state));
  491. drop(guard);
  492. TransResult::Ok((self.new_type(), return_var))
  493. }
  494. TransResult::Abort { from, err } => {
  495. *guard = Some(ClientStateManual::Unregistered(from));
  496. drop(guard);
  497. return TransResult::Abort { from: self, err };
  498. }
  499. TransResult::Fatal { err } => {
  500. return TransResult::Fatal { err };
  501. }
  502. },
  503. state => {
  504. let name = state.name();
  505. *guard = Some(state);
  506. drop(guard);
  507. TransResult::Abort {
  508. from: self,
  509. err: bterr!(
  510. "Unexpected state '{}' for '{}' method.",
  511. name,
  512. "send_register"
  513. ),
  514. }
  515. }
  516. }
  517. }
  518. }
  519. async fn spawn_client_manual<Init>(
  520. init: Init,
  521. runtime: &'static Runtime,
  522. ) -> ClientHandleManual<Init, Init>
  523. where
  524. Init: 'static + Unregistered,
  525. {
  526. let state = Arc::new(Mutex::new(Some(ClientStateManual::Unregistered(init))));
  527. let name = {
  528. let state = state.clone();
  529. runtime.spawn(None, move |mut mailbox, actor_id, _| async move {
  530. while let Some(envelope) = mailbox.recv().await {
  531. let mut guard = state.lock().await;
  532. let state = guard.take()
  533. .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
  534. let new_state = match envelope {
  535. Envelope::Send { msg, .. } => {
  536. match (state, msg) {
  537. (ClientStateManual::Registered(curr_state), ClientCallbackMsgs::Completed(msg)) => {
  538. match curr_state.handle_completed(msg).await {
  539. TransResult::Ok(next) => ClientStateManual::<Init>::End(next),
  540. TransResult::Abort { from, err } => {
  541. log::warn!("Aborted transition from the {} state while handling the {} message: {}", "Registered", "Completed", err);
  542. ClientStateManual::Registered(from)
  543. }
  544. TransResult::Fatal { err } => {
  545. panic_any(ActorError::new(
  546. err,
  547. ActorErrorPayload {
  548. actor_id,
  549. actor_impl: Init::actor_impl(),
  550. state: Init::OnSendRegisterRegistered::state_name(),
  551. message: ClientCallbackMsgKinds::Completed.name(),
  552. kind: TransKind::Receive,
  553. }));
  554. }
  555. }
  556. }
  557. (state, msg) => {
  558. log::error!("Unexpected message {} in state {}.", msg.name(), state.name());
  559. state
  560. }
  561. }
  562. }
  563. envelope => return Err(ActorError::new(
  564. bterr!("Unexpected envelope type: {}", envelope.name()),
  565. ActorErrorPayload {
  566. actor_id,
  567. actor_impl: Init::actor_impl(),
  568. state: state.name(),
  569. message: envelope.msg_name(),
  570. kind: TransKind::Receive,
  571. }))
  572. };
  573. *guard = Some(new_state);
  574. if let Some(state) = &*guard {
  575. if let ClientStateManual::End(_) = state {
  576. break;
  577. }
  578. }
  579. }
  580. Ok(actor_id)
  581. }).await.unwrap()
  582. };
  583. ClientHandleManual {
  584. runtime,
  585. state,
  586. name,
  587. type_state: PhantomData,
  588. }
  589. }
  590. async fn register_server_manual<Init, F>(
  591. make_init: F,
  592. runtime: &'static Runtime,
  593. service_id: ServiceId,
  594. ) -> Result<ServiceName>
  595. where
  596. Init: 'static + Listening<HandleRegisterListening = Init>,
  597. F: 'static + Send + Sync + Clone + Fn() -> Init,
  598. {
  599. enum ServerState<Init: Listening> {
  600. Listening(Init),
  601. }
  602. impl<S: Listening> Named for ServerState<S> {
  603. fn name(&self) -> Arc<String> {
  604. static LISTENING_NAME: Lazy<Arc<String>> =
  605. Lazy::new(|| Arc::new("Listening".into()));
  606. match self {
  607. Self::Listening(_) => LISTENING_NAME.clone(),
  608. }
  609. }
  610. }
  611. async fn server_loop<Init, F>(
  612. runtime: &'static Runtime,
  613. make_init: F,
  614. mut mailbox: Mailbox<ClientCallbackMsgs>,
  615. actor_id: ActorId,
  616. ) -> ActorResult
  617. where
  618. Init: 'static + Listening<HandleRegisterListening = Init>,
  619. F: 'static + Send + Sync + Fn() -> Init,
  620. {
  621. let mut state = ServerState::Listening(make_init());
  622. while let Some(envelope) = mailbox.recv().await {
  623. let new_state = match envelope {
  624. Envelope::Send { msg, from, .. } => match (state, msg) {
  625. (ServerState::Listening(curr_state), ClientCallbackMsgs::Register(msg)) => {
  626. match curr_state.handle_register(msg).await {
  627. TransResult::Ok((new_state, working_state)) => {
  628. start_worker_manual(working_state, from, runtime).await;
  629. ServerState::Listening(new_state)
  630. }
  631. TransResult::Abort { from, err } => {
  632. log::warn!("Aborted transition from the {} state while handling the {} message: {}", "Listening", "Register", err);
  633. ServerState::Listening(from)
  634. }
  635. TransResult::Fatal { err } => {
  636. let err = ActorError::new(
  637. err,
  638. ActorErrorPayload {
  639. actor_id,
  640. actor_impl: Init::actor_impl(),
  641. state: Init::state_name(),
  642. message: ClientCallbackMsgKinds::Register.name(),
  643. kind: TransKind::Receive,
  644. },
  645. );
  646. panic_any(format!("{err}"));
  647. }
  648. }
  649. }
  650. (state, msg) => {
  651. log::error!(
  652. "Unexpected message {} in state {}.",
  653. msg.name(),
  654. state.name()
  655. );
  656. state
  657. }
  658. },
  659. envelope => {
  660. return Err(ActorError::new(
  661. bterr!("Unexpected envelope type: {}", envelope.name()),
  662. ActorErrorPayload {
  663. actor_id,
  664. actor_impl: Init::actor_impl(),
  665. state: state.name(),
  666. message: envelope.msg_name(),
  667. kind: TransKind::Receive,
  668. },
  669. ))
  670. }
  671. };
  672. state = new_state;
  673. }
  674. Ok(actor_id)
  675. }
  676. runtime
  677. .register::<ClientCallbackMsgs, _>(service_id, move |runtime: &'static Runtime| {
  678. let make_init = make_init.clone();
  679. let fut = async move {
  680. let make_init = make_init.clone();
  681. let actor_impl = runtime
  682. .spawn(None, move |mailbox, act_id, runtime| {
  683. server_loop(runtime, make_init, mailbox, act_id)
  684. })
  685. .await
  686. .unwrap();
  687. Ok(actor_impl)
  688. };
  689. Box::pin(fut)
  690. })
  691. .await
  692. }
  693. async fn start_worker_manual<Init>(
  694. init: Init,
  695. owned: ActorName,
  696. runtime: &'static Runtime,
  697. ) -> ActorName
  698. where
  699. Init: 'static + Working,
  700. {
  701. enum WorkerState<S: Working> {
  702. Working(S),
  703. }
  704. runtime
  705. .spawn::<ClientCallbackMsgs, _, _>(
  706. Some(owned.clone()),
  707. move |_, actor_id, _| async move {
  708. let msg = match init.on_send_completed().await {
  709. TransResult::Ok((End, msg)) => msg,
  710. TransResult::Abort { err, .. } | TransResult::Fatal { err } => {
  711. let err = ActorError::new(
  712. err,
  713. ActorErrorPayload {
  714. actor_id,
  715. actor_impl: Init::actor_impl(),
  716. state: Init::state_name(),
  717. message: ClientCallbackMsgKinds::Completed.name(),
  718. kind: TransKind::Send,
  719. },
  720. );
  721. panic_any(format!("{err}"))
  722. }
  723. };
  724. let from = runtime.actor_name(actor_id);
  725. let msg = ClientCallbackMsgs::Completed(msg);
  726. runtime.send(owned, from, msg).await.unwrap_or_else(|err| {
  727. let err = ActorError::new(
  728. err,
  729. ActorErrorPayload {
  730. actor_id,
  731. actor_impl: Init::actor_impl(),
  732. state: Init::state_name(),
  733. message: ClientCallbackMsgKinds::Completed.name(),
  734. kind: TransKind::Send,
  735. },
  736. );
  737. panic_any(format!("{err}"));
  738. });
  739. Ok(actor_id)
  740. },
  741. )
  742. .await
  743. .unwrap()
  744. }
  /// Registers a server with `multiple = 2`, registers a client with `factor = 21`, and checks
  /// that the client receives the product, 42, within 500 ms.
  #[test]
  fn client_callback_protocol() {
      const SERVICE_ID: &str = "ClientCallbackProtocolListening";

      ASYNC_RT.block_on(async {
          let make_init = move || ListeningState { multiple: 2 };
          let service_name =
              register_server_manual(make_init, &RUNTIME, ServiceId::from(SERVICE_ID))
                  .await
                  .unwrap();

          let (sender, receiver) = oneshot::channel();
          let client_handle = spawn_client_manual(UnregisteredState { sender }, &RUNTIME).await;

          let service_addr = ServiceAddr::new(service_name, false);
          let registration = Register { factor: 21 };
          client_handle
              .send_register(service_addr, registration)
              .await
              .unwrap();

          let wait = Duration::from_millis(500);
          let value = timeout(wait, receiver).await.unwrap().unwrap();
          assert_eq!(42, value);
      });
  }
  770. }