runtime_tests.rs 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775
  1. #![feature(impl_trait_in_assoc_type)]
  2. use btrun::model::*;
  3. use btrun::test_setup;
  4. use btrun::*;
  5. use btlib::Result;
  6. use btproto::protocol;
  7. use log;
  8. use once_cell::sync::Lazy;
  9. use serde::{Deserialize, Serialize};
  10. use std::{
  11. future::{ready, Future, Ready},
  12. sync::{
  13. atomic::{AtomicU8, Ordering},
  14. Arc,
  15. },
  16. };
// Expands btrun's `test_setup!` macro. The tests below use the `ASYNC_RT` and `RUNTIME`
// statics, which are presumably provided by this expansion — TODO confirm against btrun.
test_setup!();
  18. mod ping_pong {
  19. use super::*;
  20. use btlib::bterr;
// The following code is a proof-of-concept for what types should be generated for a
// simple ping-pong protocol:
protocol! {
    named PingProtocol;
    let server = [Server];
    let client = [Client];
    Client -> End, >service(Server)!Ping;
    Server?Ping -> End, >Client!Ping::Reply;
}
//
// In words, the protocol is described as follows.
// 1. The Client state sends the Ping message to the service whose actors start in the Server
//    state, and transitions to the End state.
// 2. When the Server state receives the Ping message it returns the End state and a
//    Ping::Reply message to be sent to the Client.
//
// The End state represents an end to the session described by the protocol. When an actor
// transitions to the End state its function returns.
// When a state is expecting a Reply message, an error occurs if the message is not received
// in a timely manner.
  40. enum PingClientState<T: Client> {
  41. Client(T),
  42. End(End),
  43. }
  44. impl<T: Client> PingClientState<T> {
  45. const fn name(&self) -> &'static str {
  46. match self {
  47. Self::Client(_) => "Client",
  48. Self::End(_) => "End",
  49. }
  50. }
  51. }
  52. struct ClientHandle<T: Client> {
  53. state: Option<PingClientState<T>>,
  54. client_name: ActorName,
  55. runtime: &'static Runtime,
  56. }
  57. impl<T: Client> ClientHandle<T> {
  58. async fn send_ping(&mut self, mut msg: Ping, service: ServiceAddr) -> Result<PingReply> {
  59. let state = self
  60. .state
  61. .take()
  62. .ok_or_else(|| bterr!("State was not returned."))?;
  63. let (new_state, result) = match state {
  64. PingClientState::Client(state) => match state.on_send_ping(&mut msg).await {
  65. TransResult::Ok((new_state, _)) => {
  66. let new_state = PingClientState::End(new_state);
  67. let result = self
  68. .runtime
  69. .call_service(
  70. service,
  71. self.client_name.clone(),
  72. PingProtocolMsgs::Ping(msg),
  73. )
  74. .await;
  75. (new_state, result)
  76. }
  77. TransResult::Abort { from, err } => {
  78. let new_state = PingClientState::Client(from);
  79. (new_state, Err(err))
  80. }
  81. TransResult::Fatal { err } => return Err(err),
  82. },
  83. state => {
  84. let result = Err(bterr!("Can't send Ping in state {}.", state.name()));
  85. (state, result)
  86. }
  87. };
  88. self.state = Some(new_state);
  89. let reply = result?;
  90. match reply {
  91. PingProtocolMsgs::PingReply(reply) => Ok(reply),
  92. msg => Err(bterr!(
  93. "Unexpected message type sent in reply: {}",
  94. msg.name()
  95. )),
  96. }
  97. }
  98. }
  99. async fn spawn_client_manual<T: Client>(init: T, runtime: &'static Runtime) -> ClientHandle<T> {
  100. let state = Some(PingClientState::Client(init));
  101. let client_name = runtime.spawn(None, do_nothing_actor).await.unwrap();
  102. ClientHandle {
  103. state,
  104. client_name,
  105. runtime,
  106. }
  107. }
  108. async fn register_server_manual<Init, F>(
  109. make_init: F,
  110. rt: &'static Runtime,
  111. id: ServiceId,
  112. ) -> Result<ServiceName>
  113. where
  114. Init: 'static + Server,
  115. F: 'static + Send + Sync + Clone + Fn() -> Init,
  116. {
  117. enum ServerState<S> {
  118. Server(S),
  119. End(End),
  120. }
  121. impl<S> Named for ServerState<S> {
  122. fn name(&self) -> Arc<String> {
  123. static SERVER_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("Server".into()));
  124. static END_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("End".into()));
  125. match self {
  126. Self::Server(_) => SERVER_NAME.clone(),
  127. Self::End(_) => END_NAME.clone(),
  128. }
  129. }
  130. }
  131. async fn server_loop<Init, F>(
  132. _runtime: &'static Runtime,
  133. make_init: F,
  134. mut mailbox: Mailbox<PingProtocolMsgs>,
  135. actor_id: ActorId,
  136. ) -> ActorResult
  137. where
  138. Init: 'static + Server,
  139. F: 'static + Send + Sync + FnOnce() -> Init,
  140. {
  141. let mut state = ServerState::Server(make_init());
  142. while let Some(envelope) = mailbox.recv().await {
  143. state = match envelope {
  144. Envelope::Call {
  145. msg,
  146. reply: replier,
  147. ..
  148. } => match (state, msg) {
  149. (ServerState::Server(listening_state), PingProtocolMsgs::Ping(msg)) => {
  150. match listening_state.handle_ping(msg).await {
  151. TransResult::Ok((new_state, reply)) => {
  152. let replier = replier
  153. .ok_or_else(|| bterr!("Reply has already been sent."))
  154. .unwrap();
  155. //let replier =
  156. // replier.expect("The reply has already been sent.");
  157. if let Err(_) = replier.send(PingProtocolMsgs::PingReply(reply))
  158. {
  159. return Err(ActorError::new(
  160. bterr!("Failed to send Ping reply."),
  161. ActorErrorPayload {
  162. actor_id,
  163. actor_impl: Init::actor_impl(),
  164. state: Init::state_name(),
  165. message: PingProtocolMsgKinds::Ping.name(),
  166. kind: TransKind::Receive,
  167. },
  168. ));
  169. }
  170. ServerState::End(new_state)
  171. }
  172. TransResult::Abort { from, err } => {
  173. log::warn!("Aborted transition from the {} while handling the {} message: {}", "Server", "Ping", err);
  174. ServerState::Server(from)
  175. }
  176. TransResult::Fatal { err } => {
  177. return Err(ActorError::new(
  178. err,
  179. ActorErrorPayload {
  180. actor_id,
  181. actor_impl: Init::actor_impl(),
  182. state: Init::state_name(),
  183. message: PingProtocolMsgKinds::Ping.name(),
  184. kind: TransKind::Receive,
  185. },
  186. ));
  187. }
  188. }
  189. }
  190. (state, _) => state,
  191. },
  192. envelope => {
  193. return Err(ActorError::new(
  194. bterr!("Unexpected envelope type: {}", envelope.name()),
  195. ActorErrorPayload {
  196. actor_id,
  197. actor_impl: Init::actor_impl(),
  198. state: state.name(),
  199. message: envelope.msg_name(),
  200. kind: TransKind::Receive,
  201. },
  202. ))
  203. }
  204. };
  205. if let ServerState::End(_) = state {
  206. break;
  207. }
  208. }
  209. Ok(actor_id)
  210. }
  211. rt.register::<PingProtocolMsgs, _>(id, move |runtime| {
  212. let make_init = make_init.clone();
  213. let fut = async move {
  214. let actor_impl = runtime
  215. .spawn(None, move |mailbox, act_id, runtime| {
  216. server_loop(runtime, make_init, mailbox, act_id)
  217. })
  218. .await
  219. .unwrap();
  220. Ok(actor_impl)
  221. };
  222. Box::pin(fut)
  223. })
  224. .await
  225. }
