lib.rs 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042
  1. #![feature(impl_trait_in_assoc_type)]
  2. pub mod sector_proto;
  3. pub mod fs_proto;
  4. use std::{
  5. any::Any,
  6. collections::HashMap,
  7. fmt::Display,
  8. future::{ready, Future, Ready},
  9. marker::PhantomData,
  10. net::IpAddr,
  11. ops::DerefMut,
  12. pin::Pin,
  13. sync::Arc,
  14. };
  15. use btlib::{bterr, crypto::Creds, error::StringError, BlockPath, Result};
  16. use btserde::{field_helpers::smart_ptr, from_slice, to_vec, write_to};
  17. use bttp::{DeserCallback, MsgCallback, Receiver, Replier, Transmitter};
  18. use serde::{de::DeserializeOwned, Deserialize, Serialize};
  19. use tokio::{
  20. sync::{mpsc, oneshot, Mutex, RwLock},
  21. task::JoinHandle,
  22. };
  23. use uuid::Uuid;
  /// Declares a new [Runtime] which listens for messages at the given IP address and uses the given
  /// [Creds]. Runtimes are intended to be created once in a process's lifetime and continue running
  /// until the process exits.
  ///
  /// The macro expands to a `lazy_static` named `$name` whose value is a `&'static Runtime`. The
  /// runtime and its network receiver are stored in two inner statics that are private to the
  /// initializer expression.
  ///
  /// Crate items are referenced through `$crate`, so callers do not need to import [Runtime] or
  /// the hidden `_new_receiver` function. The `Receiver` type is still resolved at the call site,
  /// so `bttp::Receiver` must be in scope where the macro is invoked.
  ///
  /// Note that `$creds` is expanded twice (once for the runtime and once for the receiver), so it
  /// should be a cheap, side-effect-free expression such as an `Arc` clone.
  ///
  /// # Panics
  ///
  /// The first access to `$name` panics if the runtime cannot be created (the `unwrap` below) or
  /// if `_new_receiver` panics.
  #[macro_export]
  macro_rules! declare_runtime {
      ($name:ident, $ip_addr:expr, $creds:expr) => {
          ::lazy_static::lazy_static! {
              static ref $name: &'static $crate::Runtime = {
                  ::lazy_static::lazy_static! {
                      static ref RUNTIME: $crate::Runtime =
                          $crate::Runtime::_new($creds).unwrap();
                      static ref RECEIVER: Receiver =
                          $crate::_new_receiver($ip_addr, $creds, &*RUNTIME);
                  }
                  // By dereferencing RECEIVER we ensure it is started.
                  let _ = &*RECEIVER;
                  &*RUNTIME
              };
          }
      };
  }
  43. /// This function is not intended to be called by downstream crates.
  44. #[doc(hidden)]
  45. pub fn _new_receiver<C>(ip_addr: IpAddr, creds: Arc<C>, runtime: &'static Runtime) -> Receiver
  46. where
  47. C: 'static + Send + Sync + Creds,
  48. {
  49. let callback = RuntimeCallback::new(runtime);
  50. Receiver::new(ip_addr, creds, callback).unwrap()
  51. }
  52. /// An actor runtime.
  53. ///
  54. /// Actors can be activated by the runtime and execute autonomously until they return. Running
  55. /// actors can be sent messages using the `send` method, which does not wait for a response from the
  56. /// recipient. If a reply is needed, then `call` can be used, which returns a future that will
  57. /// be ready when the reply has been received.
  58. pub struct Runtime {
  59. path: Arc<BlockPath>,
  60. handles: RwLock<HashMap<Uuid, ActorHandle>>,
  61. peers: RwLock<HashMap<Arc<BlockPath>, Transmitter>>,
  62. }
  63. impl Runtime {
  64. /// This method is not intended to be called directly by downstream crates. Use the macro
  65. /// [declare_runtime] to create a [Runtime].
  66. ///
  67. /// If you create a non-static [Runtime], your process will panic when it is dropped.
  68. #[doc(hidden)]
  69. pub fn _new<C: 'static + Send + Sync + Creds>(creds: Arc<C>) -> Result<Runtime> {
  70. let path = Arc::new(creds.bind_path()?);
  71. Ok(Runtime {
  72. path,
  73. handles: RwLock::new(HashMap::new()),
  74. peers: RwLock::new(HashMap::new()),
  75. })
  76. }
  77. pub fn path(&self) -> &Arc<BlockPath> {
  78. &self.path
  79. }
  80. /// Returns the number of actors that are currently executing in this [Runtime].
  81. pub async fn num_running(&self) -> usize {
  82. let guard = self.handles.read().await;
  83. guard.len()
  84. }
  85. /// Sends a message to the actor identified by the given [ActorName].
  86. pub async fn send<T: 'static + SendMsg>(
  87. &self,
  88. to: ActorName,
  89. from: ActorName,
  90. msg: T,
  91. ) -> Result<()> {
  92. if to.path == self.path {
  93. let guard = self.handles.read().await;
  94. if let Some(handle) = guard.get(&to.act_id) {
  95. handle.send(from, msg).await
  96. } else {
  97. Err(bterr!("invalid actor name"))
  98. }
  99. } else {
  100. let guard = self.peers.read().await;
  101. if let Some(peer) = guard.get(&to.path) {
  102. let buf = to_vec(&msg)?;
  103. let wire_msg = WireMsg {
  104. to,
  105. from,
  106. payload: &buf,
  107. };
  108. peer.send(wire_msg).await
  109. } else {
  110. // TODO: Use the filesystem to discover the address of the recipient and connect to
  111. // it.
  112. todo!()
  113. }
  114. }
  115. }
  116. /// Sends a message to the actor identified by the given [ActorName] and returns a future which
  117. /// is ready when a reply has been received.
  118. pub async fn call<T: 'static + CallMsg>(
  119. &self,
  120. to: ActorName,
  121. from: ActorName,
  122. msg: T,
  123. ) -> Result<T::Reply> {
  124. if to.path == self.path {
  125. let guard = self.handles.read().await;
  126. if let Some(handle) = guard.get(&to.act_id) {
  127. handle.call_through(from, msg).await
  128. } else {
  129. Err(bterr!("invalid actor name"))
  130. }
  131. } else {
  132. let guard = self.peers.read().await;
  133. if let Some(peer) = guard.get(&to.path) {
  134. let buf = to_vec(&msg)?;
  135. let wire_msg = WireMsg {
  136. to,
  137. from,
  138. payload: &buf,
  139. };
  140. peer.call(wire_msg, ReplyCallback::<T>::new()).await?
  141. } else {
  142. // TODO: Use the filesystem to discover the address of the recipient and connect to
  143. // it.
  144. todo!()
  145. }
  146. }
  147. }
  148. /// Resolves the given [ServiceName] to an [ActorName] which is part of it.
  149. pub async fn resolve<'a>(&'a self, _service: &ServiceName) -> Result<ActorName> {
  150. todo!()
  151. }
  152. /// Activates a new actor using the given activator function and returns a handle to it.
  153. pub async fn activate<Msg, F, Fut>(&'static self, activator: F) -> ActorName
  154. where
  155. Msg: 'static + CallMsg,
  156. Fut: 'static + Send + Future<Output = ()>,
  157. F: FnOnce(&'static Runtime, mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
  158. {
  159. let mut guard = self.handles.write().await;
  160. let act_id = {
  161. let mut act_id = Uuid::new_v4();
  162. while guard.contains_key(&act_id) {
  163. act_id = Uuid::new_v4();
  164. }
  165. act_id
  166. };
  167. let act_name = self.actor_name(act_id);
  168. let (tx, rx) = mpsc::channel::<Envelope<Msg>>(MAILBOX_LIMIT);
  169. // The deliverer closure is responsible for deserializing messages received over the wire
  170. // and delivering them to the actor's mailbox, and sending replies to call messages.
  171. let deliverer = {
  172. let buffer = Arc::new(Mutex::new(Vec::<u8>::new()));
  173. let tx = tx.clone();
  174. let act_name = act_name.clone();
  175. move |envelope: WireEnvelope| {
  176. let (wire_msg, replier) = envelope.into_parts();
  177. let result = from_slice(wire_msg.payload);
  178. let buffer = buffer.clone();
  179. let tx = tx.clone();
  180. let act_name = act_name.clone();
  181. let fut: FutureResult = Box::pin(async move {
  182. let msg = result?;
  183. if let Some(mut replier) = replier {
  184. let (envelope, rx) = Envelope::new_call(act_name, msg);
  185. tx.send(envelope).await.map_err(|_| {
  186. bterr!("failed to deliver message. Recipient may have halted.")
  187. })?;
  188. match rx.await {
  189. Ok(reply) => {
  190. let mut guard = buffer.lock().await;
  191. guard.clear();
  192. write_to(&reply, guard.deref_mut())?;
  193. let wire_reply = WireReply::Ok(&guard);
  194. replier.reply(wire_reply).await
  195. }
  196. Err(err) => replier.reply_err(err.to_string(), None).await,
  197. }
  198. } else {
  199. tx.send(Envelope::new_send(act_name, msg))
  200. .await
  201. .map_err(|_| {
  202. bterr!("failed to deliver message. Recipient may have halted.")
  203. })
  204. }
  205. });
  206. fut
  207. }
  208. };
  209. let handle = tokio::task::spawn(activator(self, rx, act_id));
  210. let actor_handle = ActorHandle::new(handle, tx, deliverer);
  211. guard.insert(act_id, actor_handle);
  212. act_name
  213. }
  214. /// Registers an actor as a service with the given [ServiceId].
  215. pub async fn register<Msg, Fut, F, G>(
  216. &self,
  217. _id: ServiceId,
  218. _activator: F,
  219. _deserializer: G,
  220. ) -> Result<()>
  221. where
  222. Msg: 'static + CallMsg,
  223. Fut: 'static + Send + Future<Output = ()>,
  224. F: Fn(mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
  225. G: 'static + Send + Sync + Fn(&[u8]) -> Result<Msg>,
  226. {
  227. todo!()
  228. }
  229. /// Returns the [ActorHandle] for the actor with the given name.
  230. ///
  231. /// If there is no such actor in this runtime then a [RuntimeError::BadActorName] error is
  232. /// returned.
  233. ///
  234. /// Note that the actor will be aborted when the given handle is dropped (unless it has already
  235. /// returned when the handle is dropped), and no further messages will be delivered to it by
  236. /// this runtime.
  237. pub async fn take(&self, name: &ActorName) -> Result<ActorHandle> {
  238. if name.path == self.path {
  239. let mut guard = self.handles.write().await;
  240. if let Some(handle) = guard.remove(&name.act_id) {
  241. Ok(handle)
  242. } else {
  243. Err(RuntimeError::BadActorName(name.clone()).into())
  244. }
  245. } else {
  246. Err(RuntimeError::BadActorName(name.clone()).into())
  247. }
  248. }
  249. /// Returns the name of the actor in this runtime with the given actor ID.
  250. pub fn actor_name(&self, act_id: Uuid) -> ActorName {
  251. ActorName::new(self.path.clone(), act_id)
  252. }
  253. }
  254. impl Drop for Runtime {
  255. fn drop(&mut self) {
  256. panic!("A Runtime was dropped. Panicking to avoid undefined behavior.");
  257. }
  258. }
  259. #[derive(Debug, Clone, PartialEq, Eq)]
  260. pub enum RuntimeError {
  261. BadActorName(ActorName),
  262. }
  263. impl Display for RuntimeError {
  264. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  265. match self {
  266. Self::BadActorName(name) => write!(f, "bad actor name: {name}"),
  267. }
  268. }
  269. }
  270. impl std::error::Error for RuntimeError {}
  /// Represents the terminal state of an actor, where it stops processing messages and halts.
  // NOTE(review): in the portion of the file visible here this type is only referenced from the
  // test module, which is presumably why the dead-code lint is silenced.
  #[allow(dead_code)]
  struct End;
  274. #[allow(dead_code)]
  275. /// Delivered to an actor implementation when it starts up.
  276. pub struct Activate {
  277. /// A reference to the `Runtime` which is running this actor.
  278. rt: &'static Runtime,
  279. /// The ID assigned to this actor.
  280. act_id: Uuid,
  281. }
  282. /// Deserializes replies sent over the wire.
  283. struct ReplyCallback<T> {
  284. _phantom: PhantomData<T>,
  285. }
  286. impl<T: CallMsg> ReplyCallback<T> {
  287. fn new() -> Self {
  288. Self {
  289. _phantom: PhantomData,
  290. }
  291. }
  292. }
  293. impl<T: CallMsg> DeserCallback for ReplyCallback<T> {
  294. type Arg<'de> = WireReply<'de> where T: 'de;
  295. type Return = Result<T::Reply>;
  296. type CallFut<'de> = Ready<Self::Return> where T: 'de, T::Reply: 'de;
  297. fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
  298. let result = match arg {
  299. WireReply::Ok(slice) => from_slice(slice).map_err(|err| err.into()),
  300. WireReply::Err(msg) => Err(StringError::new(msg.to_string()).into()),
  301. };
  302. ready(result)
  303. }
  304. }
  305. struct SendReplyCallback {
  306. replier: Option<Replier>,
  307. }
  308. impl SendReplyCallback {
  309. fn new(replier: Replier) -> Self {
  310. Self {
  311. replier: Some(replier),
  312. }
  313. }
  314. }
  315. impl DeserCallback for SendReplyCallback {
  316. type Arg<'de> = WireReply<'de>;
  317. type Return = Result<()>;
  318. type CallFut<'de> = impl 'de + Future<Output = Self::Return>;
  319. fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
  320. async move {
  321. if let Some(mut replier) = self.replier.take() {
  322. replier.reply(arg).await
  323. } else {
  324. Ok(())
  325. }
  326. }
  327. }
  328. }
  329. /// This struct implements the server callback for network messages.
  330. #[derive(Clone)]
  331. struct RuntimeCallback {
  332. rt: &'static Runtime,
  333. }
  334. impl RuntimeCallback {
  335. fn new(rt: &'static Runtime) -> Self {
  336. Self { rt }
  337. }
  338. async fn deliver_local(&self, msg: WireMsg<'_>, replier: Option<Replier>) -> Result<()> {
  339. let guard = self.rt.handles.read().await;
  340. if let Some(handle) = guard.get(&msg.to.act_id) {
  341. let envelope = if let Some(replier) = replier {
  342. WireEnvelope::Call { msg, replier }
  343. } else {
  344. WireEnvelope::Send { msg }
  345. };
  346. (handle.deliverer)(envelope).await
  347. } else {
  348. Err(bterr!("invalid actor name: {}", msg.to))
  349. }
  350. }
  351. async fn route_msg(&self, msg: WireMsg<'_>, replier: Option<Replier>) -> Result<()> {
  352. let guard = self.rt.peers.read().await;
  353. if let Some(tx) = guard.get(msg.to.path()) {
  354. if let Some(replier) = replier {
  355. let callback = SendReplyCallback::new(replier);
  356. tx.call(msg, callback).await?
  357. } else {
  358. tx.send(msg).await
  359. }
  360. } else {
  361. Err(bterr!(
  362. "unable to deliver message to peer at '{}'",
  363. msg.to.path()
  364. ))
  365. }
  366. }
  367. }
  368. impl MsgCallback for RuntimeCallback {
  369. type Arg<'de> = WireMsg<'de>;
  370. type CallFut<'de> = impl 'de + Future<Output = Result<()>>;
  371. fn call<'de>(&'de self, arg: bttp::MsgReceived<Self::Arg<'de>>) -> Self::CallFut<'de> {
  372. async move {
  373. let (_, body, replier) = arg.into_parts();
  374. if body.to.path() == self.rt.path() {
  375. self.deliver_local(body, replier).await
  376. } else {
  377. self.route_msg(body, replier).await
  378. }
  379. }
  380. }
  381. }
  382. /// A unique identifier for a particular service.
  383. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  384. pub struct ServiceId(#[serde(with = "smart_ptr")] Arc<String>);
  385. impl From<String> for ServiceId {
  386. fn from(value: String) -> Self {
  387. Self(Arc::new(value))
  388. }
  389. }
  390. impl<'a> From<&'a str> for ServiceId {
  391. fn from(value: &'a str) -> Self {
  392. Self(Arc::new(value.to_owned()))
  393. }
  394. }
  395. /// A unique identifier for a service.
  396. ///
  397. /// A service is a collection of actors in the same directory which provide some functionality.
  398. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  399. pub struct ServiceName {
  400. /// The path to the directory containing the service.
  401. #[serde(with = "smart_ptr")]
  402. path: Arc<BlockPath>,
  403. /// The id of the service.
  404. service_id: ServiceId,
  405. }
  406. /// A unique identifier for a specific actor activation.
  407. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  408. pub struct ActorName {
  409. /// The path to the directory containing this actor.
  410. #[serde(with = "smart_ptr")]
  411. path: Arc<BlockPath>,
  412. /// A unique identifier for an actor activation. Even as an actor transitions to different types
  413. /// as it handles messages, this value does not change. Thus this value can be used to trace an
  414. /// actor through a series of state transitions.
  415. act_id: Uuid,
  416. }
  417. impl ActorName {
  418. pub fn new(path: Arc<BlockPath>, act_id: Uuid) -> Self {
  419. Self { path, act_id }
  420. }
  421. pub fn path(&self) -> &Arc<BlockPath> {
  422. &self.path
  423. }
  424. pub fn act_id(&self) -> Uuid {
  425. self.act_id
  426. }
  427. }
  428. impl Display for ActorName {
  429. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  430. write!(f, "{}@{}", self.act_id, self.path)
  431. }
  432. }
  433. /// Trait for messages which expect exactly one reply.
  434. pub trait CallMsg: Serialize + DeserializeOwned + Send + Sync {
  435. /// The reply type expected for this message.
  436. type Reply: Serialize + DeserializeOwned + Send + Sync;
  437. }
  438. /// Trait for messages which expect exactly zero replies.
  439. pub trait SendMsg: CallMsg {}
  440. /// A type used to express when a reply is not expected for a message type.
  441. #[derive(Serialize, Deserialize)]
  442. enum NoReply {}
  /// The maximum number of messages which can be kept in an actor's mailbox.
  ///
  /// `Runtime::activate` passes this value as the capacity of the channel it creates for each
  /// actor.
  const MAILBOX_LIMIT: usize = 32;
  445. /// The type of messages sent over the wire between runtimes.
  446. #[derive(Serialize, Deserialize)]
  447. struct WireMsg<'a> {
  448. to: ActorName,
  449. from: ActorName,
  450. payload: &'a [u8],
  451. }
  452. impl<'a> bttp::CallMsg<'a> for WireMsg<'a> {
  453. type Reply<'r> = WireReply<'r>;
  454. }
  455. impl<'a> bttp::SendMsg<'a> for WireMsg<'a> {}
  456. #[derive(Serialize, Deserialize)]
  457. enum WireReply<'a> {
  458. Ok(&'a [u8]),
  459. Err(&'a str),
  460. }
  461. /// A wrapper around [WireMsg] which indicates whether a call or send was executed.
  462. enum WireEnvelope<'de> {
  463. Send { msg: WireMsg<'de> },
  464. Call { msg: WireMsg<'de>, replier: Replier },
  465. }
  466. impl<'de> WireEnvelope<'de> {
  467. fn into_parts(self) -> (WireMsg<'de>, Option<Replier>) {
  468. match self {
  469. Self::Send { msg } => (msg, None),
  470. Self::Call { msg, replier } => (msg, Some(replier)),
  471. }
  472. }
  473. }
  474. /// Wrapper around a message type `T` which indicates who the message is from and, if the message
  475. /// was dispatched with `call`, provides a channel to reply to it.
  476. pub struct Envelope<T: CallMsg> {
  477. from: ActorName,
  478. reply: Option<oneshot::Sender<T::Reply>>,
  479. msg: T,
  480. }
  481. impl<T: CallMsg> Envelope<T> {
  482. pub fn new(msg: T, reply: Option<oneshot::Sender<T::Reply>>, from: ActorName) -> Self {
  483. Self { from, reply, msg }
  484. }
  485. /// Creates a new envelope containing the given message which does not expect a reply.
  486. fn new_send(from: ActorName, msg: T) -> Self {
  487. Self {
  488. from,
  489. msg,
  490. reply: None,
  491. }
  492. }
  493. /// Creates a new envelope containing the given message which expects exactly one reply.
  494. fn new_call(from: ActorName, msg: T) -> (Self, oneshot::Receiver<T::Reply>) {
  495. let (tx, rx) = oneshot::channel::<T::Reply>();
  496. let envelope = Self {
  497. from,
  498. msg,
  499. reply: Some(tx),
  500. };
  501. (envelope, rx)
  502. }
  503. /// Returns the name of the actor which sent this message.
  504. pub fn from(&self) -> &ActorName {
  505. &self.from
  506. }
  507. /// Returns a reference to the message in this envelope.
  508. pub fn msg(&self) -> &T {
  509. &self.msg
  510. }
  511. /// Sends a reply to this message.
  512. ///
  513. /// If this message is not expecting a reply, or if this message has already been replied to,
  514. /// then an error is returned.
  515. pub fn reply(&mut self, reply: T::Reply) -> Result<()> {
  516. if let Some(tx) = self.reply.take() {
  517. if tx.send(reply).is_ok() {
  518. Ok(())
  519. } else {
  520. Err(bterr!("failed to send reply"))
  521. }
  522. } else {
  523. Err(bterr!("reply already sent"))
  524. }
  525. }
  526. /// Returns true if this message expects a reply and it has not already been replied to.
  527. pub fn needs_reply(&self) -> bool {
  528. self.reply.is_some()
  529. }
  530. pub fn split(self) -> (T, Option<oneshot::Sender<T::Reply>>, ActorName) {
  531. (self.msg, self.reply, self.from)
  532. }
  533. }
  534. type FutureResult = Pin<Box<dyn Send + Future<Output = Result<()>>>>;
  535. pub struct ActorHandle {
  536. handle: Option<JoinHandle<()>>,
  537. sender: Box<dyn Send + Sync + Any>,
  538. deliverer: Box<dyn Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult>,
  539. }
  540. impl ActorHandle {
  541. fn new<T, F>(handle: JoinHandle<()>, sender: mpsc::Sender<Envelope<T>>, deliverer: F) -> Self
  542. where
  543. T: 'static + CallMsg,
  544. F: 'static + Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult,
  545. {
  546. Self {
  547. handle: Some(handle),
  548. sender: Box::new(sender),
  549. deliverer: Box::new(deliverer),
  550. }
  551. }
  552. fn sender<T: 'static + CallMsg>(&self) -> Result<&mpsc::Sender<Envelope<T>>> {
  553. self.sender
  554. .downcast_ref::<mpsc::Sender<Envelope<T>>>()
  555. .ok_or_else(|| bterr!("unexpected message type"))
  556. }
  557. /// Sends a message to the actor represented by this handle.
  558. pub async fn send<T: 'static + SendMsg>(&self, from: ActorName, msg: T) -> Result<()> {
  559. let sender = self.sender()?;
  560. sender
  561. .send(Envelope::new_send(from, msg))
  562. .await
  563. .map_err(|_| bterr!("failed to enqueue message"))?;
  564. Ok(())
  565. }
  566. pub async fn call_through<T: 'static + CallMsg>(
  567. &self,
  568. from: ActorName,
  569. msg: T,
  570. ) -> Result<T::Reply> {
  571. let sender = self.sender()?;
  572. let (envelope, rx) = Envelope::new_call(from, msg);
  573. sender
  574. .send(envelope)
  575. .await
  576. .map_err(|_| bterr!("failed to enqueue call"))?;
  577. let reply = rx.await?;
  578. Ok(reply)
  579. }
  580. pub async fn returned(&mut self) -> Result<()> {
  581. if let Some(handle) = self.handle.take() {
  582. handle.await?;
  583. }
  584. Ok(())
  585. }
  586. pub fn abort(&mut self) {
  587. if let Some(handle) = self.handle.take() {
  588. handle.abort();
  589. }
  590. }
  591. }
  592. impl Drop for ActorHandle {
  593. fn drop(&mut self) {
  594. self.abort();
  595. }
  596. }
  597. #[cfg(test)]
  598. mod tests {
  599. use super::*;
  600. use btlib::{
  601. crypto::{ConcreteCreds, CredStore, CredsPriv},
  602. log::BuilderExt,
  603. };
  604. use btlib_tests::TEST_STORE;
  605. use btproto::protocol;
  606. use btserde::to_vec;
  607. use bttp::BlockAddr;
  608. use ctor::ctor;
  609. use lazy_static::lazy_static;
  610. use std::{
  611. net::{IpAddr, Ipv4Addr},
  612. sync::atomic::{AtomicU8, Ordering},
  613. time::{Duration, Instant},
  614. };
  615. use tokio::runtime::Builder;
  616. const RUNTIME_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
  617. lazy_static! {
  618. static ref RUNTIME_CREDS: Arc<ConcreteCreds> = TEST_STORE.node_creds().unwrap();
  619. }
  620. declare_runtime!(RUNTIME, RUNTIME_ADDR, RUNTIME_CREDS.clone());
  621. lazy_static! {
  622. /// A tokio async runtime.
  623. ///
  624. /// When the `#[tokio::test]` attribute is used on a test, a new current thread runtime
  625. /// is created for each test
  626. /// (source: https://docs.rs/tokio/latest/tokio/attr.test.html#current-thread-runtime).
  627. /// This creates a problem, because the first test thread to access the `RUNTIME` static
  628. /// will initialize its `Receiver` in its runtime, which will stop running at the end of
  629. /// the test. Hence subsequent tests will not be able to send remote messages to this
  630. /// `Runtime`.
  631. ///
  632. /// By creating a single async runtime which is used by all of the tests, we can avoid this
  633. /// problem.
  634. static ref ASYNC_RT: tokio::runtime::Runtime = Builder::new_current_thread()
  635. .enable_all()
  636. .build()
  637. .unwrap();
  638. }
  639. /// The log level to use when running tests.
  640. const LOG_LEVEL: &str = "warn";
  641. #[ctor]
  642. fn ctor() {
  643. std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL));
  644. env_logger::Builder::from_default_env().btformat().init();
  645. }
  646. #[derive(Serialize, Deserialize)]
  647. struct EchoMsg(String);
  648. impl CallMsg for EchoMsg {
  649. type Reply = EchoMsg;
  650. }
  651. async fn echo(
  652. _rt: &'static Runtime,
  653. mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>,
  654. _act_id: Uuid,
  655. ) {
  656. while let Some(envelope) = mailbox.recv().await {
  657. let (msg, replier, ..) = envelope.split();
  658. if let Some(replier) = replier {
  659. if let Err(_) = replier.send(msg) {
  660. panic!("failed to send reply");
  661. }
  662. }
  663. }
  664. }
  /// Calls an echo actor through `Runtime::call` and checks that the reply matches the request.
  #[test]
  fn local_call() {
      const EXPECTED: &str = "hello";
      ASYNC_RT.block_on(async {
          let name = RUNTIME.activate(echo).await;
          let from = ActorName::new(name.path().clone(), Uuid::default());
          let request = EchoMsg(EXPECTED.into());
          let reply = RUNTIME.call(name.clone(), from, request).await.unwrap();
          assert_eq!(EXPECTED, reply.0);
          // Remove the actor so it does not outlive the test.
          RUNTIME.take(&name).await.unwrap();
      })
  }
  /// Calls an echo actor over the network by sending a [WireMsg] through a [Transmitter] connected
  /// to `RUNTIME`'s own address, and checks that the reply matches the request.
  #[test]
  fn remote_call() {
      const EXPECTED: &str = "hello";
      ASYNC_RT.block_on(async {
          let actor_name = RUNTIME.activate(echo).await;
          let transmitter = {
              let bind_path = Arc::new(RUNTIME_CREDS.bind_path().unwrap());
              let block_addr = Arc::new(BlockAddr::new(RUNTIME_ADDR, bind_path));
              Transmitter::new(block_addr, RUNTIME_CREDS.clone())
                  .await
                  .unwrap()
          };
          let payload = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap();
          let wire_msg = WireMsg {
              to: actor_name.clone(),
              from: RUNTIME.actor_name(Uuid::default()),
              payload: &payload,
          };
          let outcome = transmitter
              .call(wire_msg, ReplyCallback::<EchoMsg>::new())
              .await;
          // The outer result is the transport's, the inner one is the deserialized reply.
          let reply = outcome.unwrap().unwrap();
          assert_eq!(EXPECTED, reply.0);
          // Remove the actor so it does not outlive the test.
          RUNTIME.take(&actor_name).await.unwrap();
      });
  }
  704. /// Tests the `num_running` method.
  705. ///
  706. /// This test uses its own runtime and so can use the `#[tokio::test]` attribute.
  707. #[tokio::test]
  708. async fn num_running() {
  709. declare_runtime!(
  710. LOCAL_RT,
  711. // This needs to be different from the address where `RUNTIME` is listening.
  712. IpAddr::from([127, 0, 0, 2]),
  713. TEST_STORE.node_creds().unwrap()
  714. );
  715. assert_eq!(0, LOCAL_RT.num_running().await);
  716. let name = LOCAL_RT.activate(echo).await;
  717. assert_eq!(1, LOCAL_RT.num_running().await);
  718. LOCAL_RT.take(&name).await.unwrap();
  719. assert_eq!(0, LOCAL_RT.num_running().await);
  720. }
  721. // The following code is a proof-of-concept for what types should be generated for a
  722. // simple ping-pong protocol:
  723. //
  // Declaration of the ping-pong protocol in the `protocol!` DSL. Each transition reads
  // `State?Received -> NextState, Recipient!Sent`.
  protocol! {
      let name = PingPongProtocol;
      let states = [
          ClientInit, SentPing,
          ServerInit, Listening,
      ];
      // The client is activated, moves to SentPing and sends a Ping to the Listening state.
      ClientInit?Activate -> SentPing, Listening!Ping;
      // The server is activated and starts listening.
      ServerInit?Activate -> Listening;
      // The server receives the Ping, ends, and sends the reply to the SentPing state.
      Listening?Ping -> End, SentPing!Ping::Reply;
      // The client receives the reply and ends.
      SentPing?Ping::Reply -> End;
  }
  735. //
  736. // In words, the protocol is described as follows.
  737. // 1. The ClientInit state receives the Activate message. It returns the SentPing state and a
  738. // Ping message to be sent to the Listening state.
  739. // 2. The ServerInit state receives the Activate message. It returns the Listening state.
  740. // 3. When the Listening state receives the Ping message it returns the End state and a
  741. // Ping::Reply message to be sent to the SentPing state.
  742. // 4. When the SentPing state receives the Ping::Reply message it returns the End state.
  743. //
  744. // The End state represents an end to the session described by the protocol. When an actor
  745. // transitions to the End state its function returns.
  746. // The generated actor implementation is the sender of the Activate message.
  747. // When a state is expecting a Reply message, an error occurs if the message is not received
  748. // in a timely manner.
  749. #[derive(Serialize, Deserialize)]
  750. struct Ping;
  751. impl CallMsg for Ping {
  752. type Reply = PingReply;
  753. }
  754. // I was tempted to name this "Pong", but the proc macro wouldn't think to do that.
  755. #[derive(Serialize, Deserialize)]
  756. struct PingReply;
  757. trait ClientInit {
  758. type AfterActivate: SentPing;
  759. type HandleActivateFut: Future<Output = Result<(Self::AfterActivate, Ping)>>;
  760. fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
  761. }
  762. trait ServerInit {
  763. type AfterActivate: Listening;
  764. type HandleActivateFut: Future<Output = Result<Self::AfterActivate>>;
  765. fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
  766. }
  767. trait Listening {
  768. type HandlePingFut: Future<Output = Result<(End, PingReply)>>;
  769. fn handle_ping(self, msg: Ping) -> Self::HandlePingFut;
  770. }
  771. trait SentPing {
  772. type HandleReplyFut: Future<Output = Result<End>>;
  773. fn handle_reply(self, msg: PingReply) -> Self::HandleReplyFut;
  774. }
  775. #[derive(Serialize, Deserialize)]
  776. enum PingProtocolMsg {
  777. Ping(Ping),
  778. PingReply(PingReply),
  779. }
  780. impl CallMsg for PingProtocolMsg {
  781. type Reply = PingProtocolMsg;
  782. }
  783. impl SendMsg for PingProtocolMsg {}
  784. struct ClientInitState;
  785. impl ClientInit for ClientInitState {
  786. type AfterActivate = ClientState;
  787. type HandleActivateFut = impl Future<Output = Result<(Self::AfterActivate, Ping)>>;
  788. fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
  789. ready(Ok((ClientState, Ping)))
  790. }
  791. }
  792. struct ClientState;
  793. impl SentPing for ClientState {
  794. type HandleReplyFut = Ready<Result<End>>;
  795. fn handle_reply(self, _msg: PingReply) -> Self::HandleReplyFut {
  796. ready(Ok(End))
  797. }
  798. }
  799. #[allow(dead_code)]
  800. enum PingClientState {
  801. Init(ClientInitState),
  802. SentPing(ClientState),
  803. End(End),
  804. }
  805. struct ServerInitState;
  806. struct ServerState;
  807. impl ServerInit for ServerInitState {
  808. type AfterActivate = ServerState;
  809. type HandleActivateFut = Ready<Result<Self::AfterActivate>>;
  810. fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
  811. ready(Ok(ServerState))
  812. }
  813. }
  814. impl Listening for ServerState {
  815. type HandlePingFut = impl Future<Output = Result<(End, PingReply)>>;
  816. fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
  817. ready(Ok((End, PingReply)))
  818. }
  819. }
  820. #[allow(dead_code)]
  821. enum PingServerState {
  822. ServerInit(ServerInitState),
  823. Listening(ServerState),
  824. End(End),
  825. }
  826. async fn ping_server(
  827. counter: Arc<AtomicU8>,
  828. rt: &'static Runtime,
  829. mut mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
  830. act_id: Uuid,
  831. ) {
  832. let mut state = {
  833. let init = ServerInitState;
  834. let state = init.handle_activate(Activate { rt, act_id }).await.unwrap();
  835. PingServerState::Listening(state)
  836. };
  837. while let Some(envelope) = mailbox.recv().await {
  838. let (msg, replier, _from) = envelope.split();
  839. match (state, msg) {
  840. (PingServerState::Listening(listening_state), PingProtocolMsg::Ping(msg)) => {
  841. let (new_state, reply) = listening_state.handle_ping(msg).await.unwrap();
  842. state = PingServerState::End(new_state);
  843. if let Err(_) = replier.unwrap().send(PingProtocolMsg::PingReply(reply)) {
  844. panic!("Failed to send Ping reply.");
  845. }
  846. }
  847. (_prev_state, _) => {
  848. panic!("Ping protocol violation.");
  849. // A real implementation should assign the previous state and log the error.
  850. // state = prev_state;
  851. }
  852. }
  853. if let PingServerState::End(_) = state {
  854. break;
  855. }
  856. }
  857. counter.fetch_sub(1, Ordering::SeqCst);
  858. }
  859. async fn ping_client(
  860. counter: Arc<AtomicU8>,
  861. server_name: ActorName,
  862. rt: &'static Runtime,
  863. _mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
  864. act_id: Uuid,
  865. ) {
  866. let init = ClientInitState;
  867. let (state, msg) = init.handle_activate(Activate { rt, act_id }).await.unwrap();
  868. let from = rt.actor_name(act_id);
  869. let reply = rt
  870. .call(server_name, from, PingProtocolMsg::Ping(msg))
  871. .await
  872. .unwrap();
  873. if let PingProtocolMsg::PingReply(msg) = reply {
  874. state.handle_reply(msg).await.unwrap();
  875. } else {
  876. panic!("Incorrect message type sent in reply to Ping.");
  877. }
  878. counter.fetch_sub(1, Ordering::SeqCst);
  879. }
  /// Activates a ping server and a ping client and checks that both of them run to completion.
  #[test]
  fn ping_pong_test() {
      ASYNC_RT.block_on(async {
          // One count per actor. Each actor decrements the counter just before it returns.
          let counter = Arc::new(AtomicU8::new(2));

          let server_counter = counter.clone();
          let server_name = RUNTIME
              .activate(move |rt, mailbox, act_id| {
                  ping_server(server_counter, rt, mailbox, act_id)
              })
              .await;

          let client_counter = counter.clone();
          let target = server_name.clone();
          let client_name = RUNTIME
              .activate(move |rt, mailbox, act_id| {
                  ping_client(client_counter, target, rt, mailbox, act_id)
              })
              .await;

          // Poll until both actors have returned or half a second has passed.
          let deadline = Instant::now() + Duration::from_millis(500);
          loop {
              let finished = counter.load(Ordering::SeqCst) == 0;
              if finished || Instant::now() >= deadline {
                  break;
              }
              tokio::time::sleep(Duration::from_millis(20)).await;
          }

          // Make sure both tasks actually finished and the loop didn't just time out.
          assert_eq!(0, counter.load(Ordering::SeqCst));

          // TODO: Should actors which return be removed from the runtime automatically?
          RUNTIME.take(&server_name).await.unwrap();
          RUNTIME.take(&client_name).await.unwrap();
      });
  }
  910. // Here's another protocol example. This is the Customer and Travel Agency protocol used as an
  911. // example in the survey paper "Behavioral Types in Programming Languages."
  912. // Note that the Choosing state can send messages at any time, not just in response to another
  913. // message because there is a transition from Choosing that doesn't use the receive operator
  914. // (`?`).
  915. protocol! {
  916. let name = TravelAgency;
  917. let states = [
  918. AgencyInit, Listening,
  919. Choosing,
  920. ];
  921. AgencyInit?Activate -> Listening;
  922. Choosing -> Choosing, Listening!Query|Accept|Reject;
  923. Listening?Query -> Listening, Choosing!Query::Reply;
  924. Choosing?Query::Reply -> Choosing;
  925. Listening?Accept -> End, Choosing!Accept::Reply;
  926. Choosing?Accept::Reply -> End;
  927. Listening?Reject -> End, Choosing!Reject:Reply;
  928. Choosing?Reject::Reply -> End;
  929. }
  930. }