|
- use btrun::model::*;
- use btrun::test_setup;
- use btrun::*;
- use btproto::protocol;
- use log;
- use serde::{Deserialize, Serialize};
- use std::{
- future::{ready, Ready},
- sync::{
- atomic::{AtomicU8, Ordering},
- Arc,
- },
- };
- test_setup!();
- mod ping_pong {
- use super::*;
- use btlib::bterr;
- // The following code is a proof-of-concept for what types should be generated for a
- // simple ping-pong protocol:
- protocol! {
- named PingProtocol;
- let server = [Server];
- let client = [Client];
- Client -> End, >service(Server)!Ping;
- Server?Ping -> End, >Client!Ping::Reply;
- }
- //
- // In words, the protocol is described as follows.
- // 1. When the Listening state receives the Ping message it returns the End state and a
- // Ping::Reply message to be sent to the SentPing state.
- // 2. When the SentPing state receives the Ping::Reply message it returns the End state.
- //
- // The End state represents an end to the session described by the protocol. When an actor
- // transitions to the End state its function returns.
- // When a state is expecting a Reply message, an error occurs if the message is not received
- // in a timely manner.
- #[derive(Serialize, Deserialize)]
- pub struct Ping;
- impl CallMsg for Ping {
- type Reply = PingReply;
- }
- #[derive(Serialize, Deserialize)]
- pub struct PingReply;
- struct ClientImpl {
- counter: Arc<AtomicU8>,
- }
- impl ClientImpl {
- fn new(counter: Arc<AtomicU8>) -> Self {
- counter.fetch_add(1, Ordering::SeqCst);
- Self { counter }
- }
- }
- impl Client for ClientImpl {
- actor_name!("ping_client");
- type OnSendPingReturn = ();
- type OnSendPingFut = Ready<TransResult<Self, (End, ())>>;
- fn on_send_ping(self, _msg: PingReply) -> Self::OnSendPingFut {
- self.counter.fetch_sub(1, Ordering::SeqCst);
- ready(TransResult::Ok((End, ())))
- }
- }
- struct ServerState {
- counter: Arc<AtomicU8>,
- }
- impl ServerState {
- fn new(counter: Arc<AtomicU8>) -> Self {
- let count = counter.fetch_add(1, Ordering::SeqCst);
- log::info!("New service provider started. Count is now '{count}'.");
- Self { counter }
- }
- }
- impl Server for ServerState {
- actor_name!("ping_server");
- type HandlePingFut = Ready<TransResult<Self, (End, PingReply)>>;
- fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
- self.counter.fetch_sub(1, Ordering::SeqCst);
- ready(TransResult::Ok((End, PingReply)))
- }
- }
- #[test]
- fn ping_pong_test() {
- ASYNC_RT.block_on(async {
- const SERVICE_ID: &str = "PingPongProtocolServer";
- let service_id = ServiceId::from(SERVICE_ID);
- let counter = Arc::new(AtomicU8::new(0));
- let service_name = {
- let service_counter = counter.clone();
- let make_init = move || {
- let server_counter = service_counter.clone();
- ServerState::new(server_counter)
- };
- register_server(&RUNTIME, service_id.clone(), make_init)
- .await
- .unwrap()
- };
- let client_handle = spawn_client(ClientImpl::new(counter.clone()), &RUNTIME).await;
- let service_addr = ServiceAddr::new(service_name, true);
- client_handle.send_ping(service_addr, Ping).await.unwrap();
- assert_eq!(0, counter.load(Ordering::SeqCst));
- });
- }
- }
- mod travel_agency {
- use super::*;
- // Here's another protocol example. This is the Customer and Travel Agency protocol used as an
- // example in the survey paper "Behavioral Types in Programming Languages."
- // Note that the Choosing state can send messages at any time, not just in response to another
- // message because there is a transition from Choosing that doesn't use the receive operator
- // (`?`).
- protocol! {
- named TravelAgency;
- let agency = [Listening];
- let customer = [Choosing];
- Choosing -> Choosing, >service(Listening)!Query;
- Choosing -> Choosing, >service(Listening)!Accept;
- Choosing -> Choosing, >service(Listening)!Reject;
- Listening?Query -> Listening, >Choosing!Query::Reply;
- Listening?Accept -> End, >Choosing!Accept::Reply;
- Listening?Reject -> End, >Choosing!Reject::Reply;
- }
- #[derive(Serialize, Deserialize)]
- pub struct Query;
- impl CallMsg for Query {
- type Reply = ();
- }
- #[derive(Serialize, Deserialize)]
- pub struct Reject;
- impl CallMsg for Reject {
- type Reply = ();
- }
- #[derive(Serialize, Deserialize)]
- pub struct Accept;
- impl CallMsg for Accept {
- type Reply = ();
- }
- }
- mod client_callback {
- use super::*;
- use btlib::bterr;
- use std::time::Duration;
- use tokio::{sync::oneshot, time::timeout};
- #[derive(Serialize, Deserialize)]
- pub struct Register {
- factor: usize,
- }
- #[derive(Serialize, Deserialize)]
- pub struct Completed {
- value: usize,
- }
- protocol! {
- named ClientCallback;
- let server = [Listening];
- let worker = [Working];
- let client = [Unregistered, Registered];
- Unregistered -> Registered, >service(Listening)!Register[*Registered];
- Listening?Register[*Registered] -> Listening, Working[*Registered];
- Working[*Registered] -> End, >Registered!Completed;
- Registered?Completed -> End;
- }
- struct UnregisteredState {
- sender: oneshot::Sender<usize>,
- }
- impl Unregistered for UnregisteredState {
- actor_name!("callback_client");
- type OnSendRegisterReturn = ();
- type OnSendRegisterRegistered = RegisteredState;
- type OnSendRegisterFut = Ready<TransResult<Self, (Self::OnSendRegisterRegistered, ())>>;
- fn on_send_register(self) -> Self::OnSendRegisterFut {
- ready(TransResult::Ok((
- RegisteredState {
- sender: self.sender,
- },
- (),
- )))
- }
- }
- struct RegisteredState {
- sender: oneshot::Sender<usize>,
- }
- impl Registered for RegisteredState {
- type HandleCompletedFut = Ready<TransResult<Self, End>>;
- fn handle_completed(self, arg: Completed) -> Self::HandleCompletedFut {
- self.sender.send(arg.value).unwrap();
- ready(TransResult::Ok(End))
- }
- }
- struct ListeningState {
- multiply_by: usize,
- }
- impl Listening for ListeningState {
- actor_name!("callback_server");
- type HandleRegisterListening = ListeningState;
- type HandleRegisterWorking = WorkingState;
- type HandleRegisterFut = Ready<TransResult<Self, (ListeningState, WorkingState)>>;
- fn handle_register(self, arg: Register) -> Self::HandleRegisterFut {
- let multiple = self.multiply_by;
- ready(TransResult::Ok((
- self,
- WorkingState {
- factor: arg.factor,
- multiple,
- },
- )))
- }
- }
- struct WorkingState {
- factor: usize,
- multiple: usize,
- }
- impl Working for WorkingState {
- actor_name!("callback_worker");
- type OnSendCompletedFut = Ready<TransResult<Self, (End, Completed)>>;
- fn on_send_completed(self) -> Self::OnSendCompletedFut {
- let value = self.multiple * self.factor;
- ready(TransResult::Ok((End, Completed { value })))
- }
- }
- #[test]
- fn client_callback_protocol() {
- ASYNC_RT.block_on(async {
- const SERVICE_ID: &str = "ClientCallbackProtocolListening";
- let factor = 21usize;
- let multiply_by = 2usize;
- let expected = multiply_by * factor;
- let service_id = ServiceId::from(SERVICE_ID);
- let service_name = {
- let make_init = move || ListeningState { multiply_by };
- register_server(&RUNTIME, service_id.clone(), make_init)
- .await
- .unwrap()
- };
- let (sender, receiver) = oneshot::channel();
- let client_handle = spawn_client(UnregisteredState { sender }, &RUNTIME).await;
- let service_addr = ServiceAddr::new(service_name, false);
- client_handle
- .send_register(service_addr, Register { factor })
- .await
- .unwrap();
- let value = timeout(Duration::from_millis(500), receiver)
- .await
- .unwrap()
- .unwrap();
- assert_eq!(expected, value);
- });
- }
- }
|