/// Message the client sends to the ping server.
#[derive(Serialize, Deserialize)]
pub struct Ping;

impl CallMsg for Ping {
    /// A `Ping` call is answered with a `PingReply`.
    type Reply = PingReply;
}

/// Reply to a `Ping` message.
#[derive(Serialize, Deserialize)]
pub struct PingReply;
  233. struct ClientState {
  234. counter: Arc<AtomicU8>,
  235. }
  236. impl ClientState {
  237. fn new(counter: Arc<AtomicU8>) -> Self {
  238. counter.fetch_add(1, Ordering::SeqCst);
  239. Self { counter }
  240. }
  241. }
// Client-side behavior for the ping-pong test.
impl Client for ClientState {
    actor_name!("ping_client");
    type OnSendPingFut = impl Future<Output = TransResult<Self, (End, PingReply)>>;
    /// Decrements the shared counter (undoing the increment made in `new`) and resolves
    /// immediately with a successful transition to `End`.
    fn on_send_ping(self, _msg: &mut Ping) -> Self::OnSendPingFut {
        self.counter.fetch_sub(1, Ordering::SeqCst);
        ready(TransResult::Ok((End, PingReply)))
    }
}
  250. struct ServerState {
  251. counter: Arc<AtomicU8>,
  252. }
  253. impl ServerState {
  254. fn new(counter: Arc<AtomicU8>) -> Self {
  255. counter.fetch_add(1, Ordering::SeqCst);
  256. Self { counter }
  257. }
  258. }
