123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734 |
- #![feature(impl_trait_in_assoc_type)]
- use btrun::*;
- use btlib::{
- crypto::{ConcreteCreds, CredStore, CredsPriv},
- log::BuilderExt,
- Result,
- };
- use btlib_tests::TEST_STORE;
- use btproto::protocol;
- use btserde::to_vec;
- use bttp::{BlockAddr, Transmitter};
- use ctor::ctor;
- use lazy_static::lazy_static;
- use log;
- use serde::{Deserialize, Serialize};
- use std::{
- future::{ready, Future, Ready},
- net::{IpAddr, Ipv4Addr},
- sync::{
- atomic::{AtomicU8, Ordering},
- Arc,
- },
- };
- use tokio::{runtime::Builder, sync::mpsc};
- use uuid::Uuid;
- const RUNTIME_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
- lazy_static! {
- static ref RUNTIME_CREDS: Arc<ConcreteCreds> = TEST_STORE.node_creds().unwrap();
- }
- declare_runtime!(RUNTIME, RUNTIME_ADDR, RUNTIME_CREDS.clone());
- lazy_static! {
- /// A tokio async runtime.
- ///
- /// When the `#[tokio::test]` attribute is used on a test, a new current thread runtime
- /// is created for each test
- /// (source: https://docs.rs/tokio/latest/tokio/attr.test.html#current-thread-runtime).
- /// This creates a problem, because the first test thread to access the `RUNTIME` static
- /// will initialize its `Receiver` in its runtime, which will stop running at the end of
- /// the test. Hence subsequent tests will not be able to send remote messages to this
- /// `Runtime`.
- ///
- /// By creating a single async runtime which is used by all of the tests, we can avoid this
- /// problem.
- static ref ASYNC_RT: tokio::runtime::Runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .unwrap();
- }
- /// The log level to use when running tests.
- const LOG_LEVEL: &str = "warn";
- #[ctor]
- fn ctor() {
- std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL));
- env_logger::Builder::from_default_env().btformat().init();
- }
- #[derive(Serialize, Deserialize)]
- struct EchoMsg(String);
- impl CallMsg for EchoMsg {
- type Reply = EchoMsg;
- }
- async fn echo(
- _rt: &'static Runtime,
- mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>,
- _act_id: Uuid,
- ) {
- while let Some(envelope) = mailbox.recv().await {
- let (msg, kind) = envelope.split();
- match kind {
- EnvelopeKind::Call { reply } => {
- let replier = reply.unwrap_or_else(|| panic!("The reply has already been sent."));
- if let Err(_) = replier.send(msg) {
- panic!("failed to send reply");
- }
- }
- _ => panic!("Expected EchoMsg to be a Call Message."),
- }
- }
- }
- #[test]
- fn local_call() {
- ASYNC_RT.block_on(async {
- const EXPECTED: &str = "hello";
- let name = RUNTIME.spawn(echo).await;
- let from = ActorName::new(name.path().clone(), Uuid::default());
- let reply = RUNTIME
- .call(name.clone(), from, EchoMsg(EXPECTED.into()))
- .await
- .unwrap();
- assert_eq!(EXPECTED, reply.0);
- RUNTIME.take(&name).await.unwrap();
- })
- }
- #[test]
- fn remote_call() {
- ASYNC_RT.block_on(async {
- const EXPECTED: &str = "hello";
- let actor_name = RUNTIME.spawn(echo).await;
- let bind_path = Arc::new(RUNTIME_CREDS.bind_path().unwrap());
- let block_addr = Arc::new(BlockAddr::new(RUNTIME_ADDR, bind_path));
- let transmitter = Transmitter::new(block_addr, RUNTIME_CREDS.clone())
- .await
- .unwrap();
- let buf = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap();
- let wire_msg = WireMsg::new(
- actor_name.clone(),
- RUNTIME.actor_name(Uuid::default()),
- &buf,
- );
- let reply = transmitter
- .call(wire_msg, ReplyCallback::<EchoMsg>::new())
- .await
- .unwrap()
- .unwrap();
- assert_eq!(EXPECTED, reply.0);
- RUNTIME.take(&actor_name).await.unwrap();
- });
- }
- /// Tests the `num_running` method.
- ///
- /// This test uses its own runtime and so can use the `#[tokio::test]` attribute.
- #[tokio::test]
- async fn num_running() {
- declare_runtime!(
- LOCAL_RT,
- // This needs to be different from the address where `RUNTIME` is listening.
- IpAddr::from([127, 0, 0, 2]),
- TEST_STORE.node_creds().unwrap()
- );
- assert_eq!(0, LOCAL_RT.num_running().await);
- let name = LOCAL_RT.spawn(echo).await;
- assert_eq!(1, LOCAL_RT.num_running().await);
- LOCAL_RT.take(&name).await.unwrap();
- assert_eq!(0, LOCAL_RT.num_running().await);
- }
- mod ping_pong {
- use super::*;
- use btlib::bterr;
- // The following code is a proof-of-concept for what types should be generated for a
- // simple ping-pong protocol:
- protocol! {
- named PingProtocol;
- let server = [Server];
- let client = [Client];
- Client -> End, >service(Server)!Ping;
- Server?Ping -> End, >Client!Ping::Reply;
- }
- //
- // In words, the protocol is described as follows.
- // 1. The ClientInit state receives the Activate message. It returns the SentPing state and a
- // Ping message to be sent to the Listening state.
- // 2. The ServerInit state receives the Activate message. It returns the Listening state.
- // 3. When the Listening state receives the Ping message it returns the End state and a
- // Ping::Reply message to be sent to the SentPing state.
- // 4. When the SentPing state receives the Ping::Reply message it returns the End state.
- //
- // The End state represents an end to the session described by the protocol. When an actor
- // transitions to the End state its function returns.
- // The generated actor implementation is the sender of the Activate message.
- // When a state is expecting a Reply message, an error occurs if the message is not received
- // in a timely manner.
- enum PingClientState<T: Client> {
- Client(T),
- End(End),
- }
- impl<T: Client> PingClientState<T> {
- const fn name(&self) -> &'static str {
- match self {
- Self::Client(_) => "Client",
- Self::End(_) => "End",
- }
- }
- }
- struct ClientHandle<T: Client> {
- state: Option<PingClientState<T>>,
- runtime: &'static Runtime,
- }
- impl<T: Client> ClientHandle<T> {
- async fn send_ping(&mut self, mut msg: Ping, service: ServiceAddr) -> Result<PingReply> {
- let state = self
- .state
- .take()
- .ok_or_else(|| bterr!("State was not returned."))?;
- let (new_state, result) = match state {
- PingClientState::Client(state) => {
- let (new_state, _) = state.on_send_ping(&mut msg).await?;
- let new_state = PingClientState::End(new_state);
- let result = self
- .runtime
- .call_service(service, PingProtocolMsgs::Ping(msg))
- .await;
- (new_state, result)
- }
- state => {
- let result = Err(bterr!("Can't send Ping in state {}.", state.name()));
- (state, result)
- }
- };
- self.state = Some(new_state);
- let reply = result?;
- match reply {
- PingProtocolMsgs::PingReply(reply) => Ok(reply),
- msg => Err(bterr!(
- "Unexpected message type sent in reply: {}",
- msg.name()
- )),
- }
- }
- }
- async fn spawn_client<T: Client>(init: T, runtime: &'static Runtime) -> ClientHandle<T> {
- let state = Some(PingClientState::Client(init));
- ClientHandle { state, runtime }
- }
- async fn register_server<Init, F>(
- make_init: F,
- rt: &'static Runtime,
- id: ServiceId,
- ) -> Result<ServiceName>
- where
- Init: 'static + Server,
- F: 'static + Send + Sync + Clone + Fn() -> Init,
- {
- enum ServerState<S> {
- Server(S),
- End(End),
- }
- async fn server_loop<Init, F>(
- _runtime: &'static Runtime,
- make_init: F,
- mut mailbox: Mailbox<PingProtocolMsgs>,
- _act_id: Uuid,
- ) where
- Init: 'static + Server,
- F: 'static + Send + Sync + FnOnce() -> Init,
- {
- let mut state = ServerState::Server(make_init());
- while let Some(envelope) = mailbox.recv().await {
- let (msg, msg_kind) = envelope.split();
- state = match (state, msg) {
- (ServerState::Server(listening_state), PingProtocolMsgs::Ping(msg)) => {
- let (new_state, reply) = listening_state.handle_ping(msg).await.unwrap();
- match msg_kind {
- EnvelopeKind::Call { reply: replier } => {
- let replier = replier.expect("The reply has already been sent.");
- if let Err(_) = replier.send(PingProtocolMsgs::PingReply(reply)) {
- panic!("Failed to send Ping reply.");
- }
- ServerState::End(new_state)
- }
- _ => panic!("'Ping' was expected to be a Call message."),
- }
- }
- (state, _) => state,
- };
- if let ServerState::End(_) = state {
- break;
- }
- }
- }
- rt.register::<PingProtocolMsgs, _>(id, move |runtime| {
- let make_init = make_init.clone();
- let fut = async move {
- let actor_name = runtime
- .spawn(move |_, mailbox, act_id| {
- server_loop(runtime, make_init, mailbox, act_id)
- })
- .await;
- Ok(actor_name)
- };
- Box::pin(fut)
- })
- .await
- }
- #[derive(Serialize, Deserialize)]
- pub struct Ping;
- impl CallMsg for Ping {
- type Reply = PingReply;
- }
- #[derive(Serialize, Deserialize)]
- pub struct PingReply;
- struct ClientState {
- counter: Arc<AtomicU8>,
- }
- impl ClientState {
- fn new(counter: Arc<AtomicU8>) -> Self {
- counter.fetch_add(1, Ordering::SeqCst);
- Self { counter }
- }
- }
- impl Client for ClientState {
- type OnSendPingFut = impl Future<Output = Result<(End, PingReply)>>;
- fn on_send_ping(self, _msg: &mut Ping) -> Self::OnSendPingFut {
- self.counter.fetch_sub(1, Ordering::SeqCst);
- ready(Ok((End, PingReply)))
- }
- }
- struct ServerState {
- counter: Arc<AtomicU8>,
- }
- impl ServerState {
- fn new(counter: Arc<AtomicU8>) -> Self {
- counter.fetch_add(1, Ordering::SeqCst);
- Self { counter }
- }
- }
- impl Server for ServerState {
- type HandlePingFut = impl Future<Output = Result<(End, PingReply)>>;
- fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
- self.counter.fetch_sub(1, Ordering::SeqCst);
- ready(Ok((End, PingReply)))
- }
- }
- #[test]
- fn ping_pong_test() {
- ASYNC_RT.block_on(async {
- const SERVICE_ID: &str = "PingPongProtocolServer";
- let service_id = ServiceId::from(SERVICE_ID);
- let counter = Arc::new(AtomicU8::new(0));
- let service_name = {
- let service_counter = counter.clone();
- let make_init = move || {
- let server_counter = service_counter.clone();
- ServerState::new(server_counter)
- };
- register_server(make_init, &RUNTIME, service_id.clone())
- .await
- .unwrap()
- };
- let mut client_handle = spawn_client(ClientState::new(counter.clone()), &RUNTIME).await;
- let service_addr = ServiceAddr::new(service_name, true);
- client_handle.send_ping(Ping, service_addr).await.unwrap();
- assert_eq!(0, counter.load(Ordering::SeqCst));
- RUNTIME.take_service(&service_id).await.unwrap();
- });
- }
- }
- mod travel_agency {
- use super::*;
- // Here's another protocol example. This is the Customer and Travel Agency protocol used as an
- // example in the survey paper "Behavioral Types in Programming Languages."
- // Note that the Choosing state can send messages at any time, not just in response to another
- // message because there is a transition from Choosing that doesn't use the receive operator
- // (`?`).
- protocol! {
- named TravelAgency;
- let agency = [Listening];
- let customer = [Choosing];
- Choosing -> Choosing, >service(Listening)!Query;
- Choosing -> Choosing, >service(Listening)!Accept;
- Choosing -> Choosing, >service(Listening)!Reject;
- Listening?Query -> Listening, >Choosing!Query::Reply;
- Choosing?Query::Reply -> Choosing;
- Listening?Accept -> End, >Choosing!Accept::Reply;
- Choosing?Accept::Reply -> End;
- Listening?Reject -> End, >Choosing!Reject::Reply;
- Choosing?Reject::Reply -> End;
- }
- #[derive(Serialize, Deserialize)]
- pub struct Query;
- impl CallMsg for Query {
- type Reply = ();
- }
- #[derive(Serialize, Deserialize)]
- pub struct Reject;
- impl CallMsg for Reject {
- type Reply = ();
- }
- #[derive(Serialize, Deserialize)]
- pub struct Accept;
- impl CallMsg for Accept {
- type Reply = ();
- }
- }
- #[allow(dead_code)]
- mod client_callback {
- use super::*;
- use std::time::Duration;
- use tokio::{sync::oneshot, time::timeout};
- #[derive(Serialize, Deserialize)]
- pub struct Register {
- factor: usize,
- }
- #[derive(Serialize, Deserialize)]
- pub struct Completed {
- value: usize,
- }
- protocol! {
- named ClientCallback;
- let server = [Listening];
- let worker = [Working];
- let client = [Unregistered, Registered];
- Unregistered -> Registered, >service(Listening)!Register[Registered];
- Listening?Register[Registered] -> Listening, Working[Registered];
- Working[Registered] -> End, >Registered!Completed;
- Registered?Completed -> End;
- }
- struct UnregisteredState {
- sender: oneshot::Sender<usize>,
- }
- impl Unregistered for UnregisteredState {
- type OnSendRegisterRegistered = RegisteredState;
- type OnSendRegisterFut = Ready<Result<Self::OnSendRegisterRegistered>>;
- fn on_send_register(self, _arg: &mut Register) -> Self::OnSendRegisterFut {
- ready(Ok(RegisteredState {
- sender: self.sender,
- }))
- }
- }
- struct RegisteredState {
- sender: oneshot::Sender<usize>,
- }
- impl Registered for RegisteredState {
- type HandleCompletedFut = Ready<Result<End>>;
- fn handle_completed(self, arg: Completed) -> Self::HandleCompletedFut {
- self.sender.send(arg.value).unwrap();
- ready(Ok(End))
- }
- }
- struct ListeningState {
- multiple: usize,
- }
- impl Listening for ListeningState {
- type HandleRegisterListening = ListeningState;
- type HandleRegisterWorking = WorkingState;
- type HandleRegisterFut = Ready<Result<(ListeningState, WorkingState)>>;
- fn handle_register(self, arg: Register) -> Self::HandleRegisterFut {
- let multiple = self.multiple;
- ready(Ok((
- self,
- WorkingState {
- factor: arg.factor,
- multiple,
- },
- )))
- }
- }
- struct WorkingState {
- factor: usize,
- multiple: usize,
- }
- impl Working for WorkingState {
- type OnSendCompletedFut = Ready<Result<(End, Completed)>>;
- fn on_send_completed(self) -> Self::OnSendCompletedFut {
- let value = self.multiple * self.factor;
- ready(Ok((End, Completed { value })))
- }
- }
- use ::tokio::sync::Mutex;
- enum ClientState<Init: Unregistered> {
- Unregistered(Init),
- Registered(Init::OnSendRegisterRegistered),
- End(End),
- }
- impl<Init: Unregistered> ClientState<Init> {
- pub fn name(&self) -> &'static str {
- match self {
- Self::Unregistered(_) => "Unregistered",
- Self::Registered(_) => "Registered",
- Self::End(_) => "End",
- }
- }
- }
- struct ClientHandle<Init: Unregistered> {
- runtime: &'static Runtime,
- state: Arc<Mutex<Option<ClientState<Init>>>>,
- name: ActorName,
- }
- impl<Init: Unregistered> ClientHandle<Init> {
- async fn send_register(&self, to: ServiceAddr, mut msg: Register) -> Result<()> {
- let mut guard = self.state.lock().await;
- let state = guard
- .take()
- .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
- let new_state = match state {
- ClientState::Unregistered(state) => {
- let new_state = state.on_send_register(&mut msg).await?;
- let msg = ClientCallbackMsgs::Register(msg);
- self.runtime
- .send_service(to, self.name.clone(), msg)
- .await?;
- // QUESTION: Should `on_send_register` be required to return the previous state
- // if it encounters an error?
- ClientState::Registered(new_state)
- }
- state => state,
- };
- *guard = Some(new_state);
- Ok(())
- }
- }
- async fn spawn_client<Init>(init: Init, runtime: &'static Runtime) -> ClientHandle<Init>
- where
- Init: 'static + Unregistered,
- {
- let state = Arc::new(Mutex::new(Some(ClientState::Unregistered(init))));
- let name = {
- let state = state.clone();
- runtime.spawn(move |_, mut mailbox, _act_id| async move {
- while let Some(envelope) = mailbox.recv().await {
- let mut guard = state.lock().await;
- let state = guard.take()
- .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
- let (msg, _kind) = envelope.split();
- let new_state = match (state, msg) {
- (ClientState::Registered(curr_state), ClientCallbackMsgs::Completed(msg)) => {
- match curr_state.handle_completed(msg).await {
- Ok(next) => ClientState::<Init>::End(next),
- Err(err) => {
- log::error!("Failed to handle 'Completed' message in 'Registered' state: {err}");
- panic!("We can't transition to a new state because we gave ownership away and that method failed!")
- }
- }
- }
- (state, msg) => {
- log::error!("Unexpected message '{}' in state '{}'.", msg.name(), state.name());
- state
- }
- };
- *guard = Some(new_state);
- }
- }).await
- };
- ClientHandle {
- runtime,
- state,
- name,
- }
- }
- async fn register_server<Init, F>(
- make_init: F,
- runtime: &'static Runtime,
- service_id: ServiceId,
- ) -> Result<ServiceName>
- where
- Init: 'static + Listening<HandleRegisterListening = Init>,
- F: 'static + Send + Sync + Clone + Fn() -> Init,
- {
- enum ServerState<S: Listening> {
- Listening(S),
- }
- impl<S: Listening> ServerState<S> {
- fn name(&self) -> &'static str {
- match self {
- Self::Listening(_) => "Listening",
- }
- }
- }
- async fn server_loop<Init, F>(
- runtime: &'static Runtime,
- make_init: F,
- mut mailbox: Mailbox<ClientCallbackMsgs>,
- _act_id: Uuid,
- ) where
- Init: 'static + Listening<HandleRegisterListening = Init>,
- F: 'static + Send + Sync + Fn() -> Init,
- {
- let mut state = ServerState::Listening(make_init());
- while let Some(envelope) = mailbox.recv().await {
- let (msg, msg_kind) = envelope.split();
- let new_state = match (state, msg) {
- (ServerState::Listening(curr_state), ClientCallbackMsgs::Register(msg)) => {
- match curr_state.handle_register(msg).await {
- Ok((new_state, working_state)) => {
- if let EnvelopeKind::Send { from, .. } = msg_kind {
- start_worker(working_state, from, runtime).await;
- } else {
- log::error!("Expected Register to be a Send message.");
- }
- ServerState::Listening(new_state)
- }
- Err(err) => {
- log::error!("Failed to handle the Register message: {err}");
- todo!("Need to recover the previous state from err.")
- }
- }
- }
- (state, msg) => {
- log::error!(
- "Unexpected message '{}' in state '{}'.",
- msg.name(),
- state.name()
- );
- state
- }
- };
- state = new_state;
- }
- }
- runtime
- .register::<ClientCallbackMsgs, _>(service_id, move |runtime: &'static Runtime| {
- let make_init = make_init.clone();
- let fut = async move {
- let make_init = make_init.clone();
- let actor_name = runtime
- .spawn(move |_, mailbox, act_id| {
- server_loop(runtime, make_init, mailbox, act_id)
- })
- .await;
- Ok(actor_name)
- };
- Box::pin(fut)
- })
- .await
- }
- async fn start_worker<Init>(
- init: Init,
- owned: ActorName,
- runtime: &'static Runtime,
- ) -> ActorName
- where
- Init: 'static + Working,
- {
- enum WorkerState<S: Working> {
- Working(S),
- }
- runtime
- .spawn::<ClientCallbackMsgs, _, _>(move |_, _, act_id| async move {
- let msg = match init.on_send_completed().await {
- Ok((End, msg)) => msg,
- Err(err) => {
- log::error!("Failed to send Completed message: {err}");
- return;
- }
- };
- let from = runtime.actor_name(act_id);
- let msg = ClientCallbackMsgs::Completed(msg);
- if let Err(err) = runtime.send(owned, from, msg).await {
- log::error!("Failed to send Completed message: {err}");
- }
- })
- .await
- }
- #[test]
- fn client_callback_protocol() {
- ASYNC_RT.block_on(async {
- const SERVICE_ID: &str = "ClientCallbackProtocolListening";
- let service_id = ServiceId::from(SERVICE_ID);
- let service_name = {
- let make_init = move || ListeningState { multiple: 2 };
- register_server(make_init, &RUNTIME, service_id.clone())
- .await
- .unwrap()
- };
- let (sender, receiver) = oneshot::channel();
- let client_handle = spawn_client(UnregisteredState { sender }, &RUNTIME).await;
- let service_addr = ServiceAddr::new(service_name, false);
- client_handle
- .send_register(service_addr, Register { factor: 21 })
- .await
- .unwrap();
- let value = timeout(Duration::from_millis(500), receiver)
- .await
- .unwrap()
- .unwrap();
- assert_eq!(42, value);
- });
- }
- }
|