123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414 |
- #![feature(impl_trait_in_assoc_type)]
- use std::{any::Any, collections::HashMap, future::Future, net::IpAddr, pin::Pin, sync::Arc};
- use btlib::{bterr, crypto::Creds, BlockPath, Result};
- use btmsg::{MsgCallback, Receiver, Replier, Transmitter};
- use btserde::field_helpers::smart_ptr;
- use serde::{de::DeserializeOwned, Deserialize, Serialize};
- use tokio::{
- sync::{mpsc, oneshot, RwLock},
- task::JoinHandle,
- };
- use uuid::Uuid;
- /// Creates a new [Runtime] instance which listens at the given IP address and which uses the given
- /// credentials.
- pub fn new_runtime<C: 'static + Send + Sync + Creds>(
- ip_addr: IpAddr,
- creds: Arc<C>,
- ) -> Result<Runtime<impl Receiver>> {
- let path = Arc::new(creds.bind_path()?);
- let handles = Arc::new(RwLock::new(HashMap::new()));
- let callback = RuntimeCallback::new(handles.clone());
- let rx = btmsg::receiver(ip_addr, creds, callback)?;
- Ok(Runtime {
- _rx: rx,
- path,
- handles,
- peers: RwLock::new(HashMap::new()),
- })
- }
- /// An actor runtime.
- ///
- /// Actors can be activated by the runtime and execute autonomously until they halt. Running actors
- /// can be sent messages using the `send` method, which does not wait for a response from the
- /// recipient. If a reply is needed, then `call` can be used, which returns a future that will not
- /// be ready until the reply has been received.
- pub struct Runtime<Rx: Receiver> {
- _rx: Rx,
- path: Arc<BlockPath>,
- handles: Arc<RwLock<HashMap<Uuid, ActorHandle>>>,
- peers: RwLock<HashMap<Arc<BlockPath>, Rx::Transmitter>>,
- }
/// Routes `$msg` to the actor named by `$to` by invoking `$method` on the appropriate target.
///
/// * `$self` - the runtime; its `path`, `handles` and `peers` fields are read.
/// * `$to` - the recipient's name; its `path` and `act_id` fields are read.
/// * `$msg` - the message to deliver.
/// * `$method` - the method to invoke (e.g. `send` or `call_through`). It is called on the local
///   actor handle, or on the peer transmitter with the message wrapped in `Adapter`.
///
/// The expansion is an expression which must be used inside an `async` context. It evaluates to
/// the result of `$method`, or to an error when the recipient cannot be found:
///
/// * the path matches this runtime but no actor has the given ID, or
/// * the path belongs to another runtime for which no peer is known. This case used to hit a
///   `todo!()` and panic the caller; it is now reported as an error.
macro_rules! deliver {
    ($self:expr, $to:expr, $msg:expr, $method:ident) => {
        if $to.path == $self.path {
            // Local recipient: the read lock is held until the method's future completes.
            let guard = $self.handles.read().await;
            if let Some(handle) = guard.get(&$to.act_id) {
                handle.$method($msg).await
            } else {
                Err(bterr!("invalid actor name"))
            }
        } else {
            let guard = $self.peers.read().await;
            if let Some(peer) = guard.get(&$to.path) {
                peer.$method(Adapter($msg)).await
            } else {
                // TODO: Use the filesystem to discover the address of the recipient and connect to
                // it. Until then an unknown peer is reported to the caller rather than panicking.
                Err(bterr!("no known peer for the recipient's path"))
            }
        }
    };
}
- impl<Rx: Receiver> Runtime<Rx> {
- pub fn path(&self) -> &Arc<BlockPath> {
- &self.path
- }
- /// Sends a message to the actor identified by the given [ActorName].
- pub async fn send<T: 'static + SendMsg>(&self, to: &ActorName, msg: T) -> Result<()> {
- deliver!(self, to, msg, send)
- }
- /// Sends a message to the actor identified by the given [ActorName] and returns a future which
- /// is ready when the reply has been received.
- pub async fn call<T: 'static + CallMsg>(&self, to: &ActorName, msg: T) -> Result<T::Reply> {
- deliver!(self, to, msg, call_through)
- }
- /// Resolves the given [ServiceName] to an [ActorName] which is part of it.
- pub async fn resolve<'a>(&'a self, _service: &ServiceName) -> Result<ActorName> {
- todo!()
- }
- /// Activates a new actor using the given activator function and returns a handle to it.
- pub async fn activate<Msg, F, Fut, G>(&self, activator: F, deserializer: G) -> ActorName
- where
- Msg: 'static + CallMsg,
- Fut: 'static + Send + Future<Output = ()>,
- F: FnOnce(mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
- G: 'static + Send + Sync + Fn(&[u8]) -> Result<Msg>,
- {
- let mut guard = self.handles.write().await;
- let act_id = {
- let mut act_id = Uuid::new_v4();
- while guard.contains_key(&act_id) {
- act_id = Uuid::new_v4();
- }
- act_id
- };
- let (tx, rx) = mpsc::channel::<Envelope<Msg>>(MAILBOX_LIMIT);
- // The deliverer closure is responsible for deserializing messages received over the wire
- // and delivering them to the actor's mailbox. It's also responsible for sending replies to
- // call messages.
- let deliverer = {
- let tx = tx.clone();
- move |envelope: WireEnvelope| {
- let (msg, replier) = envelope.into_parts();
- let result = deserializer(msg);
- let tx_clone = tx.clone();
- let fut: FutureResult = Box::pin(async move {
- let msg = result?;
- if let Some(mut replier) = replier {
- let (envelope, rx) = Envelope::call(msg);
- tx_clone.send(envelope).await.map_err(|_| {
- bterr!("failed to deliver message. Recipient may have halted.")
- })?;
- // TODO: `reply` does not have the right type.
- // It needs to be WireEnvelope::Reply.
- match rx.await {
- Ok(reply) => replier.reply(reply).await,
- Err(err) => replier.reply_err(err.to_string(), None).await,
- }
- } else {
- tx_clone.send(Envelope::Send { msg }).await.map_err(|_| {
- bterr!("failed to deliver message. Recipient may have halted.")
- })
- }
- });
- fut
- }
- };
- let handle = tokio::task::spawn(activator(rx, act_id));
- let actor_handle = ActorHandle::new(handle, tx, deliverer);
- guard.insert(act_id, actor_handle);
- ActorName::new(self.path.clone(), act_id)
- }
- /// Registers an actor as a service with the given [ServiceId].
- pub async fn register<Msg, Fut, F, G>(
- &self,
- _id: ServiceId,
- _activator: F,
- _deserializer: G,
- ) -> Result<()>
- where
- Msg: 'static + CallMsg,
- Fut: 'static + Send + Future<Output = ()>,
- F: Fn(mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
- G: 'static + Send + Sync + Fn(&[u8]) -> Result<Msg>,
- {
- todo!()
- }
- }
- /// A unique identifier for a particular service.
- #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
- pub struct ServiceId(#[serde(with = "smart_ptr")] Arc<String>);
- impl From<String> for ServiceId {
- fn from(value: String) -> Self {
- Self(Arc::new(value))
- }
- }
- impl<'a> From<&'a str> for ServiceId {
- fn from(value: &'a str) -> Self {
- Self(Arc::new(value.to_owned()))
- }
- }
- /// A unique identifier for a service.
- ///
- /// A service is a collection of actors in the same directory which provide some functionality.
- #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
- pub struct ServiceName {
- /// The path to the directory containing the service.
- #[serde(with = "smart_ptr")]
- path: Arc<BlockPath>,
- /// The id of the service.
- service_id: ServiceId,
- }
- /// A unique identifier for a specific actor activation.
- #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
- pub struct ActorName {
- /// The path to the directory containing this actor.
- #[serde(with = "smart_ptr")]
- path: Arc<BlockPath>,
- /// A unique identifier for an actor activation. Even as an actor transitions to different types
- /// as it handles messages, this value does not change. Thus this value can be used to trace an
- /// actor through a series of state transitions.
- act_id: Uuid,
- }
- impl ActorName {
- pub fn new(path: Arc<BlockPath>, act_id: Uuid) -> Self {
- Self { path, act_id }
- }
- }
- /// Trait for messages which expect exactly one reply.
- pub trait CallMsg: Serialize + DeserializeOwned + Send + Sync {
- /// The reply type expected for this message.
- type Reply: Serialize + DeserializeOwned + Send + Sync;
- }
- /// Trait for messages which expect exactly zero replies.
- pub trait SendMsg: CallMsg {}
- /// An adapter which allows a [CallMsg] to be used sent remotely using a [Transmitter].
- #[derive(Serialize, Deserialize)]
- #[repr(transparent)]
- #[serde(transparent)]
- struct Adapter<T>(T);
- impl<'a, T: CallMsg> btmsg::CallMsg<'a> for Adapter<T> {
- type Reply<'r> = T::Reply;
- }
- impl<'a, T: SendMsg> btmsg::SendMsg<'a> for Adapter<T> {}
/// Capacity of an actor's mailbox, i.e. the bound of the channel created for it on activation.
const MAILBOX_LIMIT: usize = 32;
- /// The type of messages sent remotely.
- enum WireEnvelope<'de> {
- Send { msg: &'de [u8] },
- Call { msg: &'de [u8], replier: Replier },
- }
- impl<'de> WireEnvelope<'de> {
- fn into_parts(self) -> (&'de [u8], Option<Replier>) {
- match self {
- Self::Send { msg } => (msg, None),
- Self::Call { msg, replier } => (msg, Some(replier)),
- }
- }
- }
- /// Wraps a message to indicate if it was sent with `call` or `send`.
- ///
- /// If the message was sent with call, then this enum will contain a channel that can be used to
- /// reply to it.
- pub enum Envelope<T: CallMsg> {
- /// The message was sent with `send` and does not expect a reply.
- Send { msg: T },
- /// The message was sent with `call` and expects a reply.
- Call {
- msg: T,
- /// A reply message must be sent using this channel.
- reply: oneshot::Sender<T::Reply>,
- },
- }
- impl<T: CallMsg> Envelope<T> {
- fn send(msg: T) -> Self {
- Self::Send { msg }
- }
- fn call(msg: T) -> (Self, oneshot::Receiver<T::Reply>) {
- let (tx, rx) = oneshot::channel::<T::Reply>();
- let kind = Envelope::Call { msg, reply: tx };
- (kind, rx)
- }
- }
- type FutureResult = Pin<Box<dyn Send + Future<Output = Result<()>>>>;
- struct ActorHandle {
- _handle: JoinHandle<()>,
- sender: Box<dyn Send + Sync + Any>,
- deliverer: Box<dyn Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult>,
- }
- impl ActorHandle {
- fn new<T, F>(handle: JoinHandle<()>, sender: mpsc::Sender<Envelope<T>>, deliverer: F) -> Self
- where
- T: 'static + CallMsg,
- F: 'static + Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult,
- {
- Self {
- _handle: handle,
- sender: Box::new(sender),
- deliverer: Box::new(deliverer),
- }
- }
- fn sender<T: 'static + CallMsg>(&self) -> Result<&mpsc::Sender<Envelope<T>>> {
- self.sender
- .downcast_ref::<mpsc::Sender<Envelope<T>>>()
- .ok_or_else(|| bterr!("unexpected message type"))
- }
- async fn send<T: 'static + SendMsg>(&self, msg: T) -> Result<()> {
- let sender = self.sender()?;
- sender
- .send(Envelope::send(msg))
- .await
- .map_err(|_| bterr!("failed to enqueue message"))?;
- Ok(())
- }
- async fn call_through<T: 'static + CallMsg>(&self, msg: T) -> Result<T::Reply> {
- let sender = self.sender()?;
- let (envelope, rx) = Envelope::call(msg);
- sender
- .send(envelope)
- .await
- .map_err(|_| bterr!("failed to enqueue call"))?;
- let reply = rx.await?;
- Ok(reply)
- }
- }
- #[derive(Serialize, Deserialize)]
- enum WireReply<'a> {
- Ok(&'a [u8]),
- Err(&'a str),
- }
- #[derive(Serialize, Deserialize)]
- struct WireMsg<'a> {
- act_id: Uuid,
- payload: &'a [u8],
- }
- impl<'a> btmsg::CallMsg<'a> for WireMsg<'a> {
- type Reply<'r> = WireReply<'r>;
- }
- /// This struct implements the server callback for network messages.
- #[derive(Clone)]
- struct RuntimeCallback {
- handles: Arc<RwLock<HashMap<Uuid, ActorHandle>>>,
- }
- impl RuntimeCallback {
- fn new(handles: Arc<RwLock<HashMap<Uuid, ActorHandle>>>) -> Self {
- Self { handles }
- }
- }
- impl MsgCallback for RuntimeCallback {
- type Arg<'de> = WireMsg<'de>;
- type CallFut<'de> = impl 'de + Future<Output = Result<()>>;
- fn call<'de>(&'de self, mut arg: btmsg::MsgReceived<Self::Arg<'de>>) -> Self::CallFut<'de> {
- async move {
- let replier = arg.take_replier();
- let body = arg.body();
- let guard = self.handles.read().await;
- if let Some(handle) = guard.get(&body.act_id) {
- let envelope = if let Some(replier) = replier {
- WireEnvelope::Call {
- msg: body.payload,
- replier,
- }
- } else {
- WireEnvelope::Send { msg: body.payload }
- };
- (handle.deliverer)(envelope).await
- } else {
- Err(bterr!("invalid actor ID: {}", body.act_id))
- }
- }
- }
- }
#[cfg(test)]
mod tests {
    use std::net::{IpAddr, Ipv4Addr};