// Server-side behavior for the ping-pong test.
impl Server for ServerState {
    actor_name!("ping_server");
    type HandlePingFut = impl Future<Output = TransResult<Self, (End, PingReply)>>;
    /// Decrements the shared counter (undoing the increment made in `new`) and resolves
    /// immediately with a successful transition to `End` carrying a `PingReply`.
    fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
        self.counter.fetch_sub(1, Ordering::SeqCst);
        ready(TransResult::Ok((End, PingReply)))
    }
}
  267. #[test]
  268. fn ping_pong_test() {
  269. ASYNC_RT.block_on(async {
  270. const SERVICE_ID: &str = "PingPongProtocolServer";
  271. let service_id = ServiceId::from(SERVICE_ID);
  272. let counter = Arc::new(AtomicU8::new(0));
  273. let service_name = {
  274. let service_counter = counter.clone();
  275. let make_init = move || {
  276. let server_counter = service_counter.clone();
  277. ServerState::new(server_counter)
  278. };
  279. register_server_manual(make_init, &RUNTIME, service_id.clone())
  280. .await
  281. .unwrap()
  282. };
  283. let mut client_handle =
  284. spawn_client_manual(ClientState::new(counter.clone()), &RUNTIME).await;
  285. let service_addr = ServiceAddr::new(service_name, true);
  286. client_handle.send_ping(Ping, service_addr).await.unwrap();
  287. assert_eq!(0, counter.load(Ordering::SeqCst));
  288. RUNTIME.deregister(&service_id, None).await.unwrap();
  289. });
  290. }
  291. }
  292. mod travel_agency {
  293. use super::*;
// Here's another protocol example. This is the Customer and Travel Agency protocol used as an
// example in the survey paper "Behavioral Types in Programming Languages."
// Note that the Choosing state can send messages at any time, not just in response to another
// message, because there are transitions from Choosing that don't use the receive operator
// (`?`).
// No state types are implemented for this protocol in this module; only the macro invocation
// and its message types appear here.
protocol! {
    named TravelAgency;
    let agency = [Listening];
    let customer = [Choosing];
    Choosing -> Choosing, >service(Listening)!Query;
    Choosing -> Choosing, >service(Listening)!Accept;
    Choosing -> Choosing, >service(Listening)!Reject;
    Listening?Query -> Listening, >Choosing!Query::Reply;
    Choosing?Query::Reply -> Choosing;
    Listening?Accept -> End, >Choosing!Accept::Reply;
    Choosing?Accept::Reply -> End;
    Listening?Reject -> End, >Choosing!Reject::Reply;
    Choosing?Reject::Reply -> End;
}
/// `Query` message of the `TravelAgency` protocol. Its reply carries no data.
#[derive(Serialize, Deserialize)]
pub struct Query;

impl CallMsg for Query {
    type Reply = ();
}

/// `Reject` message of the `TravelAgency` protocol. Its reply carries no data.
#[derive(Serialize, Deserialize)]
pub struct Reject;

impl CallMsg for Reject {
    type Reply = ();
}

/// `Accept` message of the `TravelAgency` protocol. Its reply carries no data.
#[derive(Serialize, Deserialize)]
pub struct Accept;

impl CallMsg for Accept {
    type Reply = ();
}
  328. }
  329. #[allow(dead_code)]
  330. mod client_callback {
  331. use super::*;
  332. use btlib::bterr;
  333. use once_cell::sync::Lazy;
  334. use std::{panic::panic_any, time::Duration};
  335. use tokio::{sync::oneshot, time::timeout};
/// Message a client sends to register with the callback server.
#[derive(Serialize, Deserialize)]
pub struct Register {
    // Copied into the worker's state, where it is multiplied by the server's `multiple`.
    factor: usize,
}

/// Message a worker sends back to the registered client.
#[derive(Serialize, Deserialize)]
pub struct Completed {
    // The product computed by the worker.
    value: usize,
}
// Protocol in which a client registers with a server and is later called back by a worker.
// As realized by the hand-written code in this module:
// 1. An Unregistered client sends Register to the Listening service and becomes Registered.
// 2. On Register, the Listening server stays Listening and starts a Working actor.
// 3. The Working actor sends Completed to the Registered client and ends.
// 4. On Completed, the Registered client ends.
// NOTE(review): the exact meaning of the `[Registered]` annotations is defined by the
// `protocol!` macro — confirm against btproto.
protocol! {
    named ClientCallback;
    let server = [Listening];
    let worker = [Working];
    let client = [Unregistered, Registered];
    Unregistered -> Registered, >service(Listening)!Register[Registered];
    Listening?Register[Registered] -> Listening, Working[Registered];
    Working[Registered] -> End, >Registered!Completed;
    Registered?Completed -> End;
}
  354. struct UnregisteredState {
  355. sender: oneshot::Sender<usize>,
  356. }
  357. impl Unregistered for UnregisteredState {
  358. actor_name!("callback_client");
  359. type OnSendRegisterRegistered = RegisteredState;
  360. type OnSendRegisterFut = Ready<TransResult<Self, Self::OnSendRegisterRegistered>>;
  361. fn on_send_register(self, _arg: &mut Register) -> Self::OnSendRegisterFut {
  362. ready(TransResult::Ok(RegisteredState {
  363. sender: self.sender,
  364. }))
  365. }
  366. }
  367. struct RegisteredState {
  368. sender: oneshot::Sender<usize>,
  369. }
  370. impl Registered for RegisteredState {
  371. type HandleCompletedFut = Ready<TransResult<Self, End>>;
  372. fn handle_completed(self, arg: Completed) -> Self::HandleCompletedFut {
  373. self.sender.send(arg.value).unwrap();
  374. ready(TransResult::Ok(End))
  375. }
  376. }
  377. struct ListeningState {
  378. multiple: usize,
  379. }
  380. impl Listening for ListeningState {
  381. actor_name!("callback_server");
  382. type HandleRegisterListening = ListeningState;
  383. type HandleRegisterWorking = WorkingState;
  384. type HandleRegisterFut = Ready<TransResult<Self, (ListeningState, WorkingState)>>;
  385. fn handle_register(self, arg: Register) -> Self::HandleRegisterFut {
  386. let multiple = self.multiple;
  387. ready(TransResult::Ok((
  388. self,
  389. WorkingState {
  390. factor: arg.factor,
  391. multiple,
  392. },
  393. )))
  394. }
  395. }
  396. struct WorkingState {
  397. factor: usize,
  398. multiple: usize,
  399. }
  400. impl Working for WorkingState {
  401. actor_name!("callback_worker");
  402. type OnSendCompletedFut = Ready<TransResult<Self, (End, Completed)>>;
  403. fn on_send_completed(self) -> Self::OnSendCompletedFut {
  404. let value = self.multiple * self.factor;
  405. ready(TransResult::Ok((End, Completed { value })))
  406. }
  407. }
  408. use ::tokio::sync::Mutex;
  409. enum ClientState<Init: Unregistered> {
  410. Unregistered(Init),
  411. Registered(Init::OnSendRegisterRegistered),
  412. End(End),
  413. }
  414. impl<Init: Unregistered> Named for ClientState<Init> {
  415. fn name(&self) -> Arc<String> {
  416. static UNREGISTERED_NAME: Lazy<Arc<String>> =
  417. Lazy::new(|| Arc::new("Unregistered".into()));
  418. static REGISTERED_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("Registered".into()));
  419. static END_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("End".into()));
  420. match self {
  421. Self::Unregistered(_) => UNREGISTERED_NAME.clone(),
  422. Self::Registered(_) => REGISTERED_NAME.clone(),
  423. Self::End(_) => END_NAME.clone(),
  424. }
  425. }
  426. }
  427. struct ClientHandle<Init: Unregistered> {
  428. runtime: &'static Runtime,
  429. state: Arc<Mutex<Option<ClientState<Init>>>>,
  430. name: ActorName,
  431. }
  432. impl<Init: Unregistered> ClientHandle<Init> {
  433. async fn send_register(&self, to: ServiceAddr, mut msg: Register) -> Result<()> {
  434. let mut guard = self.state.lock().await;
  435. let state = guard
  436. .take()
  437. .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
  438. let new_state = match state {
  439. ClientState::Unregistered(state) => match state.on_send_register(&mut msg).await {
  440. TransResult::Ok(new_state) => {
  441. let msg = ClientCallbackMsgs::Register(msg);
  442. self.runtime
  443. .send_service(to, self.name.clone(), msg)
  444. .await?;
  445. ClientState::Registered(new_state)
  446. }
  447. TransResult::Abort { from, err } => {
  448. log::warn!(
  449. "Aborted transition from the {} state: {}",
  450. "Unregistered",
  451. err
  452. );
  453. ClientState::Unregistered(from)
  454. }
  455. TransResult::Fatal { err } => {
  456. return Err(err);
  457. }
  458. },
  459. state => state,
  460. };
  461. *guard = Some(new_state);
  462. Ok(())
  463. }
  464. }
