#![feature(impl_trait_in_assoc_type)] use std::{any::Any, collections::HashMap, future::Future, net::IpAddr, pin::Pin, sync::Arc}; use btlib::{bterr, crypto::Creds, BlockPath, Result}; use btmsg::{MsgCallback, Receiver, Replier, Transmitter}; use btserde::field_helpers::smart_ptr; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tokio::{ sync::{mpsc, oneshot, RwLock}, task::JoinHandle, }; use uuid::Uuid; /// Creates a new [Runtime] instance which listens at the given IP address and which uses the given /// credentials. pub fn new_runtime( ip_addr: IpAddr, creds: Arc, ) -> Result> { let path = Arc::new(creds.bind_path()?); let handles = Arc::new(RwLock::new(HashMap::new())); let callback = RuntimeCallback::new(handles.clone()); let rx = btmsg::receiver(ip_addr, creds, callback)?; Ok(Runtime { _rx: rx, path, handles, peers: RwLock::new(HashMap::new()), }) } /// An actor runtime. /// /// Actors can be activated by the runtime and execute autonomously until they halt. Running actors /// can be sent messages using the `send` method, which does not wait for a response from the /// recipient. If a reply is needed, then `call` can be used, which returns a future that will not /// be ready until the reply has been received. pub struct Runtime { _rx: Rx, path: Arc, handles: Arc>>, peers: RwLock, Rx::Transmitter>>, } macro_rules! deliver { ($self:expr, $to:expr, $msg:expr, $method:ident) => { if $to.path == $self.path { let guard = $self.handles.read().await; if let Some(handle) = guard.get(&$to.act_id) { handle.$method($msg).await } else { Err(bterr!("invalid actor name")) } } else { let guard = $self.peers.read().await; if let Some(peer) = guard.get(&$to.path) { peer.$method(Adapter($msg)).await } else { // TODO: Use the filesystem to discover the address of the recipient and connect to // it. todo!() } } }; } impl Runtime { pub fn path(&self) -> &Arc { &self.path } /// Sends a message to the actor identified by the given [ActorName]. pub async fn send(&self, to: &ActorName, msg: T) -> Result<()> { deliver!(self, to, msg, send) } /// Sends a message to the actor identified by the given [ActorName] and returns a future which /// is ready when the reply has been received. pub async fn call(&self, to: &ActorName, msg: T) -> Result { deliver!(self, to, msg, call_through) } /// Resolves the given [ServiceName] to an [ActorName] which is part of it. pub async fn resolve<'a>(&'a self, _service: &ServiceName) -> Result { todo!() } /// Activates a new actor using the given activator function and returns a handle to it. pub async fn activate(&self, activator: F, deserializer: G) -> ActorName where Msg: 'static + CallMsg, Fut: 'static + Send + Future, F: FnOnce(mpsc::Receiver>, Uuid) -> Fut, G: 'static + Send + Sync + Fn(&[u8]) -> Result, { let mut guard = self.handles.write().await; let act_id = { let mut act_id = Uuid::new_v4(); while guard.contains_key(&act_id) { act_id = Uuid::new_v4(); } act_id }; let (tx, rx) = mpsc::channel::>(MAILBOX_LIMIT); // The deliverer closure is responsible for deserializing messages received over the wire // and delivering them to the actor's mailbox. It's also responsible for sending replies to // call messages. let deliverer = { let tx = tx.clone(); move |envelope: WireEnvelope| { let (msg, replier) = envelope.into_parts(); let result = deserializer(msg); let tx_clone = tx.clone(); let fut: FutureResult = Box::pin(async move { let msg = result?; if let Some(mut replier) = replier { let (envelope, rx) = Envelope::call(msg); tx_clone.send(envelope).await.map_err(|_| { bterr!("failed to deliver message. Recipient may have halted.") })?; // TODO: `reply` does not have the right type. // It needs to be WireEnvelope::Reply. match rx.await { Ok(reply) => replier.reply(reply).await, Err(err) => replier.reply_err(err.to_string(), None).await, } } else { tx_clone.send(Envelope::Send { msg }).await.map_err(|_| { bterr!("failed to deliver message. Recipient may have halted.") }) } }); fut } }; let handle = tokio::task::spawn(activator(rx, act_id)); let actor_handle = ActorHandle::new(handle, tx, deliverer); guard.insert(act_id, actor_handle); ActorName::new(self.path.clone(), act_id) } /// Registers an actor as a service with the given [ServiceId]. pub async fn register( &self, _id: ServiceId, _activator: F, _deserializer: G, ) -> Result<()> where Msg: 'static + CallMsg, Fut: 'static + Send + Future, F: Fn(mpsc::Receiver>, Uuid) -> Fut, G: 'static + Send + Sync + Fn(&[u8]) -> Result, { todo!() } } /// A unique identifier for a particular service. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)] pub struct ServiceId(#[serde(with = "smart_ptr")] Arc); impl From for ServiceId { fn from(value: String) -> Self { Self(Arc::new(value)) } } impl<'a> From<&'a str> for ServiceId { fn from(value: &'a str) -> Self { Self(Arc::new(value.to_owned())) } } /// A unique identifier for a service. /// /// A service is a collection of actors in the same directory which provide some functionality. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)] pub struct ServiceName { /// The path to the directory containing the service. #[serde(with = "smart_ptr")] path: Arc, /// The id of the service. service_id: ServiceId, } /// A unique identifier for a specific actor activation. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)] pub struct ActorName { /// The path to the directory containing this actor. #[serde(with = "smart_ptr")] path: Arc, /// A unique identifier for an actor activation. Even as an actor transitions to different types /// as it handles messages, this value does not change. Thus this value can be used to trace an /// actor through a series of state transitions. act_id: Uuid, } impl ActorName { pub fn new(path: Arc, act_id: Uuid) -> Self { Self { path, act_id } } } /// Trait for messages which expect exactly one reply. pub trait CallMsg: Serialize + DeserializeOwned + Send + Sync { /// The reply type expected for this message. type Reply: Serialize + DeserializeOwned + Send + Sync; } /// Trait for messages which expect exactly zero replies. pub trait SendMsg: CallMsg {} /// An adapter which allows a [CallMsg] to be used sent remotely using a [Transmitter]. #[derive(Serialize, Deserialize)] #[repr(transparent)] #[serde(transparent)] struct Adapter(T); impl<'a, T: CallMsg> btmsg::CallMsg<'a> for Adapter { type Reply<'r> = T::Reply; } impl<'a, T: SendMsg> btmsg::SendMsg<'a> for Adapter {} /// The maximum number of messages which can be kept in an actor's mailbox. const MAILBOX_LIMIT: usize = 32; /// The type of messages sent remotely. enum WireEnvelope<'de> { Send { msg: &'de [u8] }, Call { msg: &'de [u8], replier: Replier }, } impl<'de> WireEnvelope<'de> { fn into_parts(self) -> (&'de [u8], Option) { match self { Self::Send { msg } => (msg, None), Self::Call { msg, replier } => (msg, Some(replier)), } } } /// Wraps a message to indicate if it was sent with `call` or `send`. /// /// If the message was sent with call, then this enum will contain a channel that can be used to /// reply to it. pub enum Envelope { /// The message was sent with `send` and does not expect a reply. Send { msg: T }, /// The message was sent with `call` and expects a reply. Call { msg: T, /// A reply message must be sent using this channel. reply: oneshot::Sender, }, } impl Envelope { fn send(msg: T) -> Self { Self::Send { msg } } fn call(msg: T) -> (Self, oneshot::Receiver) { let (tx, rx) = oneshot::channel::(); let kind = Envelope::Call { msg, reply: tx }; (kind, rx) } } type FutureResult = Pin>>>; struct ActorHandle { _handle: JoinHandle<()>, sender: Box, deliverer: Box) -> FutureResult>, } impl ActorHandle { fn new(handle: JoinHandle<()>, sender: mpsc::Sender>, deliverer: F) -> Self where T: 'static + CallMsg, F: 'static + Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult, { Self { _handle: handle, sender: Box::new(sender), deliverer: Box::new(deliverer), } } fn sender(&self) -> Result<&mpsc::Sender>> { self.sender .downcast_ref::>>() .ok_or_else(|| bterr!("unexpected message type")) } async fn send(&self, msg: T) -> Result<()> { let sender = self.sender()?; sender .send(Envelope::send(msg)) .await .map_err(|_| bterr!("failed to enqueue message"))?; Ok(()) } async fn call_through(&self, msg: T) -> Result { let sender = self.sender()?; let (envelope, rx) = Envelope::call(msg); sender .send(envelope) .await .map_err(|_| bterr!("failed to enqueue call"))?; let reply = rx.await?; Ok(reply) } } #[derive(Serialize, Deserialize)] enum WireReply<'a> { Ok(&'a [u8]), Err(&'a str), } #[derive(Serialize, Deserialize)] struct WireMsg<'a> { act_id: Uuid, payload: &'a [u8], } impl<'a> btmsg::CallMsg<'a> for WireMsg<'a> { type Reply<'r> = WireReply<'r>; } /// This struct implements the server callback for network messages. #[derive(Clone)] struct RuntimeCallback { handles: Arc>>, } impl RuntimeCallback { fn new(handles: Arc>>) -> Self { Self { handles } } } impl MsgCallback for RuntimeCallback { type Arg<'de> = WireMsg<'de>; type CallFut<'de> = impl 'de + Future>; fn call<'de>(&'de self, mut arg: btmsg::MsgReceived>) -> Self::CallFut<'de> { async move { let replier = arg.take_replier(); let body = arg.body(); let guard = self.handles.read().await; if let Some(handle) = guard.get(&body.act_id) { let envelope = if let Some(replier) = replier { WireEnvelope::Call { msg: body.payload, replier, } } else { WireEnvelope::Send { msg: body.payload } }; (handle.deliverer)(envelope).await } else { Err(bterr!("invalid actor ID: {}", body.act_id)) } } } } #[cfg(test)] mod tests { use std::net::IpAddr; use super::*; use btlib::crypto::CredStore; use btlib_tests::TEST_STORE; use btserde::from_slice; #[derive(Serialize, Deserialize)] struct EchoMsg(String); impl CallMsg for EchoMsg { type Reply = EchoMsg; } async fn echo(mut mailbox: mpsc::Receiver>, _act_id: Uuid) { while let Some(msg) = mailbox.recv().await { if let Envelope::Call { msg, reply } = msg { if let Err(_) = reply.send(msg) { panic!("failed to send reply"); } } } } fn echo_deserializer(slice: &[u8]) -> Result { from_slice(slice).map_err(|err| err.into()) } #[tokio::test] async fn local_call() { const EXPECTED: &str = "hello"; let ip_addr = IpAddr::from([127, 0, 0, 1]); let creds = TEST_STORE.node_creds().unwrap(); let runtime = new_runtime(ip_addr, creds).unwrap(); let name = runtime.activate(echo, echo_deserializer).await; let reply = runtime.call(&name, EchoMsg(EXPECTED.into())).await.unwrap(); assert_eq!(EXPECTED, reply.0) } }