#![feature(impl_trait_in_assoc_type)] use btrun::*; use btlib::{ crypto::{ConcreteCreds, CredStore, CredsPriv}, log::BuilderExt, Result, }; use btlib_tests::TEST_STORE; use btproto::protocol; use btserde::to_vec; use bttp::{BlockAddr, Transmitter}; use ctor::ctor; use lazy_static::lazy_static; use serde::{Deserialize, Serialize}; use std::{ future::{ready, Future, Ready}, net::{IpAddr, Ipv4Addr}, sync::{ atomic::{AtomicU8, Ordering}, Arc, }, time::{Duration, Instant}, }; use tokio::{runtime::Builder, sync::mpsc}; use uuid::Uuid; const RUNTIME_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); lazy_static! { static ref RUNTIME_CREDS: Arc = TEST_STORE.node_creds().unwrap(); } declare_runtime!(RUNTIME, RUNTIME_ADDR, RUNTIME_CREDS.clone()); lazy_static! { /// A tokio async runtime. /// /// When the `#[tokio::test]` attribute is used on a test, a new current thread runtime /// is created for each test /// (source: https://docs.rs/tokio/latest/tokio/attr.test.html#current-thread-runtime). /// This creates a problem, because the first test thread to access the `RUNTIME` static /// will initialize its `Receiver` in its runtime, which will stop running at the end of /// the test. Hence subsequent tests will not be able to send remote messages to this /// `Runtime`. /// /// By creating a single async runtime which is used by all of the tests, we can avoid this /// problem. static ref ASYNC_RT: tokio::runtime::Runtime = Builder::new_current_thread() .enable_all() .build() .unwrap(); } /// The log level to use when running tests. const LOG_LEVEL: &str = "warn"; #[ctor] fn ctor() { std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL)); env_logger::Builder::from_default_env().btformat().init(); } #[derive(Serialize, Deserialize)] struct EchoMsg(String); impl CallMsg for EchoMsg { type Reply = EchoMsg; } async fn echo( _rt: &'static Runtime, mut mailbox: mpsc::Receiver>, _act_id: Uuid, ) { while let Some(envelope) = mailbox.recv().await { let (msg, replier, ..) = envelope.split(); if let Some(replier) = replier { if let Err(_) = replier.send(msg) { panic!("failed to send reply"); } } } } #[test] fn local_call() { ASYNC_RT.block_on(async { const EXPECTED: &str = "hello"; let name = RUNTIME.activate(echo).await; let from = ActorName::new(name.path().clone(), Uuid::default()); let reply = RUNTIME .call(name.clone(), from, EchoMsg(EXPECTED.into())) .await .unwrap(); assert_eq!(EXPECTED, reply.0); RUNTIME.take(&name).await.unwrap(); }) } #[test] fn remote_call() { ASYNC_RT.block_on(async { const EXPECTED: &str = "hello"; let actor_name = RUNTIME.activate(echo).await; let bind_path = Arc::new(RUNTIME_CREDS.bind_path().unwrap()); let block_addr = Arc::new(BlockAddr::new(RUNTIME_ADDR, bind_path)); let transmitter = Transmitter::new(block_addr, RUNTIME_CREDS.clone()) .await .unwrap(); let buf = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap(); let wire_msg = WireMsg::new( actor_name.clone(), RUNTIME.actor_name(Uuid::default()), &buf, ); let reply = transmitter .call(wire_msg, ReplyCallback::::new()) .await .unwrap() .unwrap(); assert_eq!(EXPECTED, reply.0); RUNTIME.take(&actor_name).await.unwrap(); }); } /// Tests the `num_running` method. /// /// This test uses its own runtime and so can use the `#[tokio::test]` attribute. #[tokio::test] async fn num_running() { declare_runtime!( LOCAL_RT, // This needs to be different from the address where `RUNTIME` is listening. IpAddr::from([127, 0, 0, 2]), TEST_STORE.node_creds().unwrap() ); assert_eq!(0, LOCAL_RT.num_running().await); let name = LOCAL_RT.activate(echo).await; assert_eq!(1, LOCAL_RT.num_running().await); LOCAL_RT.take(&name).await.unwrap(); assert_eq!(0, LOCAL_RT.num_running().await); } mod ping_pong { use super::*; // The following code is a proof-of-concept for what types should be generated for a // simple ping-pong protocol: protocol! { named PingPongProtocol; let server = [Server]; let client = [Client, Waiting]; Client -> Waiting, >service(Server)!Ping; Server?Ping -> End, >Waiting!Ping::Reply; Waiting?Ping::Reply -> End; } // // In words, the protocol is described as follows. // 1. The ClientInit state receives the Activate message. It returns the SentPing state and a // Ping message to be sent to the Listening state. // 2. The ServerInit state receives the Activate message. It returns the Listening state. // 3. When the Listening state receives the Ping message it returns the End state and a // Ping::Reply message to be sent to the SentPing state. // 4. When the SentPing state receives the Ping::Reply message it returns the End state. // // The End state represents an end to the session described by the protocol. When an actor // transitions to the End state its function returns. // The generated actor implementation is the sender of the Activate message. // When a state is expecting a Reply message, an error occurs if the message is not received // in a timely manner. #[derive(Serialize, Deserialize)] pub struct Ping; impl CallMsg for Ping { type Reply = PingReply; } // I was tempted to name this "Pong", but the proc macro wouldn't think to do that. #[derive(Serialize, Deserialize)] pub struct PingReply; trait ClientInit2 { type AfterActivate: SentPing2; type HandleActivateFut: Future>; fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut; } trait ServerInit2 { type AfterActivate: Listening2; type HandleActivateFut: Future>; fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut; } trait Listening2 { type HandlePingFut: Future>; fn handle_ping(self, msg: Ping) -> Self::HandlePingFut; } trait SentPing2 { type HandleReplyFut: Future>; fn handle_reply(self, msg: PingReply) -> Self::HandleReplyFut; } #[derive(Serialize, Deserialize)] enum PingProtocolMsg { Ping(Ping), PingReply(PingReply), } impl CallMsg for PingProtocolMsg { type Reply = PingProtocolMsg; } impl SendMsg for PingProtocolMsg {} struct ClientInitState; impl ClientInit2 for ClientInitState { type AfterActivate = ClientState; type HandleActivateFut = impl Future>; fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut { ready(Ok((ClientState, Ping))) } } struct ClientState; impl SentPing2 for ClientState { type HandleReplyFut = Ready>; fn handle_reply(self, _msg: PingReply) -> Self::HandleReplyFut { ready(Ok(End)) } } #[allow(dead_code)] enum PingClientState { Init(ClientInitState), SentPing(ClientState), End(End), } struct ServerInitState; struct ServerState; impl ServerInit2 for ServerInitState { type AfterActivate = ServerState; type HandleActivateFut = Ready>; fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut { ready(Ok(ServerState)) } } impl Listening2 for ServerState { type HandlePingFut = impl Future>; fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut { ready(Ok((End, PingReply))) } } #[allow(dead_code)] enum PingServerState { ServerInit(ServerInitState), Listening(ServerState), End(End), } async fn ping_server( counter: Arc, rt: &'static Runtime, mut mailbox: mpsc::Receiver>, act_id: Uuid, ) { let mut state = { let init = ServerInitState; let state = init .handle_activate(Activate::new(rt, act_id)) .await .unwrap(); PingServerState::Listening(state) }; while let Some(envelope) = mailbox.recv().await { let (msg, replier, _from) = envelope.split(); match (state, msg) { (PingServerState::Listening(listening_state), PingProtocolMsg::Ping(msg)) => { let (new_state, reply) = listening_state.handle_ping(msg).await.unwrap(); state = PingServerState::End(new_state); if let Err(_) = replier.unwrap().send(PingProtocolMsg::PingReply(reply)) { panic!("Failed to send Ping reply."); } } (_prev_state, _) => { panic!("Ping protocol violation."); // A real implementation should assign the previous state and log the error. // state = prev_state; } } if let PingServerState::End(_) = state { break; } } counter.fetch_sub(1, Ordering::SeqCst); } async fn ping_client( counter: Arc, server_name: ActorName, rt: &'static Runtime, _mailbox: mpsc::Receiver>, act_id: Uuid, ) { let init = ClientInitState; let (state, msg) = init .handle_activate(Activate::new(rt, act_id)) .await .unwrap(); let from = rt.actor_name(act_id); let reply = rt .call(server_name, from, PingProtocolMsg::Ping(msg)) .await .unwrap(); if let PingProtocolMsg::PingReply(msg) = reply { state.handle_reply(msg).await.unwrap(); } else { panic!("Incorrect message type sent in reply to Ping."); } counter.fetch_sub(1, Ordering::SeqCst); } #[test] fn ping_pong_test() { ASYNC_RT.block_on(async { let counter = Arc::new(AtomicU8::new(2)); let server_name = { let counter = counter.clone(); RUNTIME .activate(move |rt, mailbox, act_id| ping_server(counter, rt, mailbox, act_id)) .await }; let client_name = { let server_name = server_name.clone(); let counter = counter.clone(); RUNTIME .activate(move |rt, mailbox, act_id| { ping_client(counter, server_name, rt, mailbox, act_id) }) .await }; let deadline = Instant::now() + Duration::from_millis(500); while counter.load(Ordering::SeqCst) > 0 && Instant::now() < deadline { tokio::time::sleep(Duration::from_millis(20)).await; } // Check that both tasks finished successfully and we didn't just timeout. assert_eq!(0, counter.load(Ordering::SeqCst)); // TODO: Should actor which return be removed from the runtime automatically? RUNTIME.take(&server_name).await.unwrap(); RUNTIME.take(&client_name).await.unwrap(); }); } } mod travel_agency { use super::*; // Here's another protocol example. This is the Customer and Travel Agency protocol used as an // example in the survey paper "Behavioral Types in Programming Languages." // Note that the Choosing state can send messages at any time, not just in response to another // message because there is a transition from Choosing that doesn't use the receive operator // (`?`). protocol! { named TravelAgency; let agency = [Listening]; let customer = [Choosing]; Choosing -> Choosing, >service(Listening)!Query; Choosing -> Choosing, >service(Listening)!Accept; Choosing -> Choosing, >service(Listening)!Reject; Listening?Query -> Listening, >Choosing!Query::Reply; Choosing?Query::Reply -> Choosing; Listening?Accept -> End, >Choosing!Accept::Reply; Choosing?Accept::Reply -> End; Listening?Reject -> End, >Choosing!Reject::Reply; Choosing?Reject::Reply -> End; } #[derive(Serialize, Deserialize)] pub struct Query; impl CallMsg for Query { type Reply = (); } #[derive(Serialize, Deserialize)] pub struct Reject; impl CallMsg for Reject { type Reply = (); } #[derive(Serialize, Deserialize)] pub struct Accept; impl CallMsg for Accept { type Reply = (); } }