/// Spawns the client actor and returns a [`ClientHandle`] that shares its state.
///
/// The state starts as `Unregistered(init)`. The spawned actor and the returned handle both
/// hold the same `Arc<Mutex<..>>`, so a transition made through the handle is visible to the
/// actor's message loop.
///
/// # Panics
///
/// Panics if spawning the actor fails. The actor itself panics if the state slot is empty
/// when a message arrives, or if `handle_completed` fails fatally.
async fn spawn_client_manual<Init>(init: Init, runtime: &'static Runtime) -> ClientHandle<Init>
where
    Init: 'static + Unregistered,
{
    let state = Arc::new(Mutex::new(Some(ClientState::Unregistered(init))));
    let name = {
        let state = state.clone();
        runtime.spawn(None, move |mut mailbox, actor_id, _| async move {
            while let Some(envelope) = mailbox.recv().await {
                // The lock is held while the message is handled, including across the
                // handler's await.
                let mut guard = state.lock().await;
                let state = guard.take()
                    .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
                let new_state = match envelope {
                    Envelope::Send { msg, .. } => {
                        match (state, msg) {
                            (ClientState::Registered(curr_state), ClientCallbackMsgs::Completed(msg)) => {
                                match curr_state.handle_completed(msg).await {
                                    TransResult::Ok(next) => ClientState::<Init>::End(next),
                                    TransResult::Abort { from, err } => {
                                        log::warn!("Aborted transition from the {} state while handling the {} message: {}", "Registered", "Completed", err);
                                        ClientState::Registered(from)
                                    }
                                    // NOTE(review): a fatal transition panics here, whereas an
                                    // unexpected envelope below returns `Err` — confirm intended.
                                    TransResult::Fatal { err } => {
                                        panic_any(ActorError::new(
                                            err,
                                            ActorErrorPayload {
                                                actor_id,
                                                actor_impl: Init::actor_impl(),
                                                state: Init::OnSendRegisterRegistered::state_name(),
                                                message: ClientCallbackMsgKinds::Completed.name(),
                                                kind: TransKind::Receive,
                                            }));
                                    }
                                }
                            }
                            // Any other pairing is logged and the state is kept as it was.
                            (state, msg) => {
                                log::error!("Unexpected message {} in state {}.", msg.name(), state.name());
                                state
                            }
                        }
                    }
                    // This early return leaves the shared state slot empty.
                    envelope => return Err(ActorError::new(
                        bterr!("Unexpected envelope type: {}", envelope.name()),
                        ActorErrorPayload {
                            actor_id,
                            actor_impl: Init::actor_impl(),
                            state: state.name(),
                            message: envelope.msg_name(),
                            kind: TransKind::Receive,
                        }))
                };
                // NOTE(review): the loop keeps receiving after reaching `End`; it stops only
                // when the mailbox yields `None` — confirm intended.
                *guard = Some(new_state);
            }
            Ok(actor_id)
        }).await.unwrap()
    };
    ClientHandle {
        runtime,
        state,
        name,
    }
}
  527. async fn register_server_manual<Init, F>(
  528. make_init: F,
  529. runtime: &'static Runtime,
  530. service_id: ServiceId,
  531. ) -> Result<ServiceName>
  532. where
  533. Init: 'static + Listening<HandleRegisterListening = Init>,
  534. F: 'static + Send + Sync + Clone + Fn() -> Init,
  535. {
  536. enum ServerState<Init: Listening> {
  537. Listening(Init),
  538. }
  539. impl<S: Listening> Named for ServerState<S> {
  540. fn name(&self) -> Arc<String> {
  541. static LISTENING_NAME: Lazy<Arc<String>> =
  542. Lazy::new(|| Arc::new("Listening".into()));
  543. match self {
  544. Self::Listening(_) => LISTENING_NAME.clone(),
  545. }
  546. }
  547. }
  548. async fn server_loop<Init, F>(
  549. runtime: &'static Runtime,
  550. make_init: F,
  551. mut mailbox: Mailbox<ClientCallbackMsgs>,
  552. actor_id: ActorId,
  553. ) -> ActorResult
  554. where
  555. Init: 'static + Listening<HandleRegisterListening = Init>,
  556. F: 'static + Send + Sync + Fn() -> Init,
  557. {
  558. let mut state = ServerState::Listening(make_init());
  559. while let Some(envelope) = mailbox.recv().await {
  560. let new_state = match envelope {
  561. Envelope::Send { msg, from, .. } => match (state, msg) {
  562. (ServerState::Listening(curr_state), ClientCallbackMsgs::Register(msg)) => {
  563. match curr_state.handle_register(msg).await {
  564. TransResult::Ok((new_state, working_state)) => {
  565. start_worker_manual(working_state, from, runtime).await;
  566. ServerState::Listening(new_state)
  567. }
  568. TransResult::Abort { from, err } => {
  569. log::warn!("Aborted transition from the {} state while handling the {} message: {}", "Listening", "Register", err);
  570. ServerState::Listening(from)
  571. }
  572. TransResult::Fatal { err } => {
  573. let err = ActorError::new(
  574. err,
  575. ActorErrorPayload {
  576. actor_id,
  577. actor_impl: Init::actor_impl(),
  578. state: Init::state_name(),
  579. message: ClientCallbackMsgKinds::Register.name(),
  580. kind: TransKind::Receive,
  581. },
  582. );
  583. panic_any(format!("{err}"));
  584. }
  585. }
  586. }
  587. (state, msg) => {
  588. log::error!(
  589. "Unexpected message {} in state {}.",
  590. msg.name(),
  591. state.name()
  592. );
  593. state
  594. }
  595. },
  596. envelope => {
  597. return Err(ActorError::new(
  598. bterr!("Unexpected envelope type: {}", envelope.name()),
  599. ActorErrorPayload {
  600. actor_id,
  601. actor_impl: Init::actor_impl(),
  602. state: state.name(),
  603. message: envelope.msg_name(),
  604. kind: TransKind::Receive,
  605. },
  606. ))
  607. }
  608. };
  609. state = new_state;
  610. }
  611. Ok(actor_id)
  612. }
  613. runtime
  614. .register::<ClientCallbackMsgs, _>(service_id, move |runtime: &'static Runtime| {
  615. let make_init = make_init.clone();
  616. let fut = async move {
  617. let make_init = make_init.clone();
  618. let actor_impl = runtime
  619. .spawn(None, move |mailbox, act_id, runtime| {
  620. server_loop(runtime, make_init, mailbox, act_id)
  621. })
  622. .await
  623. .unwrap();
  624. Ok(actor_impl)
  625. };
  626. Box::pin(fut)
  627. })
  628. .await
  629. }
  630. async fn start_worker_manual<Init>(
  631. init: Init,
  632. owned: ActorName,
  633. runtime: &'static Runtime,
  634. ) -> ActorName
  635. where
  636. Init: 'static + Working,
  637. {
  638. enum WorkerState<S: Working> {
  639. Working(S),
  640. }
  641. runtime
  642. .spawn::<ClientCallbackMsgs, _, _>(
  643. Some(owned.clone()),
  644. move |_, actor_id, _| async move {
  645. let msg = match init.on_send_completed().await {
  646. TransResult::Ok((End, msg)) => msg,
  647. TransResult::Abort { err, .. } | TransResult::Fatal { err } => {
  648. let err = ActorError::new(
  649. err,
  650. ActorErrorPayload {
  651. actor_id,
  652. actor_impl: Init::actor_impl(),
  653. state: Init::state_name(),
  654. message: ClientCallbackMsgKinds::Completed.name(),
  655. kind: TransKind::Send,
  656. },
  657. );
  658. panic_any(format!("{err}"))
  659. }
  660. };
  661. let from = runtime.actor_name(actor_id);
  662. let msg = ClientCallbackMsgs::Completed(msg);
  663. runtime.send(owned, from, msg).await.unwrap_or_else(|err| {
  664. let err = ActorError::new(
  665. err,
  666. ActorErrorPayload {
  667. actor_id,
  668. actor_impl: Init::actor_impl(),
  669. state: Init::state_name(),
  670. message: ClientCallbackMsgKinds::Completed.name(),
  671. kind: TransKind::Send,
  672. },
  673. );
  674. panic_any(format!("{err}"));
  675. });
  676. Ok(actor_id)
  677. },
  678. )
  679. .await
  680. .unwrap()
  681. }
  682. #[test]
  683. fn client_callback_protocol() {
  684. ASYNC_RT.block_on(async {
  685. const SERVICE_ID: &str = "ClientCallbackProtocolListening";
  686. let service_id = ServiceId::from(SERVICE_ID);
  687. let service_name = {
  688. let make_init = move || ListeningState { multiple: 2 };
  689. register_server_manual(make_init, &RUNTIME, service_id.clone())
  690. .await
  691. .unwrap()
  692. };
  693. let (sender, receiver) = oneshot::channel();
  694. let client_handle = spawn_client_manual(UnregisteredState { sender }, &RUNTIME).await;
  695. let service_addr = ServiceAddr::new(service_name, false);
  696. client_handle
  697. .send_register(service_addr, Register { factor: 21 })
  698. .await
  699. .unwrap();
  700. let value = timeout(Duration::from_millis(500), receiver)
  701. .await
  702. .unwrap()
  703. .unwrap();
  704. assert_eq!(42, value);
  705. });
  706. }
  707. }