runtime_tests.rs 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734
  1. #![feature(impl_trait_in_assoc_type)]
  2. use btrun::*;
  3. use btlib::{
  4. crypto::{ConcreteCreds, CredStore, CredsPriv},
  5. log::BuilderExt,
  6. Result,
  7. };
  8. use btlib_tests::TEST_STORE;
  9. use btproto::protocol;
  10. use btserde::to_vec;
  11. use bttp::{BlockAddr, Transmitter};
  12. use ctor::ctor;
  13. use lazy_static::lazy_static;
  14. use log;
  15. use serde::{Deserialize, Serialize};
  16. use std::{
  17. future::{ready, Future, Ready},
  18. net::{IpAddr, Ipv4Addr},
  19. sync::{
  20. atomic::{AtomicU8, Ordering},
  21. Arc,
  22. },
  23. };
  24. use tokio::{runtime::Builder, sync::mpsc};
  25. use uuid::Uuid;
  26. const RUNTIME_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
  27. lazy_static! {
  28. static ref RUNTIME_CREDS: Arc<ConcreteCreds> = TEST_STORE.node_creds().unwrap();
  29. }
  30. declare_runtime!(RUNTIME, RUNTIME_ADDR, RUNTIME_CREDS.clone());
  31. lazy_static! {
  32. /// A tokio async runtime.
  33. ///
  34. /// When the `#[tokio::test]` attribute is used on a test, a new current thread runtime
  35. /// is created for each test
  36. /// (source: https://docs.rs/tokio/latest/tokio/attr.test.html#current-thread-runtime).
  37. /// This creates a problem, because the first test thread to access the `RUNTIME` static
  38. /// will initialize its `Receiver` in its runtime, which will stop running at the end of
  39. /// the test. Hence subsequent tests will not be able to send remote messages to this
  40. /// `Runtime`.
  41. ///
  42. /// By creating a single async runtime which is used by all of the tests, we can avoid this
  43. /// problem.
  44. static ref ASYNC_RT: tokio::runtime::Runtime = Builder::new_current_thread()
  45. .enable_all()
  46. .build()
  47. .unwrap();
  48. }
  49. /// The log level to use when running tests.
  50. const LOG_LEVEL: &str = "warn";
  51. #[ctor]
  52. fn ctor() {
  53. std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL));
  54. env_logger::Builder::from_default_env().btformat().init();
  55. }
  56. #[derive(Serialize, Deserialize)]
  57. struct EchoMsg(String);
  58. impl CallMsg for EchoMsg {
  59. type Reply = EchoMsg;
  60. }
  61. async fn echo(
  62. _rt: &'static Runtime,
  63. mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>,
  64. _act_id: Uuid,
  65. ) {
  66. while let Some(envelope) = mailbox.recv().await {
  67. let (msg, kind) = envelope.split();
  68. match kind {
  69. EnvelopeKind::Call { reply } => {
  70. let replier = reply.unwrap_or_else(|| panic!("The reply has already been sent."));
  71. if let Err(_) = replier.send(msg) {
  72. panic!("failed to send reply");
  73. }
  74. }
  75. _ => panic!("Expected EchoMsg to be a Call Message."),
  76. }
  77. }
  78. }
  79. #[test]
  80. fn local_call() {
  81. ASYNC_RT.block_on(async {
  82. const EXPECTED: &str = "hello";
  83. let name = RUNTIME.spawn(echo).await;
  84. let from = ActorName::new(name.path().clone(), Uuid::default());
  85. let reply = RUNTIME
  86. .call(name.clone(), from, EchoMsg(EXPECTED.into()))
  87. .await
  88. .unwrap();
  89. assert_eq!(EXPECTED, reply.0);
  90. RUNTIME.take(&name).await.unwrap();
  91. })
  92. }
  93. #[test]
  94. fn remote_call() {
  95. ASYNC_RT.block_on(async {
  96. const EXPECTED: &str = "hello";
  97. let actor_name = RUNTIME.spawn(echo).await;
  98. let bind_path = Arc::new(RUNTIME_CREDS.bind_path().unwrap());
  99. let block_addr = Arc::new(BlockAddr::new(RUNTIME_ADDR, bind_path));
  100. let transmitter = Transmitter::new(block_addr, RUNTIME_CREDS.clone())
  101. .await
  102. .unwrap();
  103. let buf = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap();
  104. let wire_msg = WireMsg::new(
  105. actor_name.clone(),
  106. RUNTIME.actor_name(Uuid::default()),
  107. &buf,
  108. );
  109. let reply = transmitter
  110. .call(wire_msg, ReplyCallback::<EchoMsg>::new())
  111. .await
  112. .unwrap()
  113. .unwrap();
  114. assert_eq!(EXPECTED, reply.0);
  115. RUNTIME.take(&actor_name).await.unwrap();
  116. });
  117. }
  118. /// Tests the `num_running` method.
  119. ///
  120. /// This test uses its own runtime and so can use the `#[tokio::test]` attribute.
  121. #[tokio::test]
  122. async fn num_running() {
  123. declare_runtime!(
  124. LOCAL_RT,
  125. // This needs to be different from the address where `RUNTIME` is listening.
  126. IpAddr::from([127, 0, 0, 2]),
  127. TEST_STORE.node_creds().unwrap()
  128. );
  129. assert_eq!(0, LOCAL_RT.num_running().await);
  130. let name = LOCAL_RT.spawn(echo).await;
  131. assert_eq!(1, LOCAL_RT.num_running().await);
  132. LOCAL_RT.take(&name).await.unwrap();
  133. assert_eq!(0, LOCAL_RT.num_running().await);
  134. }
  135. mod ping_pong {
  136. use super::*;
  137. use btlib::bterr;
  138. // The following code is a proof-of-concept for what types should be generated for a
  139. // simple ping-pong protocol:
  140. protocol! {
  141. named PingProtocol;
  142. let server = [Server];
  143. let client = [Client];
  144. Client -> End, >service(Server)!Ping;
  145. Server?Ping -> End, >Client!Ping::Reply;
  146. }
  147. //
  148. // In words, the protocol is described as follows.
  149. // 1. The ClientInit state receives the Activate message. It returns the SentPing state and a
  150. // Ping message to be sent to the Listening state.
  151. // 2. The ServerInit state receives the Activate message. It returns the Listening state.
  152. // 3. When the Listening state receives the Ping message it returns the End state and a
  153. // Ping::Reply message to be sent to the SentPing state.
  154. // 4. When the SentPing state receives the Ping::Reply message it returns the End state.
  155. //
  156. // The End state represents an end to the session described by the protocol. When an actor
  157. // transitions to the End state its function returns.
  158. // The generated actor implementation is the sender of the Activate message.
  159. // When a state is expecting a Reply message, an error occurs if the message is not received
  160. // in a timely manner.
  161. enum PingClientState<T: Client> {
  162. Client(T),
  163. End(End),
  164. }
  165. impl<T: Client> PingClientState<T> {
  166. const fn name(&self) -> &'static str {
  167. match self {
  168. Self::Client(_) => "Client",
  169. Self::End(_) => "End",
  170. }
  171. }
  172. }
  173. struct ClientHandle<T: Client> {
  174. state: Option<PingClientState<T>>,
  175. runtime: &'static Runtime,
  176. }
  177. impl<T: Client> ClientHandle<T> {
  178. async fn send_ping(&mut self, mut msg: Ping, service: ServiceAddr) -> Result<PingReply> {
  179. let state = self
  180. .state
  181. .take()
  182. .ok_or_else(|| bterr!("State was not returned."))?;
  183. let (new_state, result) = match state {
  184. PingClientState::Client(state) => {
  185. let (new_state, _) = state.on_send_ping(&mut msg).await?;
  186. let new_state = PingClientState::End(new_state);
  187. let result = self
  188. .runtime
  189. .call_service(service, PingProtocolMsgs::Ping(msg))
  190. .await;
  191. (new_state, result)
  192. }
  193. state => {
  194. let result = Err(bterr!("Can't send Ping in state {}.", state.name()));
  195. (state, result)
  196. }
  197. };
  198. self.state = Some(new_state);
  199. let reply = result?;
  200. match reply {
  201. PingProtocolMsgs::PingReply(reply) => Ok(reply),
  202. msg => Err(bterr!(
  203. "Unexpected message type sent in reply: {}",
  204. msg.name()
  205. )),
  206. }
  207. }
  208. }
  209. async fn spawn_client<T: Client>(init: T, runtime: &'static Runtime) -> ClientHandle<T> {
  210. let state = Some(PingClientState::Client(init));
  211. ClientHandle { state, runtime }
  212. }
  213. async fn register_server<Init, F>(
  214. make_init: F,
  215. rt: &'static Runtime,
  216. id: ServiceId,
  217. ) -> Result<ServiceName>
  218. where
  219. Init: 'static + Server,
  220. F: 'static + Send + Sync + Clone + Fn() -> Init,
  221. {
  222. enum ServerState<S> {
  223. Server(S),
  224. End(End),
  225. }
  226. async fn server_loop<Init, F>(
  227. _runtime: &'static Runtime,
  228. make_init: F,
  229. mut mailbox: Mailbox<PingProtocolMsgs>,
  230. _act_id: Uuid,
  231. ) where
  232. Init: 'static + Server,
  233. F: 'static + Send + Sync + FnOnce() -> Init,
  234. {
  235. let mut state = ServerState::Server(make_init());
  236. while let Some(envelope) = mailbox.recv().await {
  237. let (msg, msg_kind) = envelope.split();
  238. state = match (state, msg) {
  239. (ServerState::Server(listening_state), PingProtocolMsgs::Ping(msg)) => {
  240. let (new_state, reply) = listening_state.handle_ping(msg).await.unwrap();
  241. match msg_kind {
  242. EnvelopeKind::Call { reply: replier } => {
  243. let replier = replier.expect("The reply has already been sent.");
  244. if let Err(_) = replier.send(PingProtocolMsgs::PingReply(reply)) {
  245. panic!("Failed to send Ping reply.");
  246. }
  247. ServerState::End(new_state)
  248. }
  249. _ => panic!("'Ping' was expected to be a Call message."),
  250. }
  251. }
  252. (state, _) => state,
  253. };
  254. if let ServerState::End(_) = state {
  255. break;
  256. }
  257. }
  258. }
  259. rt.register::<PingProtocolMsgs, _>(id, move |runtime| {
  260. let make_init = make_init.clone();
  261. let fut = async move {
  262. let actor_name = runtime
  263. .spawn(move |_, mailbox, act_id| {
  264. server_loop(runtime, make_init, mailbox, act_id)
  265. })
  266. .await;
  267. Ok(actor_name)
  268. };
  269. Box::pin(fut)
  270. })
  271. .await
  272. }
  273. #[derive(Serialize, Deserialize)]
  274. pub struct Ping;
  275. impl CallMsg for Ping {
  276. type Reply = PingReply;
  277. }
  278. #[derive(Serialize, Deserialize)]
  279. pub struct PingReply;
  280. struct ClientState {
  281. counter: Arc<AtomicU8>,
  282. }
  283. impl ClientState {
  284. fn new(counter: Arc<AtomicU8>) -> Self {
  285. counter.fetch_add(1, Ordering::SeqCst);
  286. Self { counter }
  287. }
  288. }
  289. impl Client for ClientState {
  290. type OnSendPingFut = impl Future<Output = Result<(End, PingReply)>>;
  291. fn on_send_ping(self, _msg: &mut Ping) -> Self::OnSendPingFut {
  292. self.counter.fetch_sub(1, Ordering::SeqCst);
  293. ready(Ok((End, PingReply)))
  294. }
  295. }
  296. struct ServerState {
  297. counter: Arc<AtomicU8>,
  298. }
  299. impl ServerState {
  300. fn new(counter: Arc<AtomicU8>) -> Self {
  301. counter.fetch_add(1, Ordering::SeqCst);
  302. Self { counter }
  303. }
  304. }
  305. impl Server for ServerState {
  306. type HandlePingFut = impl Future<Output = Result<(End, PingReply)>>;
  307. fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
  308. self.counter.fetch_sub(1, Ordering::SeqCst);
  309. ready(Ok((End, PingReply)))
  310. }
  311. }
  312. #[test]
  313. fn ping_pong_test() {
  314. ASYNC_RT.block_on(async {
  315. const SERVICE_ID: &str = "PingPongProtocolServer";
  316. let service_id = ServiceId::from(SERVICE_ID);
  317. let counter = Arc::new(AtomicU8::new(0));
  318. let service_name = {
  319. let service_counter = counter.clone();
  320. let make_init = move || {
  321. let server_counter = service_counter.clone();
  322. ServerState::new(server_counter)
  323. };
  324. register_server(make_init, &RUNTIME, service_id.clone())
  325. .await
  326. .unwrap()
  327. };
  328. let mut client_handle = spawn_client(ClientState::new(counter.clone()), &RUNTIME).await;
  329. let service_addr = ServiceAddr::new(service_name, true);
  330. client_handle.send_ping(Ping, service_addr).await.unwrap();
  331. assert_eq!(0, counter.load(Ordering::SeqCst));
  332. RUNTIME.take_service(&service_id).await.unwrap();
  333. });
  334. }
  335. }
  336. mod travel_agency {
  337. use super::*;
  338. // Here's another protocol example. This is the Customer and Travel Agency protocol used as an
  339. // example in the survey paper "Behavioral Types in Programming Languages."
  340. // Note that the Choosing state can send messages at any time, not just in response to another
  341. // message because there is a transition from Choosing that doesn't use the receive operator
  342. // (`?`).
  343. protocol! {
  344. named TravelAgency;
  345. let agency = [Listening];
  346. let customer = [Choosing];
  347. Choosing -> Choosing, >service(Listening)!Query;
  348. Choosing -> Choosing, >service(Listening)!Accept;
  349. Choosing -> Choosing, >service(Listening)!Reject;
  350. Listening?Query -> Listening, >Choosing!Query::Reply;
  351. Choosing?Query::Reply -> Choosing;
  352. Listening?Accept -> End, >Choosing!Accept::Reply;
  353. Choosing?Accept::Reply -> End;
  354. Listening?Reject -> End, >Choosing!Reject::Reply;
  355. Choosing?Reject::Reply -> End;
  356. }
  357. #[derive(Serialize, Deserialize)]
  358. pub struct Query;
  359. impl CallMsg for Query {
  360. type Reply = ();
  361. }
  362. #[derive(Serialize, Deserialize)]
  363. pub struct Reject;
  364. impl CallMsg for Reject {
  365. type Reply = ();
  366. }
  367. #[derive(Serialize, Deserialize)]
  368. pub struct Accept;
  369. impl CallMsg for Accept {
  370. type Reply = ();
  371. }
  372. }
  373. #[allow(dead_code)]
  374. mod client_callback {
  375. use super::*;
  376. use std::time::Duration;
  377. use tokio::{sync::oneshot, time::timeout};
  378. #[derive(Serialize, Deserialize)]
  379. pub struct Register {
  380. factor: usize,
  381. }
  382. #[derive(Serialize, Deserialize)]
  383. pub struct Completed {
  384. value: usize,
  385. }
  386. protocol! {
  387. named ClientCallback;
  388. let server = [Listening];
  389. let worker = [Working];
  390. let client = [Unregistered, Registered];
  391. Unregistered -> Registered, >service(Listening)!Register[Registered];
  392. Listening?Register[Registered] -> Listening, Working[Registered];
  393. Working[Registered] -> End, >Registered!Completed;
  394. Registered?Completed -> End;
  395. }
  396. struct UnregisteredState {
  397. sender: oneshot::Sender<usize>,
  398. }
  399. impl Unregistered for UnregisteredState {
  400. type OnSendRegisterRegistered = RegisteredState;
  401. type OnSendRegisterFut = Ready<Result<Self::OnSendRegisterRegistered>>;
  402. fn on_send_register(self, _arg: &mut Register) -> Self::OnSendRegisterFut {
  403. ready(Ok(RegisteredState {
  404. sender: self.sender,
  405. }))
  406. }
  407. }
  408. struct RegisteredState {
  409. sender: oneshot::Sender<usize>,
  410. }
  411. impl Registered for RegisteredState {
  412. type HandleCompletedFut = Ready<Result<End>>;
  413. fn handle_completed(self, arg: Completed) -> Self::HandleCompletedFut {
  414. self.sender.send(arg.value).unwrap();
  415. ready(Ok(End))
  416. }
  417. }
  418. struct ListeningState {
  419. multiple: usize,
  420. }
  421. impl Listening for ListeningState {
  422. type HandleRegisterListening = ListeningState;
  423. type HandleRegisterWorking = WorkingState;
  424. type HandleRegisterFut = Ready<Result<(ListeningState, WorkingState)>>;
  425. fn handle_register(self, arg: Register) -> Self::HandleRegisterFut {
  426. let multiple = self.multiple;
  427. ready(Ok((
  428. self,
  429. WorkingState {
  430. factor: arg.factor,
  431. multiple,
  432. },
  433. )))
  434. }
  435. }
  436. struct WorkingState {
  437. factor: usize,
  438. multiple: usize,
  439. }
  440. impl Working for WorkingState {
  441. type OnSendCompletedFut = Ready<Result<(End, Completed)>>;
  442. fn on_send_completed(self) -> Self::OnSendCompletedFut {
  443. let value = self.multiple * self.factor;
  444. ready(Ok((End, Completed { value })))
  445. }
  446. }
  447. use ::tokio::sync::Mutex;
  448. enum ClientState<Init: Unregistered> {
  449. Unregistered(Init),
  450. Registered(Init::OnSendRegisterRegistered),
  451. End(End),
  452. }
  453. impl<Init: Unregistered> ClientState<Init> {
  454. pub fn name(&self) -> &'static str {
  455. match self {
  456. Self::Unregistered(_) => "Unregistered",
  457. Self::Registered(_) => "Registered",
  458. Self::End(_) => "End",
  459. }
  460. }
  461. }
  462. struct ClientHandle<Init: Unregistered> {
  463. runtime: &'static Runtime,
  464. state: Arc<Mutex<Option<ClientState<Init>>>>,
  465. name: ActorName,
  466. }
  467. impl<Init: Unregistered> ClientHandle<Init> {
  468. async fn send_register(&self, to: ServiceAddr, mut msg: Register) -> Result<()> {
  469. let mut guard = self.state.lock().await;
  470. let state = guard
  471. .take()
  472. .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
  473. let new_state = match state {
  474. ClientState::Unregistered(state) => {
  475. let new_state = state.on_send_register(&mut msg).await?;
  476. let msg = ClientCallbackMsgs::Register(msg);
  477. self.runtime
  478. .send_service(to, self.name.clone(), msg)
  479. .await?;
  480. // QUESTION: Should `on_send_register` be required to return the previous state
  481. // if it encounters an error?
  482. ClientState::Registered(new_state)
  483. }
  484. state => state,
  485. };
  486. *guard = Some(new_state);
  487. Ok(())
  488. }
  489. }
  490. async fn spawn_client<Init>(init: Init, runtime: &'static Runtime) -> ClientHandle<Init>
  491. where
  492. Init: 'static + Unregistered,
  493. {
  494. let state = Arc::new(Mutex::new(Some(ClientState::Unregistered(init))));
  495. let name = {
  496. let state = state.clone();
  497. runtime.spawn(move |_, mut mailbox, _act_id| async move {
  498. while let Some(envelope) = mailbox.recv().await {
  499. let mut guard = state.lock().await;
  500. let state = guard.take()
  501. .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
  502. let (msg, _kind) = envelope.split();
  503. let new_state = match (state, msg) {
  504. (ClientState::Registered(curr_state), ClientCallbackMsgs::Completed(msg)) => {
  505. match curr_state.handle_completed(msg).await {
  506. Ok(next) => ClientState::<Init>::End(next),
  507. Err(err) => {
  508. log::error!("Failed to handle 'Completed' message in 'Registered' state: {err}");
  509. panic!("We can't transition to a new state because we gave ownership away and that method failed!")
  510. }
  511. }
  512. }
  513. (state, msg) => {
  514. log::error!("Unexpected message '{}' in state '{}'.", msg.name(), state.name());
  515. state
  516. }
  517. };
  518. *guard = Some(new_state);
  519. }
  520. }).await
  521. };
  522. ClientHandle {
  523. runtime,
  524. state,
  525. name,
  526. }
  527. }
  528. async fn register_server<Init, F>(
  529. make_init: F,
  530. runtime: &'static Runtime,
  531. service_id: ServiceId,
  532. ) -> Result<ServiceName>
  533. where
  534. Init: 'static + Listening<HandleRegisterListening = Init>,
  535. F: 'static + Send + Sync + Clone + Fn() -> Init,
  536. {
  537. enum ServerState<S: Listening> {
  538. Listening(S),
  539. }
  540. impl<S: Listening> ServerState<S> {
  541. fn name(&self) -> &'static str {
  542. match self {
  543. Self::Listening(_) => "Listening",
  544. }
  545. }
  546. }
  547. async fn server_loop<Init, F>(
  548. runtime: &'static Runtime,
  549. make_init: F,
  550. mut mailbox: Mailbox<ClientCallbackMsgs>,
  551. _act_id: Uuid,
  552. ) where
  553. Init: 'static + Listening<HandleRegisterListening = Init>,
  554. F: 'static + Send + Sync + Fn() -> Init,
  555. {
  556. let mut state = ServerState::Listening(make_init());
  557. while let Some(envelope) = mailbox.recv().await {
  558. let (msg, msg_kind) = envelope.split();
  559. let new_state = match (state, msg) {
  560. (ServerState::Listening(curr_state), ClientCallbackMsgs::Register(msg)) => {
  561. match curr_state.handle_register(msg).await {
  562. Ok((new_state, working_state)) => {
  563. if let EnvelopeKind::Send { from, .. } = msg_kind {
  564. start_worker(working_state, from, runtime).await;
  565. } else {
  566. log::error!("Expected Register to be a Send message.");
  567. }
  568. ServerState::Listening(new_state)
  569. }
  570. Err(err) => {
  571. log::error!("Failed to handle the Register message: {err}");
  572. todo!("Need to recover the previous state from err.")
  573. }
  574. }
  575. }
  576. (state, msg) => {
  577. log::error!(
  578. "Unexpected message '{}' in state '{}'.",
  579. msg.name(),
  580. state.name()
  581. );
  582. state
  583. }
  584. };
  585. state = new_state;
  586. }
  587. }
  588. runtime
  589. .register::<ClientCallbackMsgs, _>(service_id, move |runtime: &'static Runtime| {
  590. let make_init = make_init.clone();
  591. let fut = async move {
  592. let make_init = make_init.clone();
  593. let actor_name = runtime
  594. .spawn(move |_, mailbox, act_id| {
  595. server_loop(runtime, make_init, mailbox, act_id)
  596. })
  597. .await;
  598. Ok(actor_name)
  599. };
  600. Box::pin(fut)
  601. })
  602. .await
  603. }
  604. async fn start_worker<Init>(
  605. init: Init,
  606. owned: ActorName,
  607. runtime: &'static Runtime,
  608. ) -> ActorName
  609. where
  610. Init: 'static + Working,
  611. {
  612. enum WorkerState<S: Working> {
  613. Working(S),
  614. }
  615. runtime
  616. .spawn::<ClientCallbackMsgs, _, _>(move |_, _, act_id| async move {
  617. let msg = match init.on_send_completed().await {
  618. Ok((End, msg)) => msg,
  619. Err(err) => {
  620. log::error!("Failed to send Completed message: {err}");
  621. return;
  622. }
  623. };
  624. let from = runtime.actor_name(act_id);
  625. let msg = ClientCallbackMsgs::Completed(msg);
  626. if let Err(err) = runtime.send(owned, from, msg).await {
  627. log::error!("Failed to send Completed message: {err}");
  628. }
  629. })
  630. .await
  631. }
  632. #[test]
  633. fn client_callback_protocol() {
  634. ASYNC_RT.block_on(async {
  635. const SERVICE_ID: &str = "ClientCallbackProtocolListening";
  636. let service_id = ServiceId::from(SERVICE_ID);
  637. let service_name = {
  638. let make_init = move || ListeningState { multiple: 2 };
  639. register_server(make_init, &RUNTIME, service_id.clone())
  640. .await
  641. .unwrap()
  642. };
  643. let (sender, receiver) = oneshot::channel();
  644. let client_handle = spawn_client(UnregisteredState { sender }, &RUNTIME).await;
  645. let service_addr = ServiceAddr::new(service_name, false);
  646. client_handle
  647. .send_register(service_addr, Register { factor: 21 })
  648. .await
  649. .unwrap();
  650. let value = timeout(Duration::from_millis(500), receiver)
  651. .await
  652. .unwrap()
  653. .unwrap();
  654. assert_eq!(42, value);
  655. });
  656. }
  657. }