#![feature(impl_trait_in_assoc_type)]

use std::{
    any::Any,
    collections::HashMap,
    fmt::Display,
    future::{ready, Future, Ready},
    marker::PhantomData,
    net::IpAddr,
    ops::DerefMut,
    pin::Pin,
    sync::Arc,
};

use btlib::{bterr, crypto::Creds, error::StringError, BlockPath, Result};
use btmsg::{DeserCallback, MsgCallback, Receiver, Replier, Transmitter};
use btserde::{field_helpers::smart_ptr, from_slice, to_vec, write_to};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::{
    sync::{mpsc, oneshot, Mutex, RwLock},
    task::JoinHandle,
};
use uuid::Uuid;

/// Declares a new [Runtime] which listens for messages at the given IP address and uses the given
/// [Creds]. Runtimes are intended to be created once in a process's lifetime and continue running
/// until the process exits.
///
/// The macro expands to a `lazy_static!` item named `$name`, so the runtime is created the first
/// time the static is accessed.
///
/// # Panics
///
/// The generated initializer calls `unwrap()` on the result of [_new_runtime], so the first
/// access to the static panics if the runtime could not be created.
#[macro_export]
macro_rules! declare_runtime {
    ($name:ident, $ip_addr:expr, $creds:expr) => {
        ::lazy_static::lazy_static! {
            static ref $name: Runtime = _new_runtime($ip_addr, $creds).unwrap();
        }
    };
}

/// This function is not intended to be called directly by downstream crates. Use the macro
/// [declare_runtime] to create a [Runtime] instead.
///
/// The runtime's path is taken from `creds.bind_path()`, and a [Receiver] is created for
/// `ip_addr` with a callback that shares the runtime's table of actor handles.
///
/// # Errors
///
/// Returns an error if the bind path cannot be obtained from `creds` or if the [Receiver] cannot
/// be created.
pub fn _new_runtime<C: 'static + Send + Sync + Creds>(
    ip_addr: IpAddr,
    creds: Arc<C>,
) -> Result<Runtime> {
    let path = Arc::new(creds.bind_path()?);
    // The handle table is shared between the runtime and the network callback so that messages
    // arriving over the wire can be routed to locally running actors.
    let handles = Arc::new(RwLock::new(HashMap::new()));
    let callback = RuntimeCallback::new(handles.clone());
    let rx = Receiver::new(ip_addr, creds, callback)?;
    Ok(Runtime {
        _rx: rx,
        path,
        handles,
        peers: RwLock::new(HashMap::new()),
    })
}

