runtime_tests.rs 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. use btrun::model::*;
  2. use btrun::test_setup;
  3. use btrun::*;
  4. use btproto::protocol;
  5. use log;
  6. use serde::{Deserialize, Serialize};
  7. use std::{
  8. future::{ready, Ready},
  9. sync::{
  10. atomic::{AtomicU8, Ordering},
  11. Arc,
  12. },
  13. };
  14. test_setup!();
  15. mod ping_pong {
  16. use super::*;
  17. use btlib::bterr;
  18. // The following code is a proof-of-concept for what types should be generated for a
  19. // simple ping-pong protocol:
  20. protocol! {
  21. named PingProtocol;
  22. let server = [Server];
  23. let client = [Client];
  24. Client -> End, >service(Server)!Ping;
  25. Server?Ping -> End, >Client!Ping::Reply;
  26. }
  //
  // In words, the protocol is described as follows.
  // 1. The Client state sends a Ping message to the Server service and transitions to the
  //    End state.
  // 2. When the Server state receives the Ping message it returns the End state and a
  //    Ping::Reply message to be sent to the Client state.
  //
  // The End state represents an end to the session described by the protocol. When an actor
  // transitions to the End state its function returns.
  // When a state is expecting a Reply message, an error occurs if the message is not received
  // in a timely manner.
  37. #[derive(Serialize, Deserialize)]
  38. pub struct Ping;
  39. impl CallMsg for Ping {
  40. type Reply = PingReply;
  41. }
  42. #[derive(Serialize, Deserialize)]
  43. pub struct PingReply;
  /// Client-side actor state for the ping-pong test.
  struct ClientImpl {
      /// Counter shared with the test body and the server state.
      counter: Arc<AtomicU8>,
  }

  impl ClientImpl {
      /// Wraps `counter` in a new client state and bumps the counter by one, so the test can
      /// later observe the matching decrement made in `on_send_ping`.
      fn new(counter: Arc<AtomicU8>) -> Self {
          let client = ClientImpl { counter };
          client.counter.fetch_add(1, Ordering::SeqCst);
          client
      }
  }
  53. impl Client for ClientImpl {
  54. actor_name!("ping_client");
  55. type OnSendPingReturn = ();
  56. type OnSendPingFut = Ready<TransResult<Self, (End, ())>>;
  57. fn on_send_ping(self, _msg: PingReply) -> Self::OnSendPingFut {
  58. self.counter.fetch_sub(1, Ordering::SeqCst);
  59. ready(TransResult::Ok((End, ())))
  60. }
  61. }
  62. struct ServerState {
  63. counter: Arc<AtomicU8>,
  64. }
  65. impl ServerState {
  66. fn new(counter: Arc<AtomicU8>) -> Self {
  67. let count = counter.fetch_add(1, Ordering::SeqCst);
  68. log::info!("New service provider started. Count is now '{count}'.");
  69. Self { counter }
  70. }
  71. }
  72. impl Server for ServerState {
  73. actor_name!("ping_server");
  74. type HandlePingFut = Ready<TransResult<Self, (End, PingReply)>>;
  75. fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
  76. self.counter.fetch_sub(1, Ordering::SeqCst);
  77. ready(TransResult::Ok((End, PingReply)))
  78. }
  79. }
  /// Runs one ping/pong exchange through the runtime and checks the shared counter.
  ///
  /// `ClientImpl::new` and `ServerState::new` each add one to the counter, while
  /// `on_send_ping` and `handle_ping` each subtract one. The test expects the counter to be
  /// back at zero once `send_ping` has completed.
  #[test]
  fn ping_pong_test() {
      const SERVICE_ID: &str = "PingPongProtocolServer";

      let scenario = async {
          let counter = Arc::new(AtomicU8::new(0));
          let service_id = ServiceId::from(SERVICE_ID);

          // Every server state built by the factory shares the test's counter.
          let factory_counter = counter.clone();
          let make_init = move || ServerState::new(factory_counter.clone());
          let service_name = register_server(&RUNTIME, service_id.clone(), make_init)
              .await
              .unwrap();

          let client = ClientImpl::new(counter.clone());
          let client_handle = spawn_client(client, &RUNTIME).await;
          let service_addr = ServiceAddr::new(service_name, true);
          client_handle.send_ping(service_addr, Ping).await.unwrap();

          assert_eq!(0, counter.load(Ordering::SeqCst));
      };
      ASYNC_RT.block_on(scenario);
  }
  102. }
  103. mod travel_agency {
  104. use super::*;
  105. // Here's another protocol example. This is the Customer and Travel Agency protocol used as an
  106. // example in the survey paper "Behavioral Types in Programming Languages."
  107. // Note that the Choosing state can send messages at any time, not just in response to another
  108. // message because there is a transition from Choosing that doesn't use the receive operator
  109. // (`?`).
  110. protocol! {
  111. named TravelAgency;
  112. let agency = [Listening];
  113. let customer = [Choosing];
  114. Choosing -> Choosing, >service(Listening)!Query;
  115. Choosing -> Choosing, >service(Listening)!Accept;
  116. Choosing -> Choosing, >service(Listening)!Reject;
  117. Listening?Query -> Listening, >Choosing!Query::Reply;
  118. Listening?Accept -> End, >Choosing!Accept::Reply;
  119. Listening?Reject -> End, >Choosing!Reject::Reply;
  120. }
  121. #[derive(Serialize, Deserialize)]
  122. pub struct Query;
  123. impl CallMsg for Query {
  124. type Reply = ();
  125. }
  126. #[derive(Serialize, Deserialize)]
  127. pub struct Reject;
  128. impl CallMsg for Reject {
  129. type Reply = ();
  130. }
  131. #[derive(Serialize, Deserialize)]
  132. pub struct Accept;
  133. impl CallMsg for Accept {
  134. type Reply = ();
  135. }
  136. }
  137. mod client_callback {
  138. use super::*;
  139. use btlib::bterr;
  140. use std::time::Duration;
  141. use tokio::{sync::oneshot, time::timeout};
  142. #[derive(Serialize, Deserialize)]
  143. pub struct Register {
  144. factor: usize,
  145. }
  146. #[derive(Serialize, Deserialize)]
  147. pub struct Completed {
  148. value: usize,
  149. }
  150. protocol! {
  151. named ClientCallback;
  152. let server = [Listening];
  153. let worker = [Working];
  154. let client = [Unregistered, Registered];
  155. Unregistered -> Registered, >service(Listening)!Register[*Registered];
  156. Listening?Register[*Registered] -> Listening, Working[*Registered];
  157. Working[*Registered] -> End, >Registered!Completed;
  158. Registered?Completed -> End;
  159. }
  160. struct UnregisteredState {
  161. sender: oneshot::Sender<usize>,
  162. }
  163. impl Unregistered for UnregisteredState {
  164. actor_name!("callback_client");
  165. type OnSendRegisterReturn = ();
  166. type OnSendRegisterRegistered = RegisteredState;
  167. type OnSendRegisterFut = Ready<TransResult<Self, (Self::OnSendRegisterRegistered, ())>>;
  168. fn on_send_register(self) -> Self::OnSendRegisterFut {
  169. ready(TransResult::Ok((
  170. RegisteredState {
  171. sender: self.sender,
  172. },
  173. (),
  174. )))
  175. }
  176. }
  177. struct RegisteredState {
  178. sender: oneshot::Sender<usize>,
  179. }
  180. impl Registered for RegisteredState {
  181. type HandleCompletedFut = Ready<TransResult<Self, End>>;
  182. fn handle_completed(self, arg: Completed) -> Self::HandleCompletedFut {
  183. self.sender.send(arg.value).unwrap();
  184. ready(TransResult::Ok(End))
  185. }
  186. }
  187. struct ListeningState {
  188. multiply_by: usize,
  189. }
  190. impl Listening for ListeningState {
  191. actor_name!("callback_server");
  192. type HandleRegisterListening = ListeningState;
  193. type HandleRegisterWorking = WorkingState;
  194. type HandleRegisterFut = Ready<TransResult<Self, (ListeningState, WorkingState)>>;
  195. fn handle_register(self, arg: Register) -> Self::HandleRegisterFut {
  196. let multiple = self.multiply_by;
  197. ready(TransResult::Ok((
  198. self,
  199. WorkingState {
  200. factor: arg.factor,
  201. multiple,
  202. },
  203. )))
  204. }
  205. }
  206. struct WorkingState {
  207. factor: usize,
  208. multiple: usize,
  209. }
  210. impl Working for WorkingState {
  211. actor_name!("callback_worker");
  212. type OnSendCompletedFut = Ready<TransResult<Self, (End, Completed)>>;
  213. fn on_send_completed(self) -> Self::OnSendCompletedFut {
  214. let value = self.multiple * self.factor;
  215. ready(TransResult::Ok((End, Completed { value })))
  216. }
  217. }
  /// Registers a client with the listening service and checks that the worker calls the
  /// client back with `multiply_by * factor` within half a second.
  #[test]
  fn client_callback_protocol() {
      const SERVICE_ID: &str = "ClientCallbackProtocolListening";
      const FACTOR: usize = 21;
      const MULTIPLY_BY: usize = 2;
      const REPLY_DEADLINE: Duration = Duration::from_millis(500);

      let scenario = async {
          let service_id = ServiceId::from(SERVICE_ID);
          let make_init = || ListeningState {
              multiply_by: MULTIPLY_BY,
          };
          let service_name = register_server(&RUNTIME, service_id.clone(), make_init)
              .await
              .unwrap();

          // The client reports the value it is called back with over this channel.
          let (sender, receiver) = oneshot::channel();
          let client_handle = spawn_client(UnregisteredState { sender }, &RUNTIME).await;
          let service_addr = ServiceAddr::new(service_name, false);
          let request = Register { factor: FACTOR };
          client_handle
              .send_register(service_addr, request)
              .await
              .unwrap();

          // First unwrap: the deadline did not elapse. Second unwrap: the sender was not
          // dropped before sending.
          let value = timeout(REPLY_DEADLINE, receiver).await.unwrap().unwrap();
          assert_eq!(MULTIPLY_BY * FACTOR, value);
      };
      ASYNC_RT.block_on(scenario);
  }
  246. }