|
@@ -0,0 +1,409 @@
|
|
|
|
+#![feature(impl_trait_in_assoc_type)]
|
|
|
|
+
|
|
|
|
+use std::{any::Any, collections::HashMap, future::Future, net::IpAddr, pin::Pin, sync::Arc};
|
|
|
|
+
|
|
|
|
+use btlib::{bterr, crypto::Creds, BlockPath, Result};
|
|
|
|
+use btmsg::{MsgCallback, Receiver, Replier, Transmitter};
|
|
|
|
+use btserde::field_helpers::smart_ptr;
|
|
|
|
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
|
|
|
+use tokio::{
|
|
|
|
+ sync::{mpsc, oneshot, RwLock},
|
|
|
|
+ task::JoinHandle,
|
|
|
|
+};
|
|
|
|
+use uuid::Uuid;
|
|
|
|
+
|
|
|
|
+/// Creates a new [Runtime] instance which listens at the given IP address and which uses the given
|
|
|
|
+/// credentials.
|
|
|
|
+pub fn new_runtime<C: 'static + Send + Sync + Creds>(
|
|
|
|
+ ip_addr: IpAddr,
|
|
|
|
+ creds: Arc<C>,
|
|
|
|
+) -> Result<Runtime<impl Receiver>> {
|
|
|
|
+ let path = Arc::new(creds.bind_path()?);
|
|
|
|
+ let handles = Arc::new(RwLock::new(HashMap::new()));
|
|
|
|
+ let callback = RuntimeCallback::new(handles.clone());
|
|
|
|
+ let rx = btmsg::receiver(ip_addr, creds, callback)?;
|
|
|
|
+ Ok(Runtime {
|
|
|
|
+ _rx: rx,
|
|
|
|
+ path,
|
|
|
|
+ handles,
|
|
|
|
+ peers: RwLock::new(HashMap::new()),
|
|
|
|
+ })
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+/// An actor runtime.
|
|
|
|
+///
|
|
|
|
+/// Actors can be activated by the runtime and execute autonomously until they halt. Running actors
|
|
|
|
+/// can be sent messages using the `send` method, which does not wait for a response from the
|
|
|
|
+/// recipient. If a reply is needed, then `call` can be used, which returns a future that will not
|
|
|
|
+/// be ready until the reply has been received.
|
|
|
|
+pub struct Runtime<Rx: Receiver> {
|
|
|
|
+ _rx: Rx,
|
|
|
|
+ path: Arc<BlockPath>,
|
|
|
|
+ handles: Arc<RwLock<HashMap<Uuid, ActorHandle>>>,
|
|
|
|
+ peers: RwLock<HashMap<Arc<BlockPath>, Rx::Transmitter>>,
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+macro_rules! deliver {
|
|
|
|
+ ($self:expr, $to:expr, $msg:expr, $method:ident) => {
|
|
|
|
+ if $to.path == $self.path {
|
|
|
|
+ let guard = $self.handles.read().await;
|
|
|
|
+ if let Some(handle) = guard.get(&$to.act_id) {
|
|
|
|
+ handle.$method($msg).await
|
|
|
|
+ } else {
|
|
|
|
+ Err(bterr!("invalid actor name"))
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ let guard = $self.peers.read().await;
|
|
|
|
+ if let Some(peer) = guard.get(&$to.path) {
|
|
|
|
+ peer.$method(Adapter($msg)).await
|
|
|
|
+ } else {
|
|
|
|
+ // TODO: Use the filesystem to discover the address of the recipient and connect to
|
|
|
|
+ // it.
|
|
|
|
+ todo!()
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl<Rx: Receiver> Runtime<Rx> {
|
|
|
|
+ pub fn path(&self) -> &Arc<BlockPath> {
|
|
|
|
+ &self.path
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /// Sends a message to the actor identified by the given [ActorName].
|
|
|
|
+ pub async fn send<T: 'static + SendMsg>(&self, to: &ActorName, msg: T) -> Result<()> {
|
|
|
|
+ deliver!(self, to, msg, send)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /// Sends a message to the actor identified by the given [ActorName] and returns a future which
|
|
|
|
+ /// is ready when the reply has been received.
|
|
|
|
+ pub async fn call<T: 'static + CallMsg>(&self, to: &ActorName, msg: T) -> Result<T::Reply> {
|
|
|
|
+ deliver!(self, to, msg, call_through)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /// Resolves the given [ServiceName] to an [ActorName] which is part of it.
|
|
|
|
+ pub async fn resolve<'a>(&'a self, _service: &ServiceName) -> Result<ActorName> {
|
|
|
|
+ todo!()
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /// Activates a new actor using the given activator function and returns a handle to it.
|
|
|
|
+ pub async fn activate<Msg, F, Fut, G>(&self, activator: F, deserializer: G) -> ActorName
|
|
|
|
+ where
|
|
|
|
+ Msg: 'static + CallMsg,
|
|
|
|
+ Fut: 'static + Send + Future<Output = ()>,
|
|
|
|
+ F: FnOnce(mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
|
|
|
|
+ G: 'static + Send + Sync + Fn(&[u8]) -> Result<Msg>,
|
|
|
|
+ {
|
|
|
|
+ let mut guard = self.handles.write().await;
|
|
|
|
+ let act_id = {
|
|
|
|
+ let mut act_id = Uuid::new_v4();
|
|
|
|
+ while guard.contains_key(&act_id) {
|
|
|
|
+ act_id = Uuid::new_v4();
|
|
|
|
+ }
|
|
|
|
+ act_id
|
|
|
|
+ };
|
|
|
|
+ let (tx, rx) = mpsc::channel::<Envelope<Msg>>(MAILBOX_LIMIT);
|
|
|
|
+ // The deliverer closure is responsible for deserializing messages received over the wire
|
|
|
|
+ // and delivering them to the actor's mailbox. It's also responsible for sending replies to
|
|
|
|
+ // call messages.
|
|
|
|
+ let deliverer = {
|
|
|
|
+ let tx = tx.clone();
|
|
|
|
+ move |envelope: WireEnvelope| {
|
|
|
|
+ let (msg, replier) = envelope.into_parts();
|
|
|
|
+ let result = deserializer(msg);
|
|
|
|
+ let tx_clone = tx.clone();
|
|
|
|
+ let fut: FutureResult = Box::pin(async move {
|
|
|
|
+ let msg = result?;
|
|
|
|
+ if let Some(mut replier) = replier {
|
|
|
|
+ let (envelope, rx) = Envelope::call(msg);
|
|
|
|
+ tx_clone.send(envelope).await.map_err(|_| {
|
|
|
|
+ bterr!("failed to deliver message. Recipient may have halted.")
|
|
|
|
+ })?;
|
|
|
|
+ // TODO: `reply` does not have the right type.
|
|
|
|
+ // It needs to be WireEnvelope::Reply.
|
|
|
|
+ match rx.await {
|
|
|
|
+ Ok(reply) => replier.reply(reply).await,
|
|
|
|
+ Err(err) => replier.reply_err(err.to_string(), None).await,
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ tx_clone.send(Envelope::Send { msg }).await.map_err(|_| {
|
|
|
|
+ bterr!("failed to deliver message. Recipient may have halted.")
|
|
|
|
+ })
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ fut
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+ let handle = tokio::task::spawn(activator(rx, act_id));
|
|
|
|
+ let actor_handle = ActorHandle::new(handle, tx, deliverer);
|
|
|
|
+ guard.insert(act_id, actor_handle);
|
|
|
|
+ ActorName::new(self.path.clone(), act_id)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /// Registers an actor as a service with the given [ServiceId].
|
|
|
|
+ pub async fn register<Msg, Fut, F, G>(&self, _id: ServiceId, _activator: F, _deserializer: G) -> Result<()>
|
|
|
|
+ where
|
|
|
|
+ Msg: 'static + CallMsg,
|
|
|
|
+ Fut: 'static + Send + Future<Output = ()>,
|
|
|
|
+ F: Fn(mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
|
|
|
|
+ G: 'static + Send + Sync + Fn(&[u8]) -> Result<Msg>,
|
|
|
|
+ {
|
|
|
|
+ todo!()
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+/// A unique identifier for a particular service.
|
|
|
|
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
|
|
|
|
+pub struct ServiceId(#[serde(with = "smart_ptr")] Arc<String>);
|
|
|
|
+
|
|
|
|
+impl From<String> for ServiceId {
|
|
|
|
+ fn from(value: String) -> Self {
|
|
|
|
+ Self(Arc::new(value))
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl<'a> From<&'a str> for ServiceId {
|
|
|
|
+ fn from(value: &'a str) -> Self {
|
|
|
|
+ Self(Arc::new(value.to_owned()))
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+/// A unique identifier for a service.
|
|
|
|
+///
|
|
|
|
+/// A service is a collection of actors in the same directory which provide some functionality.
|
|
|
|
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
|
|
|
|
+pub struct ServiceName {
|
|
|
|
+ /// The path to the directory containing the service.
|
|
|
|
+ #[serde(with = "smart_ptr")]
|
|
|
|
+ path: Arc<BlockPath>,
|
|
|
|
+ /// The id of the service.
|
|
|
|
+ service_id: ServiceId,
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+/// A unique identifier for a specific actor activation.
|
|
|
|
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
|
|
|
|
+pub struct ActorName {
|
|
|
|
+ /// The path to the directory containing this actor.
|
|
|
|
+ #[serde(with = "smart_ptr")]
|
|
|
|
+ path: Arc<BlockPath>,
|
|
|
|
+ /// A unique identifier for an actor activation. Even as an actor transitions to different types
|
|
|
|
+ /// as it handles messages, this value does not change. Thus this value can be used to trace an
|
|
|
|
+ /// actor through a series of state transitions.
|
|
|
|
+ act_id: Uuid,
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl ActorName {
|
|
|
|
+ pub fn new(path: Arc<BlockPath>, act_id: Uuid) -> Self {
|
|
|
|
+ Self { path, act_id }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+/// Trait for messages which expect exactly one reply.
|
|
|
|
+pub trait CallMsg: Serialize + DeserializeOwned + Send + Sync {
|
|
|
|
+ /// The reply type expected for this message.
|
|
|
|
+ type Reply: Serialize + DeserializeOwned + Send + Sync;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+/// Trait for messages which expect exactly zero replies.
|
|
|
|
+pub trait SendMsg: CallMsg {}
|
|
|
|
+
|
|
|
|
+/// An adapter which allows a [CallMsg] to be used sent remotely using a [Transmitter].
|
|
|
|
+#[derive(Serialize, Deserialize)]
|
|
|
|
+#[repr(transparent)]
|
|
|
|
+#[serde(transparent)]
|
|
|
|
+struct Adapter<T>(T);
|
|
|
|
+
|
|
|
|
+impl<'a, T: CallMsg> btmsg::CallMsg<'a> for Adapter<T> {
|
|
|
|
+ type Reply<'r> = T::Reply;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl<'a, T: SendMsg> btmsg::SendMsg<'a> for Adapter<T> {}
|
|
|
|
+
|
|
|
|
+/// The maximum number of messages which can be kept in an actor's mailbox.
|
|
|
|
+const MAILBOX_LIMIT: usize = 32;
|
|
|
|
+
|
|
|
|
+/// The type of messages sent remotely.
|
|
|
|
+enum WireEnvelope<'de> {
|
|
|
|
+ Send { msg: &'de [u8] },
|
|
|
|
+ Call { msg: &'de [u8], replier: Replier },
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl<'de> WireEnvelope<'de> {
|
|
|
|
+ fn into_parts(self) -> (&'de [u8], Option<Replier>) {
|
|
|
|
+ match self {
|
|
|
|
+ Self::Send { msg } => (msg, None),
|
|
|
|
+ Self::Call { msg, replier } => (msg, Some(replier)),
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+/// Wraps a message to indicate if it was sent with `call` or `send`.
|
|
|
|
+///
|
|
|
|
+/// If the message was sent with call, then this enum will contain a channel that can be used to
|
|
|
|
+/// reply to it.
|
|
|
|
+pub enum Envelope<T: CallMsg> {
|
|
|
|
+ /// The message was sent with `send` and does not expect a reply.
|
|
|
|
+ Send { msg: T },
|
|
|
|
+ /// The message was sent with `call` and expects a reply.
|
|
|
|
+ Call {
|
|
|
|
+ msg: T,
|
|
|
|
+ /// A reply message must be sent using this channel.
|
|
|
|
+ reply: oneshot::Sender<T::Reply>,
|
|
|
|
+ },
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl<T: CallMsg> Envelope<T> {
|
|
|
|
+ fn send(msg: T) -> Self {
|
|
|
|
+ Self::Send { msg }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ fn call(msg: T) -> (Self, oneshot::Receiver<T::Reply>) {
|
|
|
|
+ let (tx, rx) = oneshot::channel::<T::Reply>();
|
|
|
|
+ let kind = Envelope::Call { msg, reply: tx };
|
|
|
|
+ (kind, rx)
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+type FutureResult = Pin<Box<dyn Send + Future<Output = Result<()>>>>;
|
|
|
|
+
|
|
|
|
+struct ActorHandle {
|
|
|
|
+ _handle: JoinHandle<()>,
|
|
|
|
+ sender: Box<dyn Send + Sync + Any>,
|
|
|
|
+ deliverer: Box<dyn Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult>,
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl ActorHandle {
|
|
|
|
+ fn new<T, F>(handle: JoinHandle<()>, sender: mpsc::Sender<Envelope<T>>, deliverer: F) -> Self
|
|
|
|
+ where
|
|
|
|
+ T: 'static + CallMsg,
|
|
|
|
+ F: 'static + Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult,
|
|
|
|
+ {
|
|
|
|
+ Self {
|
|
|
|
+ _handle: handle,
|
|
|
|
+ sender: Box::new(sender),
|
|
|
|
+ deliverer: Box::new(deliverer),
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ fn sender<T: 'static + CallMsg>(&self) -> Result<&mpsc::Sender<Envelope<T>>> {
|
|
|
|
+ self.sender
|
|
|
|
+ .downcast_ref::<mpsc::Sender<Envelope<T>>>()
|
|
|
|
+ .ok_or_else(|| bterr!("unexpected message type"))
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ async fn send<T: 'static + SendMsg>(&self, msg: T) -> Result<()> {
|
|
|
|
+ let sender = self.sender()?;
|
|
|
|
+ sender
|
|
|
|
+ .send(Envelope::send(msg))
|
|
|
|
+ .await
|
|
|
|
+ .map_err(|_| bterr!("failed to enqueue message"))?;
|
|
|
|
+ Ok(())
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ async fn call_through<T: 'static + CallMsg>(&self, msg: T) -> Result<T::Reply> {
|
|
|
|
+ let sender = self.sender()?;
|
|
|
|
+ let (envelope, rx) = Envelope::call(msg);
|
|
|
|
+ sender
|
|
|
|
+ .send(envelope)
|
|
|
|
+ .await
|
|
|
|
+ .map_err(|_| bterr!("failed to enqueue call"))?;
|
|
|
|
+ let reply = rx.await?;
|
|
|
|
+ Ok(reply)
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+#[derive(Serialize, Deserialize)]
|
|
|
|
+enum WireReply<'a> {
|
|
|
|
+ Ok(&'a [u8]),
|
|
|
|
+ Err(&'a str),
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+#[derive(Serialize, Deserialize)]
|
|
|
|
+struct WireMsg<'a> {
|
|
|
|
+ act_id: Uuid,
|
|
|
|
+ payload: &'a [u8],
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl<'a> btmsg::CallMsg<'a> for WireMsg<'a> {
|
|
|
|
+ type Reply<'r> = WireReply<'r>;
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+/// This struct implements the server callback for network messages.
|
|
|
|
+#[derive(Clone)]
|
|
|
|
+struct RuntimeCallback {
|
|
|
|
+ handles: Arc<RwLock<HashMap<Uuid, ActorHandle>>>,
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl RuntimeCallback {
|
|
|
|
+ fn new(handles: Arc<RwLock<HashMap<Uuid, ActorHandle>>>) -> Self {
|
|
|
|
+ Self { handles }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+impl MsgCallback for RuntimeCallback {
|
|
|
|
+ type Arg<'de> = WireMsg<'de>;
|
|
|
|
+ type CallFut<'de> = impl 'de + Future<Output = Result<()>>;
|
|
|
|
+ fn call<'de>(&'de self, mut arg: btmsg::MsgReceived<Self::Arg<'de>>) -> Self::CallFut<'de> {
|
|
|
|
+ async move {
|
|
|
|
+ let replier = arg.take_replier();
|
|
|
|
+ let body = arg.body();
|
|
|
|
+ let guard = self.handles.read().await;
|
|
|
|
+ if let Some(handle) = guard.get(&body.act_id) {
|
|
|
|
+ let envelope = if let Some(replier) = replier {
|
|
|
|
+ WireEnvelope::Call {
|
|
|
|
+ msg: body.payload,
|
|
|
|
+ replier,
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ WireEnvelope::Send { msg: body.payload }
|
|
|
|
+ };
|
|
|
|
+ (handle.deliverer)(envelope).await
|
|
|
|
+ } else {
|
|
|
|
+ Err(bterr!("invalid actor ID: {}", body.act_id))
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+#[cfg(test)]
|
|
|
|
+mod tests {
|
|
|
|
+ use std::net::IpAddr;
|
|
|
|
+
|
|
|
|
+ use super::*;
|
|
|
|
+
|
|
|
|
+ use btlib::crypto::CredStore;
|
|
|
|
+ use btlib_tests::TEST_STORE;
|
|
|
|
+ use btserde::from_slice;
|
|
|
|
+
|
|
|
|
+ #[derive(Serialize, Deserialize)]
|
|
|
|
+ struct EchoMsg(String);
|
|
|
|
+
|
|
|
|
+ impl CallMsg for EchoMsg {
|
|
|
|
+ type Reply = EchoMsg;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ async fn echo(mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>, _act_id: Uuid) {
|
|
|
|
+ while let Some(msg) = mailbox.recv().await {
|
|
|
|
+ if let Envelope::Call { msg, reply } = msg {
|
|
|
|
+ if let Err(_) = reply.send(msg) {
|
|
|
|
+ panic!("failed to send reply");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ fn echo_deserializer(slice: &[u8]) -> Result<EchoMsg> {
|
|
|
|
+ from_slice(slice).map_err(|err| err.into())
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ #[tokio::test]
|
|
|
|
+ async fn local_call() {
|
|
|
|
+ const EXPECTED: &str = "hello";
|
|
|
|
+ let ip_addr = IpAddr::from([127, 0, 0, 1]);
|
|
|
|
+ let creds = TEST_STORE.node_creds().unwrap();
|
|
|
|
+ let runtime = new_runtime(ip_addr, creds).unwrap();
|
|
|
|
+ let name = runtime.activate(echo, echo_deserializer).await;
|
|
|
|
+ let reply = runtime.call(&name, EchoMsg(EXPECTED.into())).await.unwrap();
|
|
|
|
+ assert_eq!(EXPECTED, reply.0)
|
|
|
|
+ }
|
|
|
|
+}
|