/// An actor runtime.
///
/// Actors can be activated by the runtime and execute autonomously until they return. Running
/// actors can be sent messages using the `send` method, which does not wait for a response from the
/// recipient. If a reply is needed, then `call` can be used, which returns a future that will
/// be ready when the reply has been received.
pub struct Runtime { _rx: Receiver, path: Arc, handles: Arc>>, peers: RwLock, Transmitter>>, } impl Runtime { pub fn path(&self) -> &Arc { &self.path } /// Returns the number of actors that are currently executing in this [Runtime]. pub async fn num_running(&self) -> usize { let guard = self.handles.read().await; guard.len() } /// Sends a message to the actor identified by the given [ActorName]. pub async fn send( &self, to: &ActorName, from: Uuid, msg: T, ) -> Result<()> { if to.path == self.path { let guard = self.handles.read().await; if let Some(handle) = guard.get(&to.act_id) { handle.send(msg).await } else { Err(bterr!("invalid actor name")) } } else { let guard = self.peers.read().await; if let Some(peer) = guard.get(&to.path) { let buf = to_vec(&msg)?; let wire_msg = WireMsg { to: to.act_id, from, payload: &buf, }; peer.send(wire_msg).await } else { // TODO: Use the filesystem to discover the address of the recipient and connect to // it. todo!() } } } /// Sends a message to the actor identified by the given [ActorName] and returns a future which /// is ready when a reply has been received. pub async fn call( &self, to: &ActorName, from: Uuid, msg: T, ) -> Result { if to.path == self.path { let guard = self.handles.read().await; if let Some(handle) = guard.get(&to.act_id) { handle.call_through(msg).await } else { Err(bterr!("invalid actor name")) } } else { let guard = self.peers.read().await; if let Some(peer) = guard.get(&to.path) { let buf = to_vec(&msg)?; let wire_msg = WireMsg { to: to.act_id, from, payload: &buf, }; peer.call(wire_msg, ReplyCallback::::new()).await? } else { // TODO: Use the filesystem to discover the address of the recipient and connect to // it. todo!() } } } /// Resolves the given [ServiceName] to an [ActorName] which is part of it. pub async fn resolve<'a>(&'a self, _service: &ServiceName) -> Result { todo!() } /// Activates a new actor using the given activator function and returns a handle to it. 
pub async fn activate(&'static self, activator: F) -> ActorName where Msg: 'static + CallMsg, Fut: 'static + Send + Future, F: FnOnce(&'static Runtime, mpsc::Receiver>, Uuid) -> Fut, { let mut guard = self.handles.write().await; let act_id = { let mut act_id = Uuid::new_v4(); while guard.contains_key(&act_id) { act_id = Uuid::new_v4(); } act_id }; let (tx, rx) = mpsc::channel::>(MAILBOX_LIMIT); // The deliverer closure is responsible for deserializing messages received over the wire // and delivering them to the actor's mailbox and sending replies to call messages. let deliverer = { let buffer = Arc::new(Mutex::new(Vec::::new())); let tx = tx.clone(); move |envelope: WireEnvelope| { let (wire_msg, replier) = envelope.into_parts(); let result = from_slice(wire_msg.payload); let buffer = buffer.clone(); let tx = tx.clone(); let fut: FutureResult = Box::pin(async move { let msg = result?; if let Some(mut replier) = replier { let (envelope, rx) = Envelope::call(msg); tx.send(envelope).await.map_err(|_| { bterr!("failed to deliver message. Recipient may have halted.") })?; match rx.await { Ok(reply) => { let mut guard = buffer.lock().await; guard.clear(); write_to(&reply, guard.deref_mut())?; let wire_reply = WireReply::Ok(&guard); replier.reply(wire_reply).await } Err(err) => replier.reply_err(err.to_string(), None).await, } } else { tx.send(Envelope::Send { msg }).await.map_err(|_| { bterr!("failed to deliver message. Recipient may have halted.") }) } }); fut } }; let handle = tokio::task::spawn(activator(self, rx, act_id)); let actor_handle = ActorHandle::new(handle, tx, deliverer); guard.insert(act_id, actor_handle); ActorName::new(self.path.clone(), act_id) } /// Registers an actor as a service with the given [ServiceId]. 
pub async fn register( &self, _id: ServiceId, _activator: F, _deserializer: G, ) -> Result<()> where Msg: 'static + CallMsg, Fut: 'static + Send + Future, F: Fn(mpsc::Receiver>, Uuid) -> Fut, G: 'static + Send + Sync + Fn(&[u8]) -> Result, { todo!() } /// Returns the [ActorHandle] for the actor with the given name. /// /// If there is no such actor in this runtime then a [RuntimeError::BadActorName] error is /// returned. /// /// Note that the actor will be aborted when the given handle is dropped (unless it has already /// returned when the handle is dropped), and no further messages will be delivered to it by /// this runtime. pub async fn take(&self, name: &ActorName) -> Result { if name.path == self.path { let mut guard = self.handles.write().await; if let Some(handle) = guard.remove(&name.act_id) { Ok(handle) } else { Err(RuntimeError::BadActorName(name.clone()).into()) } } else { Err(RuntimeError::BadActorName(name.clone()).into()) } } } impl Drop for Runtime { fn drop(&mut self) { panic!("A Runtime was dropped. Panicking to avoid undefined behavior."); } } #[derive(Debug, Clone, PartialEq, Eq)] pub enum RuntimeError { BadActorName(ActorName), } impl Display for RuntimeError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::BadActorName(name) => write!(f, "bad actor name: {name}"), } } } impl std::error::Error for RuntimeError {} /// Deserializes replies sent over the wire. 
struct ReplyCallback { _phantom: PhantomData, } impl ReplyCallback { fn new() -> Self { Self { _phantom: PhantomData, } } } impl DeserCallback for ReplyCallback { type Arg<'de> = WireReply<'de> where T: 'de; type Return = Result; type CallFut<'de> = Ready where T: 'de, T::Reply: 'de; fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> { let result = match arg { WireReply::Ok(slice) => from_slice(slice).map_err(|err| err.into()), WireReply::Err(msg) => Err(StringError::new(msg.to_string()).into()), }; ready(result) } } /// This struct implements the server callback for network messages. #[derive(Clone)] struct RuntimeCallback { handles: Arc>>, } impl RuntimeCallback { fn new(handles: Arc>>) -> Self { Self { handles } } } impl MsgCallback for RuntimeCallback { type Arg<'de> = WireMsg<'de>; type CallFut<'de> = impl 'de + Future>; fn call<'de>(&'de self, arg: btmsg::MsgReceived>) -> Self::CallFut<'de> { async move { let (_, body, replier) = arg.into_parts(); let guard = self.handles.read().await; if let Some(handle) = guard.get(&body.to) { let envelope = if let Some(replier) = replier { WireEnvelope::Call { msg: body, replier } } else { WireEnvelope::Send { msg: body } }; (handle.deliverer)(envelope).await } else { Err(bterr!("invalid actor ID: {}", body.to)) } } } } /// A unique identifier for a particular service. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)] pub struct ServiceId(#[serde(with = "smart_ptr")] Arc); impl From for ServiceId { fn from(value: String) -> Self { Self(Arc::new(value)) } } impl<'a> From<&'a str> for ServiceId { fn from(value: &'a str) -> Self { Self(Arc::new(value.to_owned())) } } /// A unique identifier for a service. /// /// A service is a collection of actors in the same directory which provide some functionality. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)] pub struct ServiceName { /// The path to the directory containing the service. 
#[serde(with = "smart_ptr")] path: Arc, /// The id of the service. service_id: ServiceId, } /// A unique identifier for a specific actor activation. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)] pub struct ActorName { /// The path to the directory containing this actor. #[serde(with = "smart_ptr")] path: Arc, /// A unique identifier for an actor activation. Even as an actor transitions to different types /// as it handles messages, this value does not change. Thus this value can be used to trace an /// actor through a series of state transitions. act_id: Uuid, } impl ActorName { pub fn new(path: Arc, act_id: Uuid) -> Self { Self { path, act_id } } } impl Display for ActorName { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}@{}", self.act_id, self.path) } } /// Trait for messages which expect exactly one reply. pub trait CallMsg: Serialize + DeserializeOwned + Send + Sync { /// The reply type expected for this message. type Reply: Serialize + DeserializeOwned + Send + Sync; } /// Trait for messages which expect exactly zero replies. pub trait SendMsg: CallMsg {} /// The maximum number of messages which can be kept in an actor's mailbox. const MAILBOX_LIMIT: usize = 32; /// The type of messages sent over the wire between runtimes. #[derive(Serialize, Deserialize)] struct WireMsg<'a> { to: Uuid, from: Uuid, payload: &'a [u8], } impl<'a> btmsg::CallMsg<'a> for WireMsg<'a> { type Reply<'r> = WireReply<'r>; } impl<'a> btmsg::SendMsg<'a> for WireMsg<'a> {} #[derive(Serialize, Deserialize)] enum WireReply<'a> { Ok(&'a [u8]), Err(&'a str), } /// A wrapper around [WireMsg] which indicates whether a call or send was executed. 
enum WireEnvelope<'de> { Send { msg: WireMsg<'de> }, Call { msg: WireMsg<'de>, replier: Replier }, } impl<'de> WireEnvelope<'de> { fn into_parts(self) -> (WireMsg<'de>, Option) { match self { Self::Send { msg } => (msg, None), Self::Call { msg, replier } => (msg, Some(replier)), } } } /// Wraps a message to indicate if it was sent with `call` or `send`. /// /// If the message was sent with call, then this enum will contain a channel that can be used to /// reply to it. pub enum Envelope { /// The message was sent with `send` and does not expect a reply. Send { msg: T }, /// The message was sent with `call` and expects a reply. Call { msg: T, /// A reply message must be sent using this channel. reply: oneshot::Sender, }, } impl Envelope { fn send(msg: T) -> Self { Self::Send { msg } } fn call(msg: T) -> (Self, oneshot::Receiver) { let (tx, rx) = oneshot::channel::(); let kind = Envelope::Call { msg, reply: tx }; (kind, rx) } } type FutureResult = Pin>>>; pub struct ActorHandle { handle: Option>, sender: Box, deliverer: Box) -> FutureResult>, } impl ActorHandle { fn new(handle: JoinHandle<()>, sender: mpsc::Sender>, deliverer: F) -> Self where T: 'static + CallMsg, F: 'static + Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult, { Self { handle: Some(handle), sender: Box::new(sender), deliverer: Box::new(deliverer), } } fn sender(&self) -> Result<&mpsc::Sender>> { self.sender .downcast_ref::>>() .ok_or_else(|| bterr!("unexpected message type")) } pub async fn send(&self, msg: T) -> Result<()> { let sender = self.sender()?; sender .send(Envelope::send(msg)) .await .map_err(|_| bterr!("failed to enqueue message"))?; Ok(()) } pub async fn call_through(&self, msg: T) -> Result { let sender = self.sender()?; let (envelope, rx) = Envelope::call(msg); sender .send(envelope) .await .map_err(|_| bterr!("failed to enqueue call"))?; let reply = rx.await?; Ok(reply) } pub async fn returned(&mut self) -> Result<()> { if let Some(handle) = self.handle.take() { handle.await?; } 
Ok(()) } pub fn abort(&mut self) { if let Some(handle) = self.handle.take() { handle.abort(); } } } impl Drop for ActorHandle { fn drop(&mut self) { self.abort(); } } #[cfg(test)] mod tests { use super::*; use btlib::{ crypto::{ConcreteCreds, CredStore, CredsPriv}, log::BuilderExt, }; use btlib_tests::TEST_STORE; use btmsg::BlockAddr; use btserde::to_vec; use ctor::ctor; use lazy_static::lazy_static; use std::net::{IpAddr, Ipv4Addr}; use tokio::runtime::Builder; const RUNTIME_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); lazy_static! { static ref RUNTIME_CREDS: Arc = TEST_STORE.node_creds().unwrap(); } declare_runtime!(RUNTIME, RUNTIME_ADDR, RUNTIME_CREDS.clone()); lazy_static! { /// A tokio async runtime. /// /// When the `#[tokio::test]` attribute is used on a test, a new current thread runtime /// is created for each test /// (source: https://docs.rs/tokio/latest/tokio/attr.test.html#current-thread-runtime). /// This creates a problem, because the first test thread to access the `RUNTIME` static /// will initialize its `Receiver` in its runtime, which will stop running at the end of /// the test. Hence subsequent tests will not be able to send remote messages to this /// `Runtime`. /// /// By creating a single async runtime which is used by all of the tests, we can avoid this /// problem. static ref ASYNC_RT: tokio::runtime::Runtime = Builder::new_current_thread() .enable_all() .build() .unwrap(); } /// The log level to use when running tests. 
const LOG_LEVEL: &str = "warn"; #[ctor] fn ctor() { std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL)); env_logger::Builder::from_default_env().btformat().init(); } #[derive(Serialize, Deserialize)] struct EchoMsg(String); impl CallMsg for EchoMsg { type Reply = EchoMsg; } async fn echo( _rt: &'static Runtime, mut mailbox: mpsc::Receiver>, _act_id: Uuid, ) { while let Some(msg) = mailbox.recv().await { if let Envelope::Call { msg, reply } = msg { if let Err(_) = reply.send(msg) { panic!("failed to send reply"); } } } } #[test] fn local_call() { ASYNC_RT.block_on(async { const EXPECTED: &str = "hello"; let name = RUNTIME.activate(echo).await; let reply = RUNTIME .call(&name, Uuid::default(), EchoMsg(EXPECTED.into())) .await .unwrap(); assert_eq!(EXPECTED, reply.0); RUNTIME.take(&name).await.unwrap(); }) } #[test] fn remote_call() { ASYNC_RT.block_on(async { const EXPECTED: &str = "hello"; let actor_name = RUNTIME.activate(echo).await; let bind_path = Arc::new(RUNTIME_CREDS.bind_path().unwrap()); let block_addr = Arc::new(BlockAddr::new(RUNTIME_ADDR, bind_path)); let transmitter = Transmitter::new(block_addr, RUNTIME_CREDS.clone()) .await .unwrap(); let buf = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap(); let wire_msg = WireMsg { to: actor_name.act_id, from: Uuid::default(), payload: &buf, }; let reply = transmitter .call(wire_msg, ReplyCallback::::new()) .await .unwrap() .unwrap(); assert_eq!(EXPECTED, reply.0); RUNTIME.take(&actor_name).await.unwrap(); }); } /// Tests the `num_running` method. /// /// This test uses its own runtime and so can use the `#[tokio::test]` attribute. #[tokio::test] async fn num_running() { declare_runtime!( LOCAL_RT, // This needs to be different from the address where `RUNTIME` is listening. 
IpAddr::from([127, 0, 0, 2]), TEST_STORE.node_creds().unwrap() ); assert_eq!(0, LOCAL_RT.num_running().await); let name = LOCAL_RT.activate(echo).await; assert_eq!(1, LOCAL_RT.num_running().await); LOCAL_RT.take(&name).await.unwrap(); assert_eq!(0, LOCAL_RT.num_running().await); } }