    use super::*;

    use btlib::crypto::CredStore;
    use btlib_tests::TEST_STORE;
    use btserde::from_slice;

    /// A message whose reply is another message of the same type.
    #[derive(Serialize, Deserialize)]
    struct EchoMsg(String);

    impl CallMsg for EchoMsg {
        type Reply = EchoMsg;
    }

    /// An actor which answers every call with the message it received and ignores sends.
    async fn echo(mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>, _act_id: Uuid) {
        while let Some(envelope) = mailbox.recv().await {
            match envelope {
                Envelope::Call { msg, reply } => {
                    if reply.send(msg).is_err() {
                        panic!("failed to send reply");
                    }
                }
                Envelope::Send { .. } => {}
            }
        }
    }

    /// Deserializes an [EchoMsg] from bytes received over the wire.
    fn echo_deserializer(slice: &[u8]) -> Result<EchoMsg> {
        from_slice(slice).map_err(Into::into)
    }

    /// Calling an actor activated in the same runtime returns the actor's reply.
    #[tokio::test]
    async fn local_call() {
        const EXPECTED: &str = "hello";
        let creds = TEST_STORE.node_creds().unwrap();
        let runtime = new_runtime(IpAddr::V4(Ipv4Addr::LOCALHOST), creds).unwrap();
        let name = runtime.activate(echo, echo_deserializer).await;

        let reply = runtime
            .call(&name, EchoMsg(String::from(EXPECTED)))
            .await
            .unwrap();

        assert_eq!(EXPECTED, reply.0)
    }
}
|