runtime_tests.rs 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758
  1. #![feature(impl_trait_in_assoc_type)]
  2. use btrun::model::*;
  3. use btrun::test_setup;
  4. use btrun::*;
  5. use btlib::Result;
  6. use btproto::protocol;
  7. use log;
  8. use once_cell::sync::Lazy;
  9. use serde::{Deserialize, Serialize};
  10. use std::{
  11. future::{ready, Future, Ready},
  12. sync::{
  13. atomic::{AtomicU8, Ordering},
  14. Arc,
  15. },
  16. };
  17. test_setup!();
  18. mod ping_pong {
  19. use super::*;
  20. use btlib::bterr;
  21. // The following code is a proof-of-concept for what types should be generated for a
  22. // simple ping-pong protocol:
  // Declares the `PingProtocol` protocol. It involves two actors, `server` and `client`, whose
  // only non-terminal states are `Server` and `Client` respectively.
  23. protocol! {
  24. named PingProtocol;
  25. let server = [Server];
  26. let client = [Client];
  // The client sends `Ping` to the `Server` service and moves to `End`.
  27. Client -> End, >service(Server)!Ping;
  // On receiving `Ping` the server moves to `End` and replies to the client with `Ping::Reply`.
  28. Server?Ping -> End, >Client!Ping::Reply;
  29. }
  30. //
  31. // In words, the protocol is described as follows.
  32. // 1. The Client state sends the Ping message to the Server service and returns the End state.
  33. // 2. When the Server state receives the Ping message it returns the End state and a
  34. // Ping::Reply message to be sent to the Client state.
  35. //
  36. // The End state represents an end to the session described by the protocol. When an actor
  37. // transitions to the End state its function returns.
  38. // When a state is expecting a Reply message, an error occurs if the message is not received
  39. // in a timely manner.
  40. enum PingClientState<T: Client> {
  41. Client(T),
  42. End(End),
  43. }
  44. impl<T: Client> PingClientState<T> {
  45. const fn name(&self) -> &'static str {
  46. match self {
  47. Self::Client(_) => "Client",
  48. Self::End(_) => "End",
  49. }
  50. }
  51. }
  52. struct ClientHandle<T: Client> {
  53. state: Option<PingClientState<T>>,
  54. runtime: &'static Runtime,
  55. }
  56. impl<T: Client> ClientHandle<T> {
  57. async fn send_ping(&mut self, mut msg: Ping, service: ServiceAddr) -> Result<PingReply> {
  58. let state = self
  59. .state
  60. .take()
  61. .ok_or_else(|| bterr!("State was not returned."))?;
  62. let (new_state, result) = match state {
  63. PingClientState::Client(state) => match state.on_send_ping(&mut msg).await {
  64. TransResult::Ok((new_state, _)) => {
  65. let new_state = PingClientState::End(new_state);
  66. let result = self
  67. .runtime
  68. .call_service(service, PingProtocolMsgs::Ping(msg))
  69. .await;
  70. (new_state, result)
  71. }
  72. TransResult::Abort { from, err } => {
  73. let new_state = PingClientState::Client(from);
  74. (new_state, Err(err))
  75. }
  76. TransResult::Fatal { err } => return Err(err),
  77. },
  78. state => {
  79. let result = Err(bterr!("Can't send Ping in state {}.", state.name()));
  80. (state, result)
  81. }
  82. };
  83. self.state = Some(new_state);
  84. let reply = result?;
  85. match reply {
  86. PingProtocolMsgs::PingReply(reply) => Ok(reply),
  87. msg => Err(bterr!(
  88. "Unexpected message type sent in reply: {}",
  89. msg.name()
  90. )),
  91. }
  92. }
  93. }
  94. async fn spawn_client<T: Client>(init: T, runtime: &'static Runtime) -> ClientHandle<T> {
  95. let state = Some(PingClientState::Client(init));
  96. ClientHandle { state, runtime }
  97. }
  98. async fn register_server<Init, F>(
  99. make_init: F,
  100. rt: &'static Runtime,
  101. id: ServiceId,
  102. ) -> Result<ServiceName>
  103. where
  104. Init: 'static + Server,
  105. F: 'static + Send + Sync + Clone + Fn() -> Init,
  106. {
  107. enum ServerState<S> {
  108. Server(S),
  109. End(End),
  110. }
  111. impl<S> Named for ServerState<S> {
  112. fn name(&self) -> Arc<String> {
  113. static SERVER_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("Server".into()));
  114. static END_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("End".into()));
  115. match self {
  116. Self::Server(_) => SERVER_NAME.clone(),
  117. Self::End(_) => END_NAME.clone(),
  118. }
  119. }
  120. }
  121. async fn server_loop<Init, F>(
  122. _runtime: &'static Runtime,
  123. make_init: F,
  124. mut mailbox: Mailbox<PingProtocolMsgs>,
  125. actor_id: ActorId,
  126. ) -> ActorResult
  127. where
  128. Init: 'static + Server,
  129. F: 'static + Send + Sync + FnOnce() -> Init,
  130. {
  131. let mut state = ServerState::Server(make_init());
  132. while let Some(envelope) = mailbox.recv().await {
  133. state = match envelope {
  134. Envelope::Call {
  135. msg,
  136. reply: replier,
  137. ..
  138. } => match (state, msg) {
  139. (ServerState::Server(listening_state), PingProtocolMsgs::Ping(msg)) => {
  140. match listening_state.handle_ping(msg).await {
  141. TransResult::Ok((new_state, reply)) => {
  142. let replier =
  143. replier.expect("The reply has already been sent.");
  144. if let Err(_) = replier.send(PingProtocolMsgs::PingReply(reply))
  145. {
  146. return Err(ActorError::new(
  147. bterr!("Failed to send Ping reply."),
  148. ActorErrorPayload {
  149. actor_id,
  150. actor_impl: Init::actor_impl(),
  151. state: Init::state_name(),
  152. message: PingProtocolMsgKinds::Ping.name(),
  153. kind: TransKind::Receive,
  154. },
  155. ));
  156. }
  157. ServerState::End(new_state)
  158. }
  159. TransResult::Abort { from, err } => {
  160. log::warn!("Aborted transition from the {} while handling the {} message: {}", "Server", "Ping", err);
  161. ServerState::Server(from)
  162. }
  163. TransResult::Fatal { err } => {
  164. return Err(ActorError::new(
  165. err,
  166. ActorErrorPayload {
  167. actor_id,
  168. actor_impl: Init::actor_impl(),
  169. state: Init::state_name(),
  170. message: PingProtocolMsgKinds::Ping.name(),
  171. kind: TransKind::Receive,
  172. },
  173. ));
  174. }
  175. }
  176. }
  177. (state, _) => state,
  178. },
  179. envelope => {
  180. return Err(ActorError::new(
  181. bterr!("Unexpected envelope type: {}", envelope.name()),
  182. ActorErrorPayload {
  183. actor_id,
  184. actor_impl: Init::actor_impl(),
  185. state: state.name(),
  186. message: envelope.msg_name(),
  187. kind: TransKind::Receive,
  188. },
  189. ))
  190. }
  191. };
  192. if let ServerState::End(_) = state {
  193. break;
  194. }
  195. }
  196. Ok(actor_id)
  197. }
  198. rt.register::<PingProtocolMsgs, _>(id, move |runtime| {
  199. let make_init = make_init.clone();
  200. let fut = async move {
  201. let actor_impl = runtime
  202. .spawn(None, move |mailbox, act_id, runtime| {
  203. server_loop(runtime, make_init, mailbox, act_id)
  204. })
  205. .await
  206. .unwrap();
  207. Ok(actor_impl)
  208. };
  209. Box::pin(fut)
  210. })
  211. .await
  212. }
  213. #[derive(Serialize, Deserialize)]
  214. pub struct Ping;
  215. impl CallMsg for Ping {
  216. type Reply = PingReply;
  217. }
  218. #[derive(Serialize, Deserialize)]
  219. pub struct PingReply;
  220. struct ClientState {
  221. counter: Arc<AtomicU8>,
  222. }
  223. impl ClientState {
  224. fn new(counter: Arc<AtomicU8>) -> Self {
  225. counter.fetch_add(1, Ordering::SeqCst);
  226. Self { counter }
  227. }
  228. }
  229. impl Client for ClientState {
  230. actor_name!("ping_client");
  231. type OnSendPingFut = impl Future<Output = TransResult<Self, (End, PingReply)>>;
  232. fn on_send_ping(self, _msg: &mut Ping) -> Self::OnSendPingFut {
  233. self.counter.fetch_sub(1, Ordering::SeqCst);
  234. ready(TransResult::Ok((End, PingReply)))
  235. }
  236. }
  237. struct ServerState {
  238. counter: Arc<AtomicU8>,
  239. }
  240. impl ServerState {
  241. fn new(counter: Arc<AtomicU8>) -> Self {
  242. counter.fetch_add(1, Ordering::SeqCst);
  243. Self { counter }
  244. }
  245. }
  246. impl Server for ServerState {
  247. actor_name!("ping_server");
  248. type HandlePingFut = impl Future<Output = TransResult<Self, (End, PingReply)>>;
  249. fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
  250. self.counter.fetch_sub(1, Ordering::SeqCst);
  251. ready(TransResult::Ok((End, PingReply)))
  252. }
  253. }
  /// Runs one complete ping-pong exchange and checks that the shared counter is back at zero,
  /// i.e. that every state which was created also performed its transition.
  #[test]
  fn ping_pong_test() {
      ASYNC_RT.block_on(async {
          const SERVICE_ID: &str = "PingPongProtocolServer";
          let service_id = ServiceId::from(SERVICE_ID);
          let counter = Arc::new(AtomicU8::new(0));

          let service_counter = counter.clone();
          let make_init = move || ServerState::new(service_counter.clone());
          let service_name = register_server(make_init, &RUNTIME, service_id.clone())
              .await
              .unwrap();

          let client_state = ClientState::new(counter.clone());
          let mut client_handle = spawn_client(client_state, &RUNTIME).await;
          let service_addr = ServiceAddr::new(service_name, true);
          client_handle.send_ping(Ping, service_addr).await.unwrap();

          assert_eq!(0, counter.load(Ordering::SeqCst));
          RUNTIME.deregister(&service_id, None).await.unwrap();
      });
  }
  277. }
  278. mod travel_agency {
  279. use super::*;
  280. // Here's another protocol example. This is the Customer and Travel Agency protocol used as an
  281. // example in the survey paper "Behavioral Types in Programming Languages."
  282. // Note that the Choosing state can send messages at any time, not just in response to another
  283. // message because there is a transition from Choosing that doesn't use the receive operator
  284. // (`?`).
  // Declares the `TravelAgency` protocol between an `agency` actor, whose only listed state is
  // `Listening`, and a `customer` actor, whose only listed state is `Choosing`.
  285. protocol! {
  286. named TravelAgency;
  287. let agency = [Listening];
  288. let customer = [Choosing];
  // Transitions the customer initiates itself: each sends a message to the `Listening` service
  // and leaves the customer in `Choosing`.
  289. Choosing -> Choosing, >service(Listening)!Query;
  290. Choosing -> Choosing, >service(Listening)!Accept;
  291. Choosing -> Choosing, >service(Listening)!Reject;
  // A `Query` is answered with `Query::Reply`; neither party changes state.
  292. Listening?Query -> Listening, >Choosing!Query::Reply;
  293. Choosing?Query::Reply -> Choosing;
  // `Accept` and `Reject` are answered with their replies; both parties end up in `End`.
  294. Listening?Accept -> End, >Choosing!Accept::Reply;
  295. Choosing?Accept::Reply -> End;
  296. Listening?Reject -> End, >Choosing!Reject::Reply;
  297. Choosing?Reject::Reply -> End;
  298. }
  299. #[derive(Serialize, Deserialize)]
  300. pub struct Query;
  301. impl CallMsg for Query {
  302. type Reply = ();
  303. }
  304. #[derive(Serialize, Deserialize)]
  305. pub struct Reject;
  306. impl CallMsg for Reject {
  307. type Reply = ();
  308. }
  309. #[derive(Serialize, Deserialize)]
  310. pub struct Accept;
  311. impl CallMsg for Accept {
  312. type Reply = ();
  313. }
  314. }
  315. #[allow(dead_code)]
  316. mod client_callback {
  317. use super::*;
  318. use btlib::bterr;
  319. use once_cell::sync::Lazy;
  320. use std::{panic::panic_any, time::Duration};
  321. use tokio::{sync::oneshot, time::timeout};
  322. #[derive(Serialize, Deserialize)]
  323. pub struct Register {
  324. factor: usize,
  325. }
  326. #[derive(Serialize, Deserialize)]
  327. pub struct Completed {
  328. value: usize,
  329. }
  // Declares the `ClientCallback` protocol, in which a client registers with a server and is
  // later called back by a worker which the server started on its behalf.
  330. protocol! {
  331. named ClientCallback;
  332. let server = [Listening];
  333. let worker = [Working];
  334. let client = [Unregistered, Registered];
  // The client sends `Register` to the `Listening` service and moves to `Registered`.
  335. Unregistered -> Registered, >service(Listening)!Register[Registered];
  // The server keeps listening and starts a worker for the registered client.
  336. Listening?Register[Registered] -> Listening, Working[Registered];
  // The worker sends `Completed` to the registered client and ends.
  337. Working[Registered] -> End, >Registered!Completed;
  // The client ends once it has received `Completed`.
  338. Registered?Completed -> End;
  339. }
  340. struct UnregisteredState {
  341. sender: oneshot::Sender<usize>,
  342. }
  343. impl Unregistered for UnregisteredState {
  344. actor_name!("callback_client");
  345. type OnSendRegisterRegistered = RegisteredState;
  346. type OnSendRegisterFut = Ready<TransResult<Self, Self::OnSendRegisterRegistered>>;
  347. fn on_send_register(self, _arg: &mut Register) -> Self::OnSendRegisterFut {
  348. ready(TransResult::Ok(RegisteredState {
  349. sender: self.sender,
  350. }))
  351. }
  352. }
  353. struct RegisteredState {
  354. sender: oneshot::Sender<usize>,
  355. }
  356. impl Registered for RegisteredState {
  357. type HandleCompletedFut = Ready<TransResult<Self, End>>;
  358. fn handle_completed(self, arg: Completed) -> Self::HandleCompletedFut {
  359. self.sender.send(arg.value).unwrap();
  360. ready(TransResult::Ok(End))
  361. }
  362. }
  363. struct ListeningState {
  364. multiple: usize,
  365. }
  366. impl Listening for ListeningState {
  367. actor_name!("callback_server");
  368. type HandleRegisterListening = ListeningState;
  369. type HandleRegisterWorking = WorkingState;
  370. type HandleRegisterFut = Ready<TransResult<Self, (ListeningState, WorkingState)>>;
  371. fn handle_register(self, arg: Register) -> Self::HandleRegisterFut {
  372. let multiple = self.multiple;
  373. ready(TransResult::Ok((
  374. self,
  375. WorkingState {
  376. factor: arg.factor,
  377. multiple,
  378. },
  379. )))
  380. }
  381. }
  382. struct WorkingState {
  383. factor: usize,
  384. multiple: usize,
  385. }
  386. impl Working for WorkingState {
  387. actor_name!("callback_worker");
  388. type OnSendCompletedFut = Ready<TransResult<Self, (End, Completed)>>;
  389. fn on_send_completed(self) -> Self::OnSendCompletedFut {
  390. let value = self.multiple * self.factor;
  391. ready(TransResult::Ok((End, Completed { value })))
  392. }
  393. }
  394. use ::tokio::sync::Mutex;
  395. enum ClientState<Init: Unregistered> {
  396. Unregistered(Init),
  397. Registered(Init::OnSendRegisterRegistered),
  398. End(End),
  399. }
  400. impl<Init: Unregistered> Named for ClientState<Init> {
  401. fn name(&self) -> Arc<String> {
  402. static UNREGISTERED_NAME: Lazy<Arc<String>> =
  403. Lazy::new(|| Arc::new("Unregistered".into()));
  404. static REGISTERED_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("Registered".into()));
  405. static END_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("End".into()));
  406. match self {
  407. Self::Unregistered(_) => UNREGISTERED_NAME.clone(),
  408. Self::Registered(_) => REGISTERED_NAME.clone(),
  409. Self::End(_) => END_NAME.clone(),
  410. }
  411. }
  412. }
  413. struct ClientHandle<Init: Unregistered> {
  414. runtime: &'static Runtime,
  415. state: Arc<Mutex<Option<ClientState<Init>>>>,
  416. name: ActorName,
  417. }
  418. impl<Init: Unregistered> ClientHandle<Init> {
  419. async fn send_register(&self, to: ServiceAddr, mut msg: Register) -> Result<()> {
  420. let mut guard = self.state.lock().await;
  421. let state = guard
  422. .take()
  423. .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
  424. let new_state = match state {
  425. ClientState::Unregistered(state) => match state.on_send_register(&mut msg).await {
  426. TransResult::Ok(new_state) => {
  427. let msg = ClientCallbackMsgs::Register(msg);
  428. self.runtime
  429. .send_service(to, self.name.clone(), msg)
  430. .await?;
  431. ClientState::Registered(new_state)
  432. }
  433. TransResult::Abort { from, err } => {
  434. log::warn!(
  435. "Aborted transition from the {} state: {}",
  436. "Unregistered",
  437. err
  438. );
  439. ClientState::Unregistered(from)
  440. }
  441. TransResult::Fatal { err } => {
  442. return Err(err);
  443. }
  444. },
  445. state => state,
  446. };
  447. *guard = Some(new_state);
  448. Ok(())
  449. }
  450. }
  451. async fn spawn_client<Init>(init: Init, runtime: &'static Runtime) -> ClientHandle<Init>
  452. where
  453. Init: 'static + Unregistered,
  454. {
  455. let state = Arc::new(Mutex::new(Some(ClientState::Unregistered(init))));
  456. let name = {
  457. let state = state.clone();
  458. runtime.spawn(None, move |mut mailbox, actor_id, _| async move {
  459. while let Some(envelope) = mailbox.recv().await {
  460. let mut guard = state.lock().await;
  461. let state = guard.take()
  462. .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
  463. let new_state = match envelope {
  464. Envelope::Send { msg, .. } => {
  465. match (state, msg) {
  466. (ClientState::Registered(curr_state), ClientCallbackMsgs::Completed(msg)) => {
  467. match curr_state.handle_completed(msg).await {
  468. TransResult::Ok(next) => ClientState::<Init>::End(next),
  469. TransResult::Abort { from, err } => {
  470. log::warn!("Aborted transition from the {} state while handling the {} message: {}", "Registered", "Completed", err);
  471. ClientState::Registered(from)
  472. }
  473. TransResult::Fatal { err } => {
  474. panic_any(ActorError::new(
  475. err,
  476. ActorErrorPayload {
  477. actor_id,
  478. actor_impl: Init::actor_impl(),
  479. state: Init::OnSendRegisterRegistered::state_name(),
  480. message: ClientCallbackMsgKinds::Completed.name(),
  481. kind: TransKind::Receive,
  482. }));
  483. }
  484. }
  485. }
  486. (state, msg) => {
  487. log::error!("Unexpected message {} in state {}.", msg.name(), state.name());
  488. state
  489. }
  490. }
  491. }
  492. envelope => return Err(ActorError::new(
  493. bterr!("Unexpected envelope type: {}", envelope.name()),
  494. ActorErrorPayload {
  495. actor_id,
  496. actor_impl: Init::actor_impl(),
  497. state: state.name(),
  498. message: envelope.msg_name(),
  499. kind: TransKind::Receive,
  500. }))
  501. };
  502. *guard = Some(new_state);
  503. }
  504. Ok(actor_id)
  505. }).await.unwrap()
  506. };
  507. ClientHandle {
  508. runtime,
  509. state,
  510. name,
  511. }
  512. }
  513. async fn register_server<Init, F>(
  514. make_init: F,
  515. runtime: &'static Runtime,
  516. service_id: ServiceId,
  517. ) -> Result<ServiceName>
  518. where
  519. Init: 'static + Listening<HandleRegisterListening = Init>,
  520. F: 'static + Send + Sync + Clone + Fn() -> Init,
  521. {
  522. enum ServerState<S: Listening> {
  523. Listening(S),
  524. }
  525. impl<S: Listening> Named for ServerState<S> {
  526. fn name(&self) -> Arc<String> {
  527. static LISTENING_NAME: Lazy<Arc<String>> =
  528. Lazy::new(|| Arc::new("Listening".into()));
  529. match self {
  530. Self::Listening(_) => LISTENING_NAME.clone(),
  531. }
  532. }
  533. }
  534. async fn server_loop<Init, F>(
  535. runtime: &'static Runtime,
  536. make_init: F,
  537. mut mailbox: Mailbox<ClientCallbackMsgs>,
  538. actor_id: ActorId,
  539. ) -> ActorResult
  540. where
  541. Init: 'static + Listening<HandleRegisterListening = Init>,
  542. F: 'static + Send + Sync + Fn() -> Init,
  543. {
  544. let mut state = ServerState::Listening(make_init());
  545. while let Some(envelope) = mailbox.recv().await {
  546. let new_state = match envelope {
  547. Envelope::Send { msg, from, .. } => match (state, msg) {
  548. (ServerState::Listening(curr_state), ClientCallbackMsgs::Register(msg)) => {
  549. match curr_state.handle_register(msg).await {
  550. TransResult::Ok((new_state, working_state)) => {
  551. start_worker(working_state, from, runtime).await;
  552. ServerState::Listening(new_state)
  553. }
  554. TransResult::Abort { from, err } => {
  555. log::warn!("Aborted transition from the {} state while handling the {} message: {}", "Listening", "Register", err);
  556. ServerState::Listening(from)
  557. }
  558. TransResult::Fatal { err } => {
  559. let err = ActorError::new(
  560. err,
  561. ActorErrorPayload {
  562. actor_id,
  563. actor_impl: Init::actor_impl(),
  564. state: Init::state_name(),
  565. message: ClientCallbackMsgKinds::Register.name(),
  566. kind: TransKind::Receive,
  567. },
  568. );
  569. panic_any(format!("{err}"));
  570. }
  571. }
  572. }
  573. (state, msg) => {
  574. log::error!(
  575. "Unexpected message {} in state {}.",
  576. msg.name(),
  577. state.name()
  578. );
  579. state
  580. }
  581. },
  582. envelope => {
  583. return Err(ActorError::new(
  584. bterr!("Unexpected envelope type: {}", envelope.name()),
  585. ActorErrorPayload {
  586. actor_id,
  587. actor_impl: Init::actor_impl(),
  588. state: state.name(),
  589. message: envelope.msg_name(),
  590. kind: TransKind::Receive,
  591. },
  592. ))
  593. }
  594. };
  595. state = new_state;
  596. }
  597. Ok(actor_id)
  598. }
  599. runtime
  600. .register::<ClientCallbackMsgs, _>(service_id, move |runtime: &'static Runtime| {
  601. let make_init = make_init.clone();
  602. let fut = async move {
  603. let make_init = make_init.clone();
  604. let actor_impl = runtime
  605. .spawn(None, move |mailbox, act_id, runtime| {
  606. server_loop(runtime, make_init, mailbox, act_id)
  607. })
  608. .await
  609. .unwrap();
  610. Ok(actor_impl)
  611. };
  612. Box::pin(fut)
  613. })
  614. .await
  615. }
  616. async fn start_worker<Init>(
  617. init: Init,
  618. owned: ActorName,
  619. runtime: &'static Runtime,
  620. ) -> ActorName
  621. where
  622. Init: 'static + Working,
  623. {
  624. enum WorkerState<S: Working> {
  625. Working(S),
  626. }
  627. runtime
  628. .spawn::<ClientCallbackMsgs, _, _>(None, move |_, actor_id, _| async move {
  629. let msg = match init.on_send_completed().await {
  630. TransResult::Ok((End, msg)) => msg,
  631. TransResult::Abort { err, .. } | TransResult::Fatal { err } => {
  632. let err = ActorError::new(
  633. err,
  634. ActorErrorPayload {
  635. actor_id,
  636. actor_impl: Init::actor_impl(),
  637. state: Init::state_name(),
  638. message: ClientCallbackMsgKinds::Completed.name(),
  639. kind: TransKind::Send,
  640. },
  641. );
  642. panic_any(format!("{err}"))
  643. }
  644. };
  645. let from = runtime.actor_name(actor_id);
  646. let msg = ClientCallbackMsgs::Completed(msg);
  647. runtime.send(owned, from, msg).await.unwrap_or_else(|err| {
  648. let err = ActorError::new(
  649. err,
  650. ActorErrorPayload {
  651. actor_id,
  652. actor_impl: Init::actor_impl(),
  653. state: Init::state_name(),
  654. message: ClientCallbackMsgKinds::Completed.name(),
  655. kind: TransKind::Send,
  656. },
  657. );
  658. panic_any(format!("{err}"));
  659. });
  660. Ok(actor_id)
  661. })
  662. .await
  663. .unwrap()
  664. }
  /// Registers a server with a multiple of 2, registers a client with a factor of 21, and
  /// checks that the worker calls the client back with the value 42 within 500 milliseconds.
  #[test]
  fn client_callback_protocol() {
      ASYNC_RT.block_on(async {
          const SERVICE_ID: &str = "ClientCallbackProtocolListening";
          let service_id = ServiceId::from(SERVICE_ID);
          let make_init = move || ListeningState { multiple: 2 };
          let service_name = register_server(make_init, &RUNTIME, service_id.clone())
              .await
              .unwrap();

          let (sender, receiver) = oneshot::channel();
          let client_handle = spawn_client(UnregisteredState { sender }, &RUNTIME).await;
          let service_addr = ServiceAddr::new(service_name, false);
          let request = Register { factor: 21 };
          client_handle
              .send_register(service_addr, request)
              .await
              .unwrap();

          let deadline = Duration::from_millis(500);
          let value = timeout(deadline, receiver).await.unwrap().unwrap();
          assert_eq!(42, value);
      });
  }
  690. }