use btrun::model::*; use btrun::test_setup; use btrun::*; use btproto::protocol; use log; use serde::{Deserialize, Serialize}; use std::{ future::{ready, Ready}, sync::{ atomic::{AtomicU8, Ordering}, Arc, }, }; test_setup!(); mod ping_pong { use super::*; use btlib::bterr; // The following code is a proof-of-concept for what types should be generated for a // simple ping-pong protocol: protocol! { named PingProtocol; let server = [Server]; let client = [Client]; Client -> End, >service(Server)!Ping; Server?Ping -> End, >Client!Ping::Reply; } // // In words, the protocol is described as follows. // 1. When the Listening state receives the Ping message it returns the End state and a // Ping::Reply message to be sent to the SentPing state. // 2. When the SentPing state receives the Ping::Reply message it returns the End state. // // The End state represents an end to the session described by the protocol. When an actor // transitions to the End state its function returns. // When a state is expecting a Reply message, an error occurs if the message is not received // in a timely manner. #[derive(Serialize, Deserialize)] pub struct Ping; impl CallMsg for Ping { type Reply = PingReply; } #[derive(Serialize, Deserialize)] pub struct PingReply; struct ClientImpl { counter: Arc, } impl ClientImpl { fn new(counter: Arc) -> Self { counter.fetch_add(1, Ordering::SeqCst); Self { counter } } } impl Client for ClientImpl { actor_name!("ping_client"); type OnSendPingReturn = (); type OnSendPingFut = Ready>; fn on_send_ping(self, _msg: PingReply) -> Self::OnSendPingFut { self.counter.fetch_sub(1, Ordering::SeqCst); ready(TransResult::Ok((End, ()))) } } struct ServerState { counter: Arc, } impl ServerState { fn new(counter: Arc) -> Self { let count = counter.fetch_add(1, Ordering::SeqCst); log::info!("New service provider started. Count is now '{count}'."); Self { counter } } } impl Server for ServerState { actor_name!("ping_server"); type HandlePingFut = Ready>; fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut { self.counter.fetch_sub(1, Ordering::SeqCst); ready(TransResult::Ok((End, PingReply))) } } #[test] fn ping_pong_test() { ASYNC_RT.block_on(async { const SERVICE_ID: &str = "PingPongProtocolServer"; let service_id = ServiceId::from(SERVICE_ID); let counter = Arc::new(AtomicU8::new(0)); let service_name = { let service_counter = counter.clone(); let make_init = move || { let server_counter = service_counter.clone(); ServerState::new(server_counter) }; register_server(&RUNTIME, service_id.clone(), make_init) .await .unwrap() }; let client_handle = spawn_client(ClientImpl::new(counter.clone()), &RUNTIME).await; let service_addr = ServiceAddr::new(service_name, true); client_handle.send_ping(service_addr, Ping).await.unwrap(); assert_eq!(0, counter.load(Ordering::SeqCst)); }); } } mod travel_agency { use super::*; // Here's another protocol example. This is the Customer and Travel Agency protocol used as an // example in the survey paper "Behavioral Types in Programming Languages." // Note that the Choosing state can send messages at any time, not just in response to another // message because there is a transition from Choosing that doesn't use the receive operator // (`?`). protocol! { named TravelAgency; let agency = [Listening]; let customer = [Choosing]; Choosing -> Choosing, >service(Listening)!Query; Choosing -> Choosing, >service(Listening)!Accept; Choosing -> Choosing, >service(Listening)!Reject; Listening?Query -> Listening, >Choosing!Query::Reply; Listening?Accept -> End, >Choosing!Accept::Reply; Listening?Reject -> End, >Choosing!Reject::Reply; } #[derive(Serialize, Deserialize)] pub struct Query; impl CallMsg for Query { type Reply = (); } #[derive(Serialize, Deserialize)] pub struct Reject; impl CallMsg for Reject { type Reply = (); } #[derive(Serialize, Deserialize)] pub struct Accept; impl CallMsg for Accept { type Reply = (); } } mod client_callback { use super::*; use btlib::bterr; use std::time::Duration; use tokio::{sync::oneshot, time::timeout}; #[derive(Serialize, Deserialize)] pub struct Register { factor: usize, } #[derive(Serialize, Deserialize)] pub struct Completed { value: usize, } protocol! { named ClientCallback; let server = [Listening]; let worker = [Working]; let client = [Unregistered, Registered]; Unregistered -> Registered, >service(Listening)!Register[*Registered]; Listening?Register[*Registered] -> Listening, Working[*Registered]; Working[*Registered] -> End, >Registered!Completed; Registered?Completed -> End; } struct UnregisteredState { sender: oneshot::Sender, } impl Unregistered for UnregisteredState { actor_name!("callback_client"); type OnSendRegisterReturn = (); type OnSendRegisterRegistered = RegisteredState; type OnSendRegisterFut = Ready>; fn on_send_register(self) -> Self::OnSendRegisterFut { ready(TransResult::Ok(( RegisteredState { sender: self.sender, }, (), ))) } } struct RegisteredState { sender: oneshot::Sender, } impl Registered for RegisteredState { type HandleCompletedFut = Ready>; fn handle_completed(self, arg: Completed) -> Self::HandleCompletedFut { self.sender.send(arg.value).unwrap(); ready(TransResult::Ok(End)) } } struct ListeningState { multiply_by: usize, } impl Listening for ListeningState { actor_name!("callback_server"); type HandleRegisterListening = ListeningState; type HandleRegisterWorking = WorkingState; type HandleRegisterFut = Ready>; fn handle_register(self, arg: Register) -> Self::HandleRegisterFut { let multiple = self.multiply_by; ready(TransResult::Ok(( self, WorkingState { factor: arg.factor, multiple, }, ))) } } struct WorkingState { factor: usize, multiple: usize, } impl Working for WorkingState { actor_name!("callback_worker"); type OnSendCompletedFut = Ready>; fn on_send_completed(self) -> Self::OnSendCompletedFut { let value = self.multiple * self.factor; ready(TransResult::Ok((End, Completed { value }))) } } #[test] fn client_callback_protocol() { ASYNC_RT.block_on(async { const SERVICE_ID: &str = "ClientCallbackProtocolListening"; let factor = 21usize; let multiply_by = 2usize; let expected = multiply_by * factor; let service_id = ServiceId::from(SERVICE_ID); let service_name = { let make_init = move || ListeningState { multiply_by }; register_server(&RUNTIME, service_id.clone(), make_init) .await .unwrap() }; let (sender, receiver) = oneshot::channel(); let client_handle = spawn_client(UnregisteredState { sender }, &RUNTIME).await; let service_addr = ServiceAddr::new(service_name, false); client_handle .send_register(service_addr, Register { factor }) .await .unwrap(); let value = timeout(Duration::from_millis(500), receiver) .await .unwrap() .unwrap(); assert_eq!(expected, value); }); } }