lib.rs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. #![feature(impl_trait_in_assoc_type)]
  2. use std::{any::Any, collections::HashMap, future::Future, net::IpAddr, pin::Pin, sync::Arc};
  3. use btlib::{bterr, crypto::Creds, BlockPath, Result};
  4. use btmsg::{MsgCallback, Receiver, Replier, Transmitter};
  5. use btserde::field_helpers::smart_ptr;
  6. use serde::{de::DeserializeOwned, Deserialize, Serialize};
  7. use tokio::{
  8. sync::{mpsc, oneshot, RwLock},
  9. task::JoinHandle,
  10. };
  11. use uuid::Uuid;
  12. /// Creates a new [Runtime] instance which listens at the given IP address and which uses the given
  13. /// credentials.
  14. pub fn new_runtime<C: 'static + Send + Sync + Creds>(
  15. ip_addr: IpAddr,
  16. creds: Arc<C>,
  17. ) -> Result<Runtime<impl Receiver>> {
  18. let path = Arc::new(creds.bind_path()?);
  19. let handles = Arc::new(RwLock::new(HashMap::new()));
  20. let callback = RuntimeCallback::new(handles.clone());
  21. let rx = btmsg::receiver(ip_addr, creds, callback)?;
  22. Ok(Runtime {
  23. _rx: rx,
  24. path,
  25. handles,
  26. peers: RwLock::new(HashMap::new()),
  27. })
  28. }
  29. /// An actor runtime.
  30. ///
  31. /// Actors can be activated by the runtime and execute autonomously until they halt. Running actors
  32. /// can be sent messages using the `send` method, which does not wait for a response from the
  33. /// recipient. If a reply is needed, then `call` can be used, which returns a future that will not
  34. /// be ready until the reply has been received.
  35. pub struct Runtime<Rx: Receiver> {
  36. _rx: Rx,
  37. path: Arc<BlockPath>,
  38. handles: Arc<RwLock<HashMap<Uuid, ActorHandle>>>,
  39. peers: RwLock<HashMap<Arc<BlockPath>, Rx::Transmitter>>,
  40. }
  // Routes `$msg` to the actor named by `$to` by calling `$method` on the matching handle.
  //
  // When `$to.path` equals `$self.path` the message goes to a local `ActorHandle`; otherwise it
  // is wrapped in an `Adapter` and handed to the transmitter registered for that path.
  macro_rules! deliver {
      ($self:expr, $to:expr, $msg:expr, $method:ident) => {
          if $to.path == $self.path {
              // NOTE(review): this read guard stays alive across the `.await` below, so anything
              // needing the write lock on `handles` waits until delivery completes — confirm
              // that this is acceptable.
              let local_actors = $self.handles.read().await;
              match local_actors.get(&$to.act_id) {
                  Some(actor) => actor.$method($msg).await,
                  None => Err(bterr!("invalid actor name")),
              }
          } else {
              let known_peers = $self.peers.read().await;
              match known_peers.get(&$to.path) {
                  Some(transmitter) => transmitter.$method(Adapter($msg)).await,
                  None => {
                      // TODO: Use the filesystem to discover the address of the recipient and
                      // connect to it.
                      todo!()
                  }
              }
          }
      };
  }
  62. impl<Rx: Receiver> Runtime<Rx> {
  63. pub fn path(&self) -> &Arc<BlockPath> {
  64. &self.path
  65. }
  66. /// Sends a message to the actor identified by the given [ActorName].
  67. pub async fn send<T: 'static + SendMsg>(&self, to: &ActorName, msg: T) -> Result<()> {
  68. deliver!(self, to, msg, send)
  69. }
  70. /// Sends a message to the actor identified by the given [ActorName] and returns a future which
  71. /// is ready when the reply has been received.
  72. pub async fn call<T: 'static + CallMsg>(&self, to: &ActorName, msg: T) -> Result<T::Reply> {
  73. deliver!(self, to, msg, call_through)
  74. }
  75. /// Resolves the given [ServiceName] to an [ActorName] which is part of it.
  76. pub async fn resolve<'a>(&'a self, _service: &ServiceName) -> Result<ActorName> {
  77. todo!()
  78. }
  79. /// Activates a new actor using the given activator function and returns a handle to it.
  80. pub async fn activate<Msg, F, Fut, G>(&self, activator: F, deserializer: G) -> ActorName
  81. where
  82. Msg: 'static + CallMsg,
  83. Fut: 'static + Send + Future<Output = ()>,
  84. F: FnOnce(mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
  85. G: 'static + Send + Sync + Fn(&[u8]) -> Result<Msg>,
  86. {
  87. let mut guard = self.handles.write().await;
  88. let act_id = {
  89. let mut act_id = Uuid::new_v4();
  90. while guard.contains_key(&act_id) {
  91. act_id = Uuid::new_v4();
  92. }
  93. act_id
  94. };
  95. let (tx, rx) = mpsc::channel::<Envelope<Msg>>(MAILBOX_LIMIT);
  96. // The deliverer closure is responsible for deserializing messages received over the wire
  97. // and delivering them to the actor's mailbox. It's also responsible for sending replies to
  98. // call messages.
  99. let deliverer = {
  100. let tx = tx.clone();
  101. move |envelope: WireEnvelope| {
  102. let (msg, replier) = envelope.into_parts();
  103. let result = deserializer(msg);
  104. let tx_clone = tx.clone();
  105. let fut: FutureResult = Box::pin(async move {
  106. let msg = result?;
  107. if let Some(mut replier) = replier {
  108. let (envelope, rx) = Envelope::call(msg);
  109. tx_clone.send(envelope).await.map_err(|_| {
  110. bterr!("failed to deliver message. Recipient may have halted.")
  111. })?;
  112. // TODO: `reply` does not have the right type.
  113. // It needs to be WireEnvelope::Reply.
  114. match rx.await {
  115. Ok(reply) => replier.reply(reply).await,
  116. Err(err) => replier.reply_err(err.to_string(), None).await,
  117. }
  118. } else {
  119. tx_clone.send(Envelope::Send { msg }).await.map_err(|_| {
  120. bterr!("failed to deliver message. Recipient may have halted.")
  121. })
  122. }
  123. });
  124. fut
  125. }
  126. };
  127. let handle = tokio::task::spawn(activator(rx, act_id));
  128. let actor_handle = ActorHandle::new(handle, tx, deliverer);
  129. guard.insert(act_id, actor_handle);
  130. ActorName::new(self.path.clone(), act_id)
  131. }
  132. /// Registers an actor as a service with the given [ServiceId].
  133. pub async fn register<Msg, Fut, F, G>(
  134. &self,
  135. _id: ServiceId,
  136. _activator: F,
  137. _deserializer: G,
  138. ) -> Result<()>
  139. where
  140. Msg: 'static + CallMsg,
  141. Fut: 'static + Send + Future<Output = ()>,
  142. F: Fn(mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
  143. G: 'static + Send + Sync + Fn(&[u8]) -> Result<Msg>,
  144. {
  145. todo!()
  146. }
  147. }
  148. /// A unique identifier for a particular service.
  149. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  150. pub struct ServiceId(#[serde(with = "smart_ptr")] Arc<String>);
  151. impl From<String> for ServiceId {
  152. fn from(value: String) -> Self {
  153. Self(Arc::new(value))
  154. }
  155. }
  156. impl<'a> From<&'a str> for ServiceId {
  157. fn from(value: &'a str) -> Self {
  158. Self(Arc::new(value.to_owned()))
  159. }
  160. }
  161. /// A unique identifier for a service.
  162. ///
  163. /// A service is a collection of actors in the same directory which provide some functionality.
  164. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  165. pub struct ServiceName {
  166. /// The path to the directory containing the service.
  167. #[serde(with = "smart_ptr")]
  168. path: Arc<BlockPath>,
  169. /// The id of the service.
  170. service_id: ServiceId,
  171. }
  172. /// A unique identifier for a specific actor activation.
  173. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  174. pub struct ActorName {
  175. /// The path to the directory containing this actor.
  176. #[serde(with = "smart_ptr")]
  177. path: Arc<BlockPath>,
  178. /// A unique identifier for an actor activation. Even as an actor transitions to different types
  179. /// as it handles messages, this value does not change. Thus this value can be used to trace an
  180. /// actor through a series of state transitions.
  181. act_id: Uuid,
  182. }
  183. impl ActorName {
  184. pub fn new(path: Arc<BlockPath>, act_id: Uuid) -> Self {
  185. Self { path, act_id }
  186. }
  187. }
  188. /// Trait for messages which expect exactly one reply.
  189. pub trait CallMsg: Serialize + DeserializeOwned + Send + Sync {
  190. /// The reply type expected for this message.
  191. type Reply: Serialize + DeserializeOwned + Send + Sync;
  192. }
  193. /// Trait for messages which expect exactly zero replies.
  194. pub trait SendMsg: CallMsg {}
  195. /// An adapter which allows a [CallMsg] to be used sent remotely using a [Transmitter].
  196. #[derive(Serialize, Deserialize)]
  197. #[repr(transparent)]
  198. #[serde(transparent)]
  199. struct Adapter<T>(T);
  200. impl<'a, T: CallMsg> btmsg::CallMsg<'a> for Adapter<T> {
  201. type Reply<'r> = T::Reply;
  202. }
  203. impl<'a, T: SendMsg> btmsg::SendMsg<'a> for Adapter<T> {}
  /// The capacity of an actor's mailbox: the maximum number of messages which can be kept in it.
  const MAILBOX_LIMIT: usize = 32;
  206. /// The type of messages sent remotely.
  207. enum WireEnvelope<'de> {
  208. Send { msg: &'de [u8] },
  209. Call { msg: &'de [u8], replier: Replier },
  210. }
  211. impl<'de> WireEnvelope<'de> {
  212. fn into_parts(self) -> (&'de [u8], Option<Replier>) {
  213. match self {
  214. Self::Send { msg } => (msg, None),
  215. Self::Call { msg, replier } => (msg, Some(replier)),
  216. }
  217. }
  218. }
  219. /// Wraps a message to indicate if it was sent with `call` or `send`.
  220. ///
  221. /// If the message was sent with call, then this enum will contain a channel that can be used to
  222. /// reply to it.
  223. pub enum Envelope<T: CallMsg> {
  224. /// The message was sent with `send` and does not expect a reply.
  225. Send { msg: T },
  226. /// The message was sent with `call` and expects a reply.
  227. Call {
  228. msg: T,
  229. /// A reply message must be sent using this channel.
  230. reply: oneshot::Sender<T::Reply>,
  231. },
  232. }
  233. impl<T: CallMsg> Envelope<T> {
  234. fn send(msg: T) -> Self {
  235. Self::Send { msg }
  236. }
  237. fn call(msg: T) -> (Self, oneshot::Receiver<T::Reply>) {
  238. let (tx, rx) = oneshot::channel::<T::Reply>();
  239. let kind = Envelope::Call { msg, reply: tx };
  240. (kind, rx)
  241. }
  242. }
  243. type FutureResult = Pin<Box<dyn Send + Future<Output = Result<()>>>>;
  244. struct ActorHandle {
  245. _handle: JoinHandle<()>,
  246. sender: Box<dyn Send + Sync + Any>,
  247. deliverer: Box<dyn Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult>,
  248. }
  249. impl ActorHandle {
  250. fn new<T, F>(handle: JoinHandle<()>, sender: mpsc::Sender<Envelope<T>>, deliverer: F) -> Self
  251. where
  252. T: 'static + CallMsg,
  253. F: 'static + Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult,
  254. {
  255. Self {
  256. _handle: handle,
  257. sender: Box::new(sender),
  258. deliverer: Box::new(deliverer),
  259. }
  260. }
  261. fn sender<T: 'static + CallMsg>(&self) -> Result<&mpsc::Sender<Envelope<T>>> {
  262. self.sender
  263. .downcast_ref::<mpsc::Sender<Envelope<T>>>()
  264. .ok_or_else(|| bterr!("unexpected message type"))
  265. }
  266. async fn send<T: 'static + SendMsg>(&self, msg: T) -> Result<()> {
  267. let sender = self.sender()?;
  268. sender
  269. .send(Envelope::send(msg))
  270. .await
  271. .map_err(|_| bterr!("failed to enqueue message"))?;
  272. Ok(())
  273. }
  274. async fn call_through<T: 'static + CallMsg>(&self, msg: T) -> Result<T::Reply> {
  275. let sender = self.sender()?;
  276. let (envelope, rx) = Envelope::call(msg);
  277. sender
  278. .send(envelope)
  279. .await
  280. .map_err(|_| bterr!("failed to enqueue call"))?;
  281. let reply = rx.await?;
  282. Ok(reply)
  283. }
  284. }
  285. #[derive(Serialize, Deserialize)]
  286. enum WireReply<'a> {
  287. Ok(&'a [u8]),
  288. Err(&'a str),
  289. }
  290. #[derive(Serialize, Deserialize)]
  291. struct WireMsg<'a> {
  292. act_id: Uuid,
  293. payload: &'a [u8],
  294. }
  295. impl<'a> btmsg::CallMsg<'a> for WireMsg<'a> {
  296. type Reply<'r> = WireReply<'r>;
  297. }
  298. /// This struct implements the server callback for network messages.
  299. #[derive(Clone)]
  300. struct RuntimeCallback {
  301. handles: Arc<RwLock<HashMap<Uuid, ActorHandle>>>,
  302. }
  303. impl RuntimeCallback {
  304. fn new(handles: Arc<RwLock<HashMap<Uuid, ActorHandle>>>) -> Self {
  305. Self { handles }
  306. }
  307. }
  308. impl MsgCallback for RuntimeCallback {
  309. type Arg<'de> = WireMsg<'de>;
  310. type CallFut<'de> = impl 'de + Future<Output = Result<()>>;
  311. fn call<'de>(&'de self, mut arg: btmsg::MsgReceived<Self::Arg<'de>>) -> Self::CallFut<'de> {
  312. async move {
  313. let replier = arg.take_replier();
  314. let body = arg.body();
  315. let guard = self.handles.read().await;
  316. if let Some(handle) = guard.get(&body.act_id) {
  317. let envelope = if let Some(replier) = replier {
  318. WireEnvelope::Call {
  319. msg: body.payload,
  320. replier,
  321. }
  322. } else {
  323. WireEnvelope::Send { msg: body.payload }
  324. };
  325. (handle.deliverer)(envelope).await
  326. } else {
  327. Err(bterr!("invalid actor ID: {}", body.act_id))
  328. }
  329. }
  330. }
  331. }
  #[cfg(test)]
  mod tests {
      use std::net::IpAddr;

      use super::*;
      use btlib::crypto::CredStore;
      use btlib_tests::TEST_STORE;
      use btserde::from_slice;

      /// A message carrying a string, answered with a message of the same type.
      #[derive(Serialize, Deserialize)]
      struct EchoMsg(String);

      impl CallMsg for EchoMsg {
          type Reply = EchoMsg;
      }

      /// An actor which answers every call with the message it received and ignores sends.
      async fn echo(mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>, _act_id: Uuid) {
          while let Some(envelope) = mailbox.recv().await {
              match envelope {
                  Envelope::Call { msg, reply } => {
                      if reply.send(msg).is_err() {
                          panic!("failed to send reply");
                      }
                  }
                  Envelope::Send { .. } => {}
              }
          }
      }

      /// Deserializes an [EchoMsg] from its wire representation.
      fn echo_deserializer(slice: &[u8]) -> Result<EchoMsg> {
          from_slice(slice).map_err(Into::into)
      }

      /// Calls a locally activated echo actor and checks that the reply carries the same text.
      #[tokio::test]
      async fn local_call() {
          const EXPECTED: &str = "hello";
          let loopback = IpAddr::from([127, 0, 0, 1]);
          let node_creds = TEST_STORE.node_creds().unwrap();
          let runtime = new_runtime(loopback, node_creds).unwrap();
          let actor_name = runtime.activate(echo, echo_deserializer).await;
          let request = EchoMsg(EXPECTED.into());
          let reply = runtime.call(&actor_name, request).await.unwrap();
          assert_eq!(EXPECTED, reply.0)
      }
  }