|
@@ -3,15 +3,15 @@
|
|
|
use std::{any::Any, fmt::Display, future::Future, pin::Pin, sync::Arc};
|
|
|
|
|
|
use btlib::{bterr, BlockPath, Result};
|
|
|
-use btserde::field_helpers::smart_ptr;
|
|
|
+use btserde::{field_helpers::smart_ptr, from_slice, write_to};
|
|
|
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
|
|
use tokio::{
|
|
|
- sync::{mpsc, oneshot},
|
|
|
+ sync::{mpsc, oneshot, Mutex},
|
|
|
task::AbortHandle,
|
|
|
};
|
|
|
use uuid::Uuid;
|
|
|
|
|
|
-use crate::WireEnvelope;
|
|
|
+use crate::{WireEnvelope, WireReply};
|
|
|
|
|
|
pub use once_cell::sync::Lazy;
|
|
|
|
|
@@ -59,7 +59,7 @@ impl Copy for TransKind {}
|
|
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
|
/// A struct which conveys information about where an actor error occurred to the kernel.
|
|
|
-pub struct ActorErrorCommon {
|
|
|
+pub struct ActorErrorPayload {
|
|
|
/// The ID of the actor which experienced the error.
|
|
|
pub actor_id: ActorId,
|
|
|
/// The name of the actor implementation which erred.
|
|
@@ -76,7 +76,7 @@ pub struct ActorErrorCommon {
|
|
|
pub kind: TransKind,
|
|
|
}
|
|
|
|
|
|
-impl Display for ActorErrorCommon {
|
|
|
+impl Display for ActorErrorPayload {
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
write!(
|
|
|
f,
|
|
@@ -90,34 +90,35 @@ impl Display for ActorErrorCommon {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+/// An actor error which holds a [btlib::Error] to describe the error which occurred.
|
|
|
pub struct ActorError {
|
|
|
- common: ActorErrorCommon,
|
|
|
+ payload: ActorErrorPayload,
|
|
|
err: btlib::Error,
|
|
|
}
|
|
|
|
|
|
impl ActorError {
|
|
|
- pub fn new(err: btlib::Error, common: ActorErrorCommon) -> Self {
|
|
|
- Self { common, err }
|
|
|
+ pub fn new(err: btlib::Error, payload: ActorErrorPayload) -> Self {
|
|
|
+ Self { payload, err }
|
|
|
}
|
|
|
|
|
|
pub fn actor_id(&self) -> ActorId {
|
|
|
- self.common.actor_id
|
|
|
+ self.payload.actor_id
|
|
|
}
|
|
|
|
|
|
pub fn actor_impl(&self) -> &Arc<String> {
|
|
|
- &self.common.actor_impl
|
|
|
+ &self.payload.actor_impl
|
|
|
}
|
|
|
|
|
|
pub fn state(&self) -> &Arc<String> {
|
|
|
- &self.common.state
|
|
|
+ &self.payload.state
|
|
|
}
|
|
|
|
|
|
pub fn message(&self) -> &Arc<String> {
|
|
|
- &self.common.message
|
|
|
+ &self.payload.message
|
|
|
}
|
|
|
|
|
|
pub fn kind(&self) -> TransKind {
|
|
|
- self.common.kind
|
|
|
+ self.payload.kind
|
|
|
}
|
|
|
|
|
|
pub fn err(&self) -> &btlib::Error {
|
|
@@ -127,36 +128,40 @@ impl ActorError {
|
|
|
|
|
|
impl Display for ActorError {
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
- write!(f, "{}: {}", self.common, self.err)
|
|
|
+ write!(f, "{}: {}", self.payload, self.err)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+/// An actor error which holds a [String] to describe the error which occurred.
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
|
pub struct ActorErrorStr {
|
|
|
- common: ActorErrorCommon,
|
|
|
+ payload: ActorErrorPayload,
|
|
|
+ // Note that the reason why a generic err field couldn't be added to ActorErrorPayload is
|
|
|
+ // because we need to use the smart_ptr helper only in the case where the error is an
|
|
|
+ // Arc<String>.
|
|
|
#[serde(with = "smart_ptr")]
|
|
|
err: Arc<String>,
|
|
|
}
|
|
|
|
|
|
impl ActorErrorStr {
|
|
|
pub fn actor_id(&self) -> ActorId {
|
|
|
- self.common.actor_id
|
|
|
+ self.payload.actor_id
|
|
|
}
|
|
|
|
|
|
pub fn actor_impl(&self) -> &Arc<String> {
|
|
|
- &self.common.actor_impl
|
|
|
+ &self.payload.actor_impl
|
|
|
}
|
|
|
|
|
|
pub fn state(&self) -> &Arc<String> {
|
|
|
- &self.common.state
|
|
|
+ &self.payload.state
|
|
|
}
|
|
|
|
|
|
pub fn message(&self) -> &Arc<String> {
|
|
|
- &self.common.message
|
|
|
+ &self.payload.message
|
|
|
}
|
|
|
|
|
|
pub fn kind(&self) -> TransKind {
|
|
|
- self.common.kind
|
|
|
+ self.payload.kind
|
|
|
}
|
|
|
|
|
|
pub fn err(&self) -> &Arc<String> {
|
|
@@ -166,7 +171,7 @@ impl ActorErrorStr {
|
|
|
|
|
|
impl Display for ActorErrorStr {
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
- write!(f, "{}: {}", self.common, self.err)
|
|
|
+ write!(f, "{}: {}", self.payload, self.err)
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -174,14 +179,16 @@ impl From<ActorError> for ActorErrorStr {
|
|
|
fn from(value: ActorError) -> Self {
|
|
|
let err = Arc::new(value.err.to_string());
|
|
|
Self {
|
|
|
- common: value.common,
|
|
|
+ payload: value.payload,
|
|
|
err,
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+/// Indicates the type of panic which occurred. Either a panic with a string object, or some other
|
|
|
+/// type that we can't currently handle.
|
|
|
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
|
|
|
-pub enum PanicKind {
|
|
|
+pub(crate) enum PanicKind {
|
|
|
String(#[serde(with = "smart_ptr")] Arc<String>),
|
|
|
Unknown,
|
|
|
}
|
|
@@ -189,8 +196,7 @@ pub enum PanicKind {
|
|
|
impl PanicKind {
|
|
|
fn new(err: Box<dyn Any>) -> Self {
|
|
|
if let Ok(err) = err.downcast::<String>() {
|
|
|
- let string = *err;
|
|
|
- Self::String(Arc::new(string))
|
|
|
+ Self::String(Arc::new(*err))
|
|
|
} else {
|
|
|
Self::Unknown
|
|
|
}
|
|
@@ -206,6 +212,7 @@ impl Display for PanicKind {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+/// Conveys information about an actor which panicked.
|
|
|
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
|
|
|
pub struct ActorPanic {
|
|
|
actor_id: ActorId,
|
|
@@ -416,7 +423,7 @@ impl ActorName {
|
|
|
&self.path
|
|
|
}
|
|
|
|
|
|
- pub fn act_id(&self) -> ActorId {
|
|
|
+ pub fn actor_id(&self) -> ActorId {
|
|
|
self.act_id
|
|
|
}
|
|
|
}
|
|
@@ -436,6 +443,7 @@ pub trait CallMsg: Serialize + DeserializeOwned + Send + Sync {
|
|
|
/// Trait for messages which expect exactly zero replies.
|
|
|
pub trait SendMsg: CallMsg {}
|
|
|
|
|
|
+/// Trait for enums which wrap individual message types.
|
|
|
pub trait MsgEnum: CallMsg + Named {}
|
|
|
|
|
|
impl<T: CallMsg + Named> MsgEnum for T {}
|
|
@@ -461,8 +469,7 @@ impl Named for ControlMsg {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-/// Wrapper around a message type `T` which indicates who the message is from and, if the message
|
|
|
-/// was dispatched with `call`, provides a channel to reply to it.
|
|
|
+/// Wrapper around a message enum `T`.
|
|
|
pub enum Envelope<T: MsgEnum> {
|
|
|
Call {
|
|
|
msg: T,
|
|
@@ -476,12 +483,13 @@ pub enum Envelope<T: MsgEnum> {
|
|
|
}
|
|
|
|
|
|
impl<T: MsgEnum> Envelope<T> {
|
|
|
- /// Creates a new envelope containing the given message which does not expect a reply.
|
|
|
+ /// Creates a new envelope of the `Send` variant containing the given message.
|
|
|
pub(super) fn new_send(from: ActorName, msg: T) -> Self {
|
|
|
Self::Send { from, msg }
|
|
|
}
|
|
|
|
|
|
/// Creates a new envelope containing the given message which expects exactly one reply.
|
|
|
+ /// The receiver end of the reply channel is also returned.
|
|
|
pub(super) fn new_call(msg: T) -> (Self, oneshot::Receiver<T::Reply>) {
|
|
|
let (tx, rx) = oneshot::channel::<T::Reply>();
|
|
|
let envelope = Self::Call {
|
|
@@ -491,7 +499,8 @@ impl<T: MsgEnum> Envelope<T> {
|
|
|
(envelope, rx)
|
|
|
}
|
|
|
|
|
|
- /// Returns the name of the actor which sent this message.
|
|
|
+ /// Returns the name of the actor which sent this message, or [None] if this instance isn't the
|
|
|
+ /// `Send` variant.
|
|
|
pub fn from(&self) -> Option<&ActorName> {
|
|
|
match self {
|
|
|
Self::Send { from, .. } => Some(from),
|
|
@@ -499,7 +508,8 @@ impl<T: MsgEnum> Envelope<T> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /// Returns a reference to the message in this envelope.
|
|
|
+ /// Returns a reference to the message in this envelope, or [None] if this instance is the
|
|
|
+ /// `Control` variant.
|
|
|
pub fn msg(&self) -> Option<&T> {
|
|
|
match self {
|
|
|
Self::Call { msg, .. } | Self::Send { msg, .. } => Some(msg),
|
|
@@ -507,6 +517,7 @@ impl<T: MsgEnum> Envelope<T> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /// Returns the name of the envelope variant this instance is.
|
|
|
pub const fn name(&self) -> &'static str {
|
|
|
match self {
|
|
|
Self::Call { .. } => "Call",
|
|
@@ -515,6 +526,7 @@ impl<T: MsgEnum> Envelope<T> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /// Returns the name of the message contained in this envelope.
|
|
|
pub fn msg_name(&self) -> Arc<String> {
|
|
|
match self {
|
|
|
Self::Call { msg, .. } | Self::Send { msg, .. } => msg.name(),
|
|
@@ -547,41 +559,104 @@ impl<T: MsgEnum> Envelope<T> {
|
|
|
|
|
|
pub(super) type FutureResult = Pin<Box<dyn Send + Future<Output = Result<()>>>>;
|
|
|
|
|
|
+/// An owned handle to a running actor. The actor will continue to run until this handle is dropped.
|
|
|
pub struct ActorHandle {
|
|
|
name: ActorName,
|
|
|
handle: AbortHandle,
|
|
|
+ /// A boxed trait object of type `Sender<T>`, where `T` is the message enum for this actor.
|
|
|
sender: Box<dyn Send + Sync + Any>,
|
|
|
- deliverer: Box<dyn Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult>,
|
|
|
+ /// A closure used to deliver messages received over the wire to the actor.
|
|
|
+ wire_deliverer: Box<dyn Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult>,
|
|
|
+ /// A closure used to deliver control messages to the actor.
|
|
|
ctrl_deliverer: Box<dyn Send + Sync + Fn(ControlMsg) -> FutureResult>,
|
|
|
owner: Option<ActorName>,
|
|
|
owns: Vec<ActorName>,
|
|
|
}
|
|
|
|
|
|
impl ActorHandle {
|
|
|
- pub(super) fn new<T, F, G>(
|
|
|
+ pub(super) fn new<T>(
|
|
|
name: ActorName,
|
|
|
handle: AbortHandle,
|
|
|
sender: mpsc::Sender<Envelope<T>>,
|
|
|
- deliverer: F,
|
|
|
- ctrl_deliverer: G,
|
|
|
owner: Option<ActorName>,
|
|
|
) -> Self
|
|
|
where
|
|
|
T: 'static + MsgEnum,
|
|
|
- F: 'static + Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult,
|
|
|
- G: 'static + Send + Sync + Fn(ControlMsg) -> FutureResult,
|
|
|
{
|
|
|
Self {
|
|
|
- name,
|
|
|
handle,
|
|
|
+ wire_deliverer: Self::wire_deliverer(sender.clone(), name.clone()),
|
|
|
+ ctrl_deliverer: Self::control_deliverer(sender.clone()),
|
|
|
+ name,
|
|
|
sender: Box::new(sender),
|
|
|
- deliverer: Box::new(deliverer),
|
|
|
- ctrl_deliverer: Box::new(ctrl_deliverer),
|
|
|
owner,
|
|
|
owns: Vec::new(),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // Makes the `wire_deliverer` field, which is responsible for deserializing messages received
|
|
|
+ // over the wire and delivering them to the actor's mailbox, as well as sending replies to call
|
|
|
+ // messages.
|
|
|
+ fn wire_deliverer<T: 'static + MsgEnum>(
|
|
|
+ sender: mpsc::Sender<Envelope<T>>,
|
|
|
+ name: ActorName,
|
|
|
+ ) -> Box<dyn Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult> {
|
|
|
+ let buffer = Arc::new(Mutex::new(Vec::<u8>::new()));
|
|
|
+ let closure = move |envelope: WireEnvelope| {
|
|
|
+ let (wire_msg, replier) = envelope.into_parts();
|
|
|
+ let result = from_slice(wire_msg.payload);
|
|
|
+ let buffer = buffer.clone();
|
|
|
+ let sender = sender.clone();
|
|
|
+ let act_name = name.clone();
|
|
|
+ let fut: FutureResult = Box::pin(async move {
|
|
|
+ let msg = result?;
|
|
|
+ if let Some(mut replier) = replier {
|
|
|
+ let (envelope, rx) = Envelope::new_call(msg);
|
|
|
+ sender.send(envelope).await.map_err(|_| {
|
|
|
+ bterr!("failed to deliver message. Recipient may have halted.")
|
|
|
+ })?;
|
|
|
+ match rx.await {
|
|
|
+ Ok(reply) => {
|
|
|
+ let mut guard = buffer.lock().await;
|
|
|
+ guard.clear();
|
|
|
+ write_to(&reply, &mut *guard)?;
|
|
|
+ let wire_reply = WireReply::Ok(&guard);
|
|
|
+ replier.reply(wire_reply).await
|
|
|
+ }
|
|
|
+ Err(err) => replier.reply_err(err.to_string(), None).await,
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ sender
|
|
|
+ .send(Envelope::new_send(act_name, msg))
|
|
|
+ .await
|
|
|
+ .map_err(|_| {
|
|
|
+ bterr!("failed to deliver message. Recipient may have halted.")
|
|
|
+ })
|
|
|
+ }
|
|
|
+ });
|
|
|
+ fut
|
|
|
+ };
|
|
|
+ Box::new(closure)
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Makes the `ctrl_deliverer` field, which is responsible for delivering [ControlMsg]s to the
|
|
|
+ /// actor.
|
|
|
+ fn control_deliverer<T: 'static + MsgEnum>(
|
|
|
+ sender: mpsc::Sender<Envelope<T>>,
|
|
|
+ ) -> Box<dyn Send + Sync + Fn(ControlMsg) -> FutureResult> {
|
|
|
+ let closure = move |control: ControlMsg| {
|
|
|
+ let sender = sender.clone();
|
|
|
+ let fut: FutureResult = Box::pin(async move {
|
|
|
+ sender
|
|
|
+ .send(Envelope::Control(control))
|
|
|
+ .await
|
|
|
+ .map_err(|err| bterr!("{err}"))
|
|
|
+ });
|
|
|
+ fut
|
|
|
+ };
|
|
|
+ Box::new(closure)
|
|
|
+ }
|
|
|
+
|
|
|
fn sender<T: 'static + MsgEnum>(&self) -> Result<&mpsc::Sender<Envelope<T>>> {
|
|
|
self.sender
|
|
|
.downcast_ref::<mpsc::Sender<Envelope<T>>>()
|
|
@@ -592,7 +667,7 @@ impl ActorHandle {
|
|
|
&self.name
|
|
|
}
|
|
|
|
|
|
- /// Sends a message to the actor represented by this handle.
|
|
|
+ /// Sends a message to this actor.
|
|
|
pub async fn send<T: 'static + MsgEnum>(&self, from: ActorName, msg: T) -> Result<()> {
|
|
|
let sender = self.sender()?;
|
|
|
sender
|
|
@@ -602,6 +677,7 @@ impl ActorHandle {
|
|
|
Ok(())
|
|
|
}
|
|
|
|
|
|
+ /// Sends a call message to this actor and awaits the reply.
|
|
|
pub async fn call_through<T: 'static + MsgEnum>(&self, msg: T) -> Result<T::Reply> {
|
|
|
let sender = self.sender()?;
|
|
|
let (envelope, rx) = Envelope::new_call(msg);
|
|
@@ -613,22 +689,27 @@ impl ActorHandle {
|
|
|
Ok(reply)
|
|
|
}
|
|
|
|
|
|
+ /// Sends a [ControlMsg] to this actor.
|
|
|
pub(crate) async fn send_control(&self, msg: ControlMsg) -> Result<()> {
|
|
|
(self.ctrl_deliverer)(msg).await
|
|
|
}
|
|
|
|
|
|
+ /// Aborts this actor.
|
|
|
pub fn abort(&self) {
|
|
|
self.handle.abort();
|
|
|
}
|
|
|
|
|
|
+ /// Delivers a message received over the wire to this actor.
|
|
|
pub(super) async fn deliver(&self, envelope: WireEnvelope<'_>) -> Result<()> {
|
|
|
- (self.deliverer)(envelope).await
|
|
|
+ (self.wire_deliverer)(envelope).await
|
|
|
}
|
|
|
|
|
|
+ /// Takes the owner name out of this handle.
|
|
|
pub(crate) fn take_owner(&mut self) -> Option<ActorName> {
|
|
|
self.owner.take()
|
|
|
}
|
|
|
|
|
|
+ /// Returns a mutable reference to the vector of actor names which this actor owns.
|
|
|
pub(crate) fn owns_mut(&mut self) -> &mut Vec<ActorName> {
|
|
|
&mut self.owns
|
|
|
}
|
|
@@ -646,8 +727,8 @@ impl Drop for ActorHandle {
|
|
|
macro_rules! actor_name {
|
|
|
($name:literal) => {
|
|
|
fn actor_impl() -> ::std::sync::Arc<String> {
|
|
|
- use ::once_cell::sync::Lazy;
|
|
|
use ::std::sync::Arc;
|
|
|
+ use $crate::model::Lazy;
|
|
|
static NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new($name.into()));
|
|
|
NAME.clone()
|
|
|
}
|