Pārlūkot izejas kodu

Added support to the btrun kernel for sending exit notifications.

Matthew Carr 1 gadu atpakaļ
vecāks
revīzija
2f75793d5d

+ 4 - 3
Cargo.lock

@@ -428,6 +428,7 @@ version = "0.1.0"
 dependencies = [
  "btlib",
  "btrun",
+ "once_cell",
  "proc-macro2",
  "quote",
  "serde",
@@ -462,8 +463,8 @@ dependencies = [
  "ctor",
  "env_logger",
  "futures",
- "lazy_static",
  "log",
+ "once_cell",
  "serde",
  "strum",
  "tokio",
@@ -1812,9 +1813,9 @@ dependencies = [
 
 [[package]]
 name = "once_cell"
-version = "1.15.0"
+version = "1.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e82dad04139b71a90c080c8463fe0dc7902db5192d939bd0950f074d014339e1"
+checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
 
 [[package]]
 name = "oorandom"

+ 1 - 0
crates/btproto/Cargo.toml

@@ -17,3 +17,4 @@ serde = { version = "^1.0.136", features = ["derive"] }
 
 [dev-dependencies]
 btlib = { path = "../btlib" }
+once_cell = { version = "1.19.0" }

+ 64 - 11
crates/btproto/src/generation.rs

@@ -1,7 +1,12 @@
-use proc_macro2::TokenStream;
+use std::collections::HashMap;
+
+use proc_macro2::{Ident, TokenStream};
 use quote::{format_ident, quote, ToTokens};
 
-use crate::model::{MethodModel, ProtocolModel};
+use crate::{
+    case_convert::CaseConvert,
+    model::{MethodModel, ProtocolModel},
+};
 
 impl ToTokens for ProtocolModel {
     fn to_tokens(&self, tokens: &mut TokenStream) {
@@ -13,13 +18,31 @@ impl ToTokens for ProtocolModel {
 impl ProtocolModel {
     fn generate_message_enum(&self) -> TokenStream {
         let msg_lookup = self.msg_lookup();
-        let get_variants = || msg_lookup.msg_iter().map(|msg| msg.msg_name());
-        let variants0 = get_variants();
-        let variants1 = get_variants();
-        let variant_names = get_variants().map(|variant| variant.to_string());
+        let variants: Vec<_> = msg_lookup.msg_iter().map(|msg| msg.msg_name()).collect();
+        let variant_names_map: HashMap<&Ident, String> = variants
+            .iter()
+            .map(|variant| (variant.as_ref(), variant.to_string()))
+            .collect();
         let msg_types = msg_lookup.msg_iter().map(|msg| msg.msg_type());
         let all_replies = msg_lookup.msg_iter().all(|msg| msg.is_reply());
         let enum_name = format_ident!("{}Msgs", self.def().name_def.name);
+        let enum_kinds_name = format_ident!("{}MsgKinds", self.def().name_def.name);
+        let name_decl_vec: Vec<(Ident, TokenStream)> = variants
+            .iter()
+            .map(|variant| {
+                let variant_str = variant_names_map.get(variant.as_ref()).unwrap();
+                let name_ident =
+                    format_ident!("{}_NAME", variant_str.pascal_to_snake().to_uppercase());
+                let decl = quote! {
+                    static #name_ident: Lazy<Arc<String>> = Lazy::new(|| {
+                        Arc::new(#variant_str.into())
+                    });
+                };
+                (name_ident, decl)
+            })
+            .collect();
+        let name_decls = name_decl_vec.iter().map(|(_, decl)| decl);
+        let name_idents = name_decl_vec.iter().map(|(ident, _)| ident);
         let send_impl = if all_replies {
             quote! {}
         } else {
@@ -33,13 +56,33 @@ impl ProtocolModel {
             #[doc = #doc_comment]
             #[derive(::serde::Serialize, ::serde::Deserialize)]
             pub enum #enum_name {
-                #( #variants0(#msg_types) ),*
+                #( #variants(#msg_types) ),*
+            }
+
+
+            #[derive(Clone)]
+            #[allow(dead_code)]
+            pub enum #enum_kinds_name {
+                #( #variants ),*
+            }
+
+            impl ::btrun::model::Named for #enum_kinds_name {
+                fn name(&self) -> ::std::sync::Arc<String> {
+                    use ::btrun::model::Lazy;
+                    use ::std::sync::Arc;
+                    #( #name_decls )*
+                    match self {
+                        #( Self::#variants => #name_idents.clone() ),*
+                    }
+                }
             }
 
-            impl #enum_name {
-                pub fn name(&self) -> &'static str {
+            impl Copy for #enum_kinds_name {}
+
+            impl ::btrun::model::Named for #enum_name {
+                fn name(&self) -> ::std::sync::Arc<String> {
                     match self {
-                        #( Self::#variants1(_) => #variant_names),*
+                        #( Self::#variants(_) => #enum_kinds_name::#variants.name()),*
                     }
                 }
             }
@@ -68,15 +111,25 @@ impl ProtocolModel {
             let actor_impl_method = if is_init_state {
                 quote! {
                     #[doc = "The name of the implementation for the actor this state is a part of."]
-                    fn actor_impl() -> String;
+                    fn actor_impl() -> ::std::sync::Arc<String>;
                 }
             } else {
                 quote! {}
             };
+            let trait_str = format!("{trait_ident}");
             quote! {
                 pub trait #trait_ident : Send + Sync + Sized {
                     #actor_impl_method
                     #( #method_tokens )*
+
+                    fn state_name() -> ::std::sync::Arc<String> {
+                        use ::btrun::model::Lazy;
+                        use ::std::sync::Arc;
+                        static STATE_NAME: Lazy<Arc<String>> = Lazy::new(|| {
+                            Arc::new(#trait_str.into())
+                        });
+                        STATE_NAME.clone()
+                    }
                 }
             }
             .to_tokens(&mut tokens);

+ 11 - 22
crates/btproto/tests/protocol_tests.rs

@@ -3,7 +3,10 @@
 use std::future::{ready, Ready};
 
 use btproto::protocol;
-use btrun::model::{CallMsg, End, TransResult};
+use btrun::{
+    actor_name,
+    model::{CallMsg, End, TransResult},
+};
 use serde::{Deserialize, Serialize};
 
 #[derive(Serialize, Deserialize)]
@@ -42,9 +45,7 @@ fn minimal_syntax() {
     struct ServerState;
 
     impl Server for ServerState {
-        fn actor_impl() -> String {
-            "server".into()
-        }
+        actor_name!("minimal_server");
 
         type HandleMsgFut = Ready<TransResult<Self, End>>;
         fn handle_msg(self, _msg: Msg) -> Self::HandleMsgFut {
@@ -55,9 +56,7 @@ fn minimal_syntax() {
     struct ClientState;
 
     impl Client for ClientState {
-        fn actor_impl() -> String {
-            "client".into()
-        }
+        actor_name!("minimal_client");
 
         type OnSendMsgFut = Ready<TransResult<Self, End>>;
         fn on_send_msg(self, _msg: &mut Msg) -> Self::OnSendMsgFut {
@@ -86,9 +85,7 @@ fn reply() {
     struct ListeningState;
 
     impl Listening for ListeningState {
-        fn actor_impl() -> String {
-            "server".into()
-        }
+        actor_name!("reply_server");
 
         type HandlePingListening = Self;
         type HandlePingFut = Ready<TransResult<Self, (Self, <Ping as CallMsg>::Reply)>>;
@@ -100,9 +97,7 @@ fn reply() {
     struct ClientState;
 
     impl Client for ClientState {
-        fn actor_impl() -> String {
-            "client".into()
-        }
+        actor_name!("reply_client");
 
         type OnSendPingClient = Self;
         type OnSendPingFut = Ready<TransResult<Self, (Self, <Ping as CallMsg>::Reply)>>;
@@ -140,9 +135,7 @@ fn client_callback() {
     struct UnregisteredState;
 
     impl Unregistered for UnregisteredState {
-        fn actor_impl() -> String {
-            "client".into()
-        }
+        actor_name!("callback_client");
 
         type OnSendRegisterRegistered = RegisteredState;
         type OnSendRegisterFut = Ready<TransResult<Self, Self::OnSendRegisterRegistered>>;
@@ -163,9 +156,7 @@ fn client_callback() {
     struct ListeningState;
 
     impl Listening for ListeningState {
-        fn actor_impl() -> String {
-            "server".into()
-        }
+        actor_name!("callback_server");
 
         type HandleRegisterListening = ListeningState;
         type HandleRegisterWorking = WorkingState;
@@ -178,9 +169,7 @@ fn client_callback() {
     struct WorkingState;
 
     impl Working for WorkingState {
-        fn actor_impl() -> String {
-            "worker".into()
-        }
+        actor_name!("callback_worker");
 
         type OnSendCompletedFut = Ready<TransResult<Self, (End, Completed)>>;
         fn on_send_completed(self) -> Self::OnSendCompletedFut {

+ 1 - 1
crates/btrun/Cargo.toml

@@ -16,8 +16,8 @@ uuid = { version = "1.3.3", features = ["v4", "fast-rng", "macro-diagnostics", "
 anyhow = { version = "1.0.66", features = ["std", "backtrace"] }
 bytes = "1.3.0"
 strum = { version = "^0.24.0", features = ["derive"] }
-lazy_static = { version = "1.4.0" }
 log = "0.4.17"
+once_cell = { version = "1.19.0" }
 
 [dev-dependencies]
 btlib-tests = { path = "../btlib-tests" }

+ 71 - 20
crates/btrun/src/kernel.rs

@@ -2,18 +2,19 @@
 
 use std::{future::Future, pin::Pin};
 
-use log::{debug, error};
+use log::{debug, error, warn};
 use tokio::{
     select,
     sync::{mpsc, oneshot},
     task::{AbortHandle, JoinSet},
 };
 
-use crate::ActorPanic;
+use crate::{ActorExit, ActorFault, ActorId, ControlMsg, Runtime};
 
-pub(super) type BoxedFuture = Pin<Box<dyn Send + Future<Output = ()>>>;
+pub(super) type FaultResult = std::result::Result<ActorId, ActorFault>;
+pub(super) type BoxedFuture = Pin<Box<dyn Send + Future<Output = FaultResult>>>;
 
-pub(super) struct SpawnReq {
+pub struct SpawnReq {
     task: BoxedFuture,
     sender: oneshot::Sender<AbortHandle>,
 }
@@ -21,7 +22,7 @@ pub(super) struct SpawnReq {
 impl SpawnReq {
     pub(super) fn new<Fut>(task: Fut) -> (Self, oneshot::Receiver<AbortHandle>)
     where
-        Fut: 'static + Send + Future<Output = ()>,
+        Fut: 'static + Send + Future<Output = FaultResult>,
     {
         let (sender, receiver) = oneshot::channel();
         let req = Self {
@@ -33,8 +34,8 @@ impl SpawnReq {
 }
 
 /// The kernel is responsible for spawning and supervising actors.
-pub(super) async fn kernel(mut tasks: mpsc::Receiver<SpawnReq>) {
-    let mut running = JoinSet::<()>::new();
+pub(super) async fn kernel(runtime: &Runtime, mut tasks: mpsc::Receiver<SpawnReq>) {
+    let mut running = JoinSet::<FaultResult>::new();
 
     loop {
         select! {
@@ -49,23 +50,73 @@ pub(super) async fn kernel(mut tasks: mpsc::Receiver<SpawnReq>) {
                     None => panic!("The Runtime was dropped!")
                 };
             }
-            Some(Err(join_error)) = running.join_next() => {
-                match join_error.try_into_panic() {
-                    Ok(panic_obj) => {
-                        if let Some(actor_panic) = panic_obj.downcast_ref::<ActorPanic>() {
-                            error!("{actor_panic}")
-                        } else if let Some(message) = panic_obj.downcast_ref::<String>() {
-                            error!("Actor panicked in an unknown state: {message}");
-                        } else {
-                            error!("Actor panicked for an unknown reason.");
-                        }
+            Some(result) = running.join_next() => {
+                match result {
+                    Ok(actor_result) => {
+                        handle_actor_exit(actor_result, runtime).await;
                     }
                     Err(join_error) => {
-                        debug!("Actor was aborted: {join_error}");
+                        match join_error.try_into_panic() {
+                            Ok(panic_obj) => {
+                                if let Some(message) = panic_obj.downcast_ref::<String>() {
+                                    error!("An actor panic was uncaught: {message}");
+                                } else {
+                                    error!("An actor panic was uncaught.");
+                                }
+                            }
+                            Err(join_error) => {
+                                debug!("Actor was aborted: {join_error}");
+                            }
+                        };
                     }
-                };
-
+                }
             }
         }
     }
 }
+
+/// Handles the termination of an actor by notifying the actor its owned by and all the actors it
+/// owns.
+async fn handle_actor_exit(actor_result: FaultResult, runtime: &Runtime) {
+    let (actor_id, actor_exit) = match actor_result {
+        Ok(actor_id) => {
+            debug!("Actor {actor_id} exited normally.");
+            (actor_id, ActorExit::Ok)
+        }
+        Err(err) => {
+            error!("{err}");
+            (err.actor_id(), ActorExit::from(err))
+        }
+    };
+
+    let mut handle = {
+        let mut handles = runtime.handles.write().await;
+        if let Some(handle) = handles.remove(&actor_id) {
+            handle
+        } else {
+            warn!("Exiting actor {actor_id} was has been taken from the runtime before notification could be sent.");
+            return;
+        }
+    };
+
+    if let Some(owner) = handle.take_owner() {
+        let handle_name = handle.name();
+        let msg = ControlMsg::OwnedExited {
+            name: handle_name.clone(),
+            exit: actor_exit.clone(),
+        };
+        let result = runtime.send_control(owner.clone(), msg).await;
+        if let Err(err) = result {
+            error!("Failed to notify owner {owner} that {handle_name} exited: {err}");
+        }
+    }
+
+    let handle_name = handle.name().clone();
+    for owned in handle.owns_mut().drain(0..) {
+        let msg = ControlMsg::OwnerExited(actor_exit.clone());
+        let result = runtime.send_control(owned.clone(), msg).await;
+        if let Err(err) = result {
+            error!("Failed to notify owned actor {owned} that {handle_name} exited: {err}");
+        }
+    }
+}

+ 152 - 222
crates/btrun/src/lib.rs

@@ -1,28 +1,28 @@
 #![feature(impl_trait_in_assoc_type)]
 
 use std::{
-    any::Any,
     collections::{hash_map, HashMap},
     fmt::Display,
     future::{ready, Future, Ready},
     marker::PhantomData,
     net::IpAddr,
     ops::DerefMut,
+    panic::AssertUnwindSafe,
     pin::Pin,
-    sync::Arc,
+    sync::{Arc, Mutex as SyncMutex},
 };
 
 use btlib::{bterr, crypto::Creds, error::StringError, BlockPath, Result};
 use btserde::{from_slice, to_vec, write_to};
-use bttp::{DeserCallback, MsgCallback, Receiver, Replier, Transmitter};
-use kernel::{kernel, SpawnReq};
+use bttp::{DeserCallback, MsgCallback, Replier, Transmitter};
+use futures::FutureExt;
 use serde::{Deserialize, Serialize};
-use tokio::{
-    sync::{mpsc, oneshot, Mutex, RwLock},
-    task::AbortHandle,
-};
+use tokio::sync::{mpsc, Mutex, RwLock};
 
+pub use bttp::Receiver;
 mod kernel;
+pub use kernel::SpawnReq;
+use kernel::{kernel, FaultResult};
 pub mod model;
 use model::*;
 
@@ -32,17 +32,17 @@ use model::*;
 #[macro_export]
 macro_rules! declare_runtime {
     ($name:ident, $ip_addr:expr, $creds:expr) => {
-        ::lazy_static::lazy_static! {
-            static ref $name: &'static $crate::Runtime = {
-                ::lazy_static::lazy_static! {
-                    static ref RUNTIME: $crate::Runtime = $crate::Runtime::_new($creds).unwrap();
-                    static ref RECEIVER: ::bttp::Receiver = _new_receiver($ip_addr, $creds, &*RUNTIME);
-                }
-                // By dereferencing RECEIVER we ensure it is started.
-                let _ = &*RECEIVER;
-                &*RUNTIME
-            };
-        }
+        static $name: $crate::model::Lazy<&'static $crate::Runtime> = $crate::model::Lazy::new(|| {
+            use $crate::{Runtime, Receiver, model::Lazy};
+            static RUNTIME: Lazy<Runtime> = Lazy::new(|| Runtime::_new($creds).unwrap());
+            static RECEIVER: Lazy<Receiver> =
+                Lazy::new(|| _new_receiver($ip_addr, $creds, &*RUNTIME));
+            // Start the kernel task.
+            RUNTIME._spawn_kernel();
+            // By dereferencing RECEIVER we ensure it's started.
+            let _ = &*RECEIVER;
+            &*RUNTIME
+        });
     };
 }
 
@@ -71,6 +71,7 @@ pub struct Runtime {
     peers: RwLock<HashMap<Arc<BlockPath>, Transmitter>>,
     registry: RwLock<HashMap<ServiceId, ServiceRecord>>,
     kernel_sender: mpsc::Sender<SpawnReq>,
+    kernel_receiver: SyncMutex<Option<mpsc::Receiver<SpawnReq>>>,
 }
 
 impl Runtime {
@@ -86,16 +87,25 @@ impl Runtime {
     pub fn _new<C: 'static + Send + Sync + Creds>(creds: Arc<C>) -> Result<Runtime> {
         let path = Arc::new(creds.bind_path()?);
         let (sender, receiver) = mpsc::channel(Self::SPAWN_REQ_BUF_SZ);
-        tokio::task::spawn(kernel(receiver));
         Ok(Runtime {
             path,
             handles: RwLock::new(HashMap::new()),
             peers: RwLock::new(HashMap::new()),
             registry: RwLock::new(HashMap::new()),
             kernel_sender: sender,
+            kernel_receiver: SyncMutex::new(Some(receiver)),
         })
     }
 
+    /// This method is not intended to be called directly by downstream crates. It's used
+    /// by the [declare_runtime] macro to start a runtime's kernel task. If you call it,
+    /// it will panic.
+    #[doc(hidden)]
+    pub fn _spawn_kernel(&'static self) {
+        let receiver = self.kernel_receiver.lock().unwrap().take().unwrap();
+        tokio::task::spawn(kernel(self, receiver));
+    }
+
     pub fn path(&self) -> &Arc<BlockPath> {
         &self.path
     }
@@ -106,8 +116,21 @@ impl Runtime {
         guard.len()
     }
 
+    pub(crate) async fn send_control(&self, to: ActorName, msg: ControlMsg) -> Result<()> {
+        if to.path().as_ref() == self.path.as_ref() {
+            let handles = self.handles.read().await;
+            if let Some(handle) = handles.get(&to.act_id()) {
+                handle.send_control(msg).await
+            } else {
+                Err(RuntimeError::BadActorName(to).into())
+            }
+        } else {
+            todo!("Deliver the message remotely.");
+        }
+    }
+
     /// Sends a message to the actor identified by the given [ActorName].
-    pub async fn send<T: 'static + SendMsg>(
+    pub async fn send<T: 'static + MsgEnum>(
         &self,
         to: ActorName,
         from: ActorName,
@@ -118,7 +141,7 @@ impl Runtime {
             if let Some(handle) = guard.get(&to.act_id()) {
                 handle.send(from, msg).await
             } else {
-                Err(bterr!("invalid actor name"))
+                Err(RuntimeError::BadActorName(to).into())
             }
         } else {
             let guard = self.peers.read().await;
@@ -137,7 +160,7 @@ impl Runtime {
     }
 
     /// Sends a message to the service identified by [ServiceName].
-    pub async fn send_service<T: 'static + SendMsg>(
+    pub async fn send_service<T: 'static + MsgEnum>(
         &'static self,
         to: ServiceAddr,
         from: ActorName,
@@ -162,7 +185,7 @@ impl Runtime {
 
     /// Sends a message to the actor identified by the given [ActorName] and returns a future which
     /// is ready when a reply has been received.
-    pub async fn call<T: 'static + CallMsg>(
+    pub async fn call<T: 'static + MsgEnum>(
         &self,
         to: ActorName,
         from: ActorName,
@@ -192,7 +215,7 @@ impl Runtime {
     }
 
     /// Calls a service identified by [ServiceName].
-    pub async fn call_service<T: 'static + CallMsg>(
+    pub async fn call_service<T: 'static + MsgEnum>(
         &'static self,
         to: ServiceAddr,
         msg: T,
@@ -251,16 +274,16 @@ impl Runtime {
     }
 
     /// Spawns a new actor using the given activator function and returns a handle to it.
-    pub async fn spawn<Msg, F, Fut>(&'static self, activator: F) -> ActorName
+    pub async fn spawn<Msg, F, Fut>(&'static self, owner: Option<ActorName>, actor: F) -> ActorName
     where
-        Msg: 'static + CallMsg,
-        Fut: 'static + Send + Future<Output = ()>,
-        F: FnOnce(&'static Runtime, Mailbox<Msg>, ActorId) -> Fut,
+        Msg: 'static + MsgEnum,
+        Fut: 'static + Send + Future<Output = ActorResult>,
+        F: FnOnce(Mailbox<Msg>, ActorId, &'static Runtime) -> Fut,
     {
-        let mut guard = self.handles.write().await;
+        let mut handles = self.handles.write().await;
         let act_id = {
             let mut act_id = ActorId::new();
-            while guard.contains_key(&act_id) {
+            while handles.contains_key(&act_id) {
                 act_id = ActorId::new();
             }
             act_id
@@ -307,7 +330,21 @@ impl Runtime {
                 fut
             }
         };
-        let (req, receiver) = SpawnReq::new(activator(self, rx, act_id));
+        // ctrl_deliverer is responsible for delivering ControlMsgs to the actor.
+        let ctrl_deliverer = {
+            let tx = tx.clone();
+            move |control: ControlMsg| {
+                let tx = tx.clone();
+                let fut: FutureResult = Box::pin(async move {
+                    tx.send(Envelope::Control(control))
+                        .await
+                        .map_err(|err| bterr!("{err}"))
+                });
+                fut
+            }
+        };
+        let actor = actor(rx, act_id, self);
+        let (req, receiver) = SpawnReq::new(Self::catch_unwind(act_id, actor));
         self.kernel_sender
             .send(req)
             .await
@@ -315,11 +352,29 @@ impl Runtime {
         let handle = receiver
             .await
             .unwrap_or_else(|err| panic!("Kernel failed to send abort handle: {err}"));
-        let actor_handle = ActorHandle::new(handle, tx, deliverer);
-        guard.insert(act_id, actor_handle);
+        let actor_handle = ActorHandle::new(
+            act_name.clone(),
+            handle,
+            tx,
+            deliverer,
+            ctrl_deliverer,
+            owner,
+        );
+        handles.insert(act_id, actor_handle);
         act_name
     }
 
+    async fn catch_unwind<Fut>(actor_id: ActorId, fut: Fut) -> FaultResult
+    where
+        Fut: 'static + Send + Future<Output = ActorResult>,
+    {
+        let fut = AssertUnwindSafe(fut);
+        match fut.catch_unwind().await {
+            Ok(result) => result.map_err(ActorFault::Error),
+            Err(panic) => Err(ActorFault::Panic(ActorPanic::new(actor_id, panic))),
+        }
+    }
+
     /// Registers a service activation closure for [ServiceId]. An error is returned if the
     /// [ServiceId] has already been registered.
     pub async fn register<Msg, F>(&self, id: ServiceId, spawner: F) -> Result<ServiceName>
@@ -533,7 +588,7 @@ impl RuntimeCallback {
             } else {
                 WireEnvelope::Send { msg }
             };
-            (handle.deliverer)(envelope).await
+            handle.deliver(envelope).await
         } else {
             Err(bterr!("invalid actor name: {}", msg.to))
         }
@@ -617,192 +672,46 @@ impl<'de> WireEnvelope<'de> {
     }
 }
 
-pub enum EnvelopeKind<T: CallMsg> {
-    Call {
-        reply: Option<oneshot::Sender<T::Reply>>,
-    },
-    Send {
-        from: ActorName,
-    },
-}
-
-impl<T: CallMsg> EnvelopeKind<T> {
-    pub fn name(&self) -> &'static str {
-        match self {
-            Self::Call { .. } => "Call",
-            Self::Send { .. } => "Send",
-        }
-    }
-}
-
-/// Wrapper around a message type `T` which indicates who the message is from and, if the message
-/// was dispatched with `call`, provides a channel to reply to it.
-pub struct Envelope<T: CallMsg> {
-    msg: T,
-    kind: EnvelopeKind<T>,
-}
-
-impl<T: CallMsg> Envelope<T> {
-    pub fn new(msg: T, kind: EnvelopeKind<T>) -> Self {
-        Self { msg, kind }
-    }
-
-    /// Creates a new envelope containing the given message which does not expect a reply.
-    fn new_send(from: ActorName, msg: T) -> Self {
-        Self {
-            kind: EnvelopeKind::Send { from },
-            msg,
-        }
-    }
-
-    /// Creates a new envelope containing the given message which expects exactly one reply.
-    fn new_call(msg: T) -> (Self, oneshot::Receiver<T::Reply>) {
-        let (tx, rx) = oneshot::channel::<T::Reply>();
-        let envelope = Self {
-            kind: EnvelopeKind::Call { reply: Some(tx) },
-            msg,
-        };
-        (envelope, rx)
-    }
-
-    /// Returns the name of the actor which sent this message.
-    pub fn from(&self) -> Option<&ActorName> {
-        match &self.kind {
-            EnvelopeKind::Send { from } => Some(from),
-            _ => None,
-        }
-    }
-
-    /// Returns a reference to the message in this envelope.
-    pub fn msg(&self) -> &T {
-        &self.msg
-    }
-
-    /// Sends a reply to this message.
-    ///
-    /// If this message is not expecting a reply, or if this message has already been replied to,
-    /// then an error is returned.
-    pub fn reply(&mut self, reply: T::Reply) -> Result<()> {
-        match &mut self.kind {
-            EnvelopeKind::Call { reply: tx } => {
-                if let Some(tx) = tx.take() {
-                    tx.send(reply).map_err(|_| bterr!("Failed to send reply."))
-                } else {
-                    Err(bterr!("Reply has already been sent."))
-                }
-            }
-            _ => Err(bterr!("Can't reply to '{}' messages.", self.kind.name())),
-        }
-    }
-
-    /// Returns true if this message expects a reply and it has not already been replied to.
-    pub fn needs_reply(&self) -> bool {
-        matches!(&self.kind, EnvelopeKind::Call { .. })
-    }
-
-    pub fn split(self) -> (T, EnvelopeKind<T>) {
-        (self.msg, self.kind)
-    }
-}
-
-type FutureResult = Pin<Box<dyn Send + Future<Output = Result<()>>>>;
-
-pub struct ActorHandle {
-    handle: AbortHandle,
-    sender: Box<dyn Send + Sync + Any>,
-    deliverer: Box<dyn Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult>,
-}
-
-impl ActorHandle {
-    fn new<T, F>(handle: AbortHandle, sender: mpsc::Sender<Envelope<T>>, deliverer: F) -> Self
-    where
-        T: 'static + CallMsg,
-        F: 'static + Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult,
-    {
-        Self {
-            handle,
-            sender: Box::new(sender),
-            deliverer: Box::new(deliverer),
-        }
-    }
-
-    fn sender<T: 'static + CallMsg>(&self) -> Result<&mpsc::Sender<Envelope<T>>> {
-        self.sender
-            .downcast_ref::<mpsc::Sender<Envelope<T>>>()
-            .ok_or_else(|| bterr!("Attempt to send message as the wrong type."))
-    }
-
-    /// Sends a message to the actor represented by this handle.
-    pub async fn send<T: 'static + SendMsg>(&self, from: ActorName, msg: T) -> Result<()> {
-        let sender = self.sender()?;
-        sender
-            .send(Envelope::new_send(from, msg))
-            .await
-            .map_err(|_| bterr!("failed to enqueue message"))?;
-        Ok(())
-    }
-
-    pub async fn call_through<T: 'static + CallMsg>(&self, msg: T) -> Result<T::Reply> {
-        let sender = self.sender()?;
-        let (envelope, rx) = Envelope::new_call(msg);
-        sender
-            .send(envelope)
-            .await
-            .map_err(|_| bterr!("failed to enqueue call"))?;
-        let reply = rx.await?;
-        Ok(reply)
-    }
-
-    pub fn abort(&self) {
-        self.handle.abort();
-    }
-}
-
-impl Drop for ActorHandle {
-    fn drop(&mut self) {
-        self.abort();
-    }
-}
-
-/// Sets up variable declarations and logging configuration to facilitate testing with a [Runtime].
+/// Sets up static `RUNTIME` and `ASYNC_RT` static variables, with types [Runtime] and
+/// [tokio::runtime::Runtime] respectively, in the scope where its used.
+/// The `env_logger` crate is also configured to print messages to stderr.
 #[macro_export]
 macro_rules! test_setup {
     () => {
+        /// A tokio async runtime.
+        ///
+        /// When the `#[tokio::test]` attribute is used on a test, a new current thread runtime
+        /// is created for each test
+        /// (source: https://docs.rs/tokio/latest/tokio/attr.test.html#current-thread-runtime).
+        /// This creates a problem, because the first test thread to access the `RUNTIME` static
+        /// will initialize its `Receiver` in its runtime, which will stop running at the end of
+        /// the test. Hence subsequent tests will not be able to send remote messages to this
+        /// `Runtime`, nor will the kernel task be running.
+        ///
+        /// By creating a single async runtime which is used by all of the tests, we can avoid this
+        /// problem.
+        static ASYNC_RT: $crate::model::Lazy<::tokio::runtime::Runtime> =
+            $crate::model::Lazy::new(|| {
+                ::tokio::runtime::Builder::new_current_thread()
+                    .enable_all()
+                    .build()
+                    .unwrap()
+            });
+
         const RUNTIME_ADDR: ::std::net::IpAddr =
             ::std::net::IpAddr::V4(::std::net::Ipv4Addr::new(127, 0, 0, 1));
-        lazy_static! {
-            static ref RUNTIME_CREDS: ::std::sync::Arc<::btlib::crypto::ConcreteCreds> = {
-                let test_store = &::btlib_tests::TEST_STORE;
-                ::btlib::crypto::CredStore::node_creds(test_store).unwrap()
-            };
-        }
+        static RUNTIME_CREDS: $crate::model::Lazy<
+            ::std::sync::Arc<::btlib::crypto::ConcreteCreds>,
+        > = $crate::model::Lazy::new(|| {
+            let test_store = &::btlib_tests::TEST_STORE;
+            ::btlib::crypto::CredStore::node_creds(test_store).unwrap()
+        });
         declare_runtime!(RUNTIME, RUNTIME_ADDR, RUNTIME_CREDS.clone());
 
-        lazy_static! {
-            /// A tokio async runtime.
-            ///
-            /// When the `#[tokio::test]` attribute is used on a test, a new current thread runtime
-            /// is created for each test
-            /// (source: https://docs.rs/tokio/latest/tokio/attr.test.html#current-thread-runtime).
-            /// This creates a problem, because the first test thread to access the `RUNTIME` static
-            /// will initialize its `Receiver` in its runtime, which will stop running at the end of
-            /// the test. Hence subsequent tests will not be able to send remote messages to this
-            /// `Runtime`.
-            ///
-            /// By creating a single async runtime which is used by all of the tests, we can avoid this
-            /// problem.
-            static ref ASYNC_RT: tokio::runtime::Runtime = ::tokio::runtime::Builder
-                ::new_current_thread()
-                .enable_all()
-                .build()
-                .unwrap();
-        }
-
         /// The log level to use when running tests.
         const LOG_LEVEL: &str = "warn";
 
         #[::ctor::ctor]
-        #[allow(non_snake_case)]
         fn ctor() {
             ::std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL));
             let mut builder = ::env_logger::Builder::from_default_env();
@@ -818,10 +727,9 @@ pub mod test {
     use btlib::crypto::{CredStore, CredsPriv};
     use btlib_tests::TEST_STORE;
     use bttp::BlockAddr;
-    use lazy_static::lazy_static;
     use serde::{Deserialize, Serialize};
 
-    use crate::CallMsg;
+    use crate::{model::Lazy, CallMsg};
 
     test_setup!();
 
@@ -832,31 +740,53 @@ pub mod test {
         type Reply = EchoMsg;
     }
 
+    impl Named for EchoMsg {
+        fn name(&self) -> Arc<String> {
+            static NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("EchoMsg".into()));
+            NAME.clone()
+        }
+    }
+
     async fn echo(
-        _rt: &'static Runtime,
         mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>,
-        _act_id: ActorId,
-    ) {
+        actor_id: ActorId,
+        _rt: &'static Runtime,
+    ) -> ActorResult {
+        static ACTOR_IMPL: Lazy<Arc<String>> = Lazy::new(|| Arc::new("echo".into()));
+        static STATE_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("Listening".into()));
+        static MESSAGE_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("Ping".into()));
+
         while let Some(envelope) = mailbox.recv().await {
-            let (msg, kind) = envelope.split();
-            match kind {
-                EnvelopeKind::Call { reply } => {
+            match envelope {
+                Envelope::Call { msg, reply, .. } => {
                     let replier =
                         reply.unwrap_or_else(|| panic!("The reply has already been sent."));
                     if let Err(_) = replier.send(msg) {
                         panic!("failed to send reply");
                     }
                 }
-                _ => panic!("Expected EchoMsg to be a Call Message."),
+                _ => {
+                    return Err(ActorError::new(
+                        bterr!("Expected EchoMsg to be a Call Message."),
+                        ActorErrorCommon {
+                            actor_id,
+                            actor_impl: ACTOR_IMPL.clone(),
+                            state: STATE_NAME.clone(),
+                            message: MESSAGE_NAME.clone(),
+                            kind: TransKind::Receive,
+                        },
+                    ));
+                }
             }
         }
+        Ok(actor_id)
     }
 
     #[test]
     fn local_call() {
         ASYNC_RT.block_on(async {
             const EXPECTED: &str = "hello";
-            let name = RUNTIME.spawn(echo).await;
+            let name = RUNTIME.spawn(None, echo).await;
             let from = ActorName::new(name.path().clone(), ActorId::new());
 
             let reply = RUNTIME
@@ -882,7 +812,7 @@ pub mod test {
             TEST_STORE.node_creds().unwrap()
         );
         assert_eq!(0, LOCAL_RT.num_running().await);
-        let name = LOCAL_RT.spawn(echo).await;
+        let name = LOCAL_RT.spawn(None, echo).await;
         assert_eq!(1, LOCAL_RT.num_running().await);
         LOCAL_RT.take(&name).await.unwrap();
         assert_eq!(0, LOCAL_RT.num_running().await);
@@ -892,7 +822,7 @@ pub mod test {
     fn remote_call() {
         ASYNC_RT.block_on(async {
             const EXPECTED: &str = "hello";
-            let actor_name = RUNTIME.spawn(echo).await;
+            let actor_name = RUNTIME.spawn(None, echo).await;
             let bind_path = Arc::new(RUNTIME_CREDS.bind_path().unwrap());
             let block_addr = Arc::new(BlockAddr::new(RUNTIME_ADDR, bind_path));
             let transmitter = Transmitter::new(block_addr, RUNTIME_CREDS.clone())

+ 445 - 14
crates/btrun/src/model.rs

@@ -1,12 +1,27 @@
 //! This module contains types used to model the actor system implemented by the runtime.
 
-use std::{fmt::Display, sync::Arc};
+use std::{any::Any, fmt::Display, future::Future, pin::Pin, sync::Arc};
 
-use btlib::BlockPath;
+use btlib::{bterr, BlockPath, Result};
 use btserde::field_helpers::smart_ptr;
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
+use tokio::{
+    sync::{mpsc, oneshot},
+    task::AbortHandle,
+};
 use uuid::Uuid;
 
+use crate::WireEnvelope;
+
+pub use once_cell::sync::Lazy;
+
+/// The return type of actor closures.
+pub type ActorResult = std::result::Result<ActorId, ActorError>;
+
+pub trait Named {
+    fn name(&self) -> Arc<String>;
+}
+
 /// Represents the result of an actor state transition, which can be one of the following:
 /// * `Success`: The transition succeeded and the new state is contained in the result.
 /// * `Abort`: The transition failed and the previous state along with an error describing
@@ -22,6 +37,7 @@ pub enum TransResult<From, To> {
     Fatal { err: btlib::Error },
 }
 
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
 /// Specifies a kind of transition, either a `Send` or a `Receive`.
 pub enum TransKind {
     /// A transition which doesn't receive any message, but sends one or more.
@@ -39,35 +55,236 @@ impl TransKind {
     }
 }
 
-/// A struct which conveys information about where an actor panic occurred to the kernel.
-pub struct ActorPanic {
-    /// The name of the actor implementation which panicked.
-    pub actor_impl: String,
+impl Copy for TransKind {}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+/// A struct which conveys information about where an actor error occurred to the kernel.
+pub struct ActorErrorCommon {
+    /// The ID of the actor which experienced the error.
+    pub actor_id: ActorId,
+    /// The name of the actor implementation which erred.
+    #[serde(with = "smart_ptr")]
+    pub actor_impl: Arc<String>,
     /// The name of the state the actor was in.
-    pub state: &'static str,
+    #[serde(with = "smart_ptr")]
+    pub state: Arc<String>,
     /// The name of the message the actor was handling, or the name of the first message it was
     /// trying to send.
-    pub message: &'static str,
+    #[serde(with = "smart_ptr")]
+    pub message: Arc<String>,
     /// The kind of transition which was being attempted.
     pub kind: TransKind,
-    /// An error describing why the panic occurred.
-    pub err: btlib::Error,
 }
 
-impl Display for ActorPanic {
+impl Display for ActorErrorCommon {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "Actor {} panicked in state {} while {} a message of type {}: {}",
+            "Actor {}, with implementation '{}', panicked in state {} while {} a message of type {}",
+            self.actor_id,
             self.actor_impl,
             self.state,
             self.kind.verb(),
-            self.message,
-            self.err
+            self.message
         )
     }
 }
 
+pub struct ActorError {
+    common: ActorErrorCommon,
+    err: btlib::Error,
+}
+
+impl ActorError {
+    pub fn new(err: btlib::Error, common: ActorErrorCommon) -> Self {
+        Self { common, err }
+    }
+
+    pub fn actor_id(&self) -> ActorId {
+        self.common.actor_id
+    }
+
+    pub fn actor_impl(&self) -> &Arc<String> {
+        &self.common.actor_impl
+    }
+
+    pub fn state(&self) -> &Arc<String> {
+        &self.common.state
+    }
+
+    pub fn message(&self) -> &Arc<String> {
+        &self.common.message
+    }
+
+    pub fn kind(&self) -> TransKind {
+        self.common.kind
+    }
+
+    pub fn err(&self) -> &btlib::Error {
+        &self.err
+    }
+}
+
+impl Display for ActorError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}: {}", self.common, self.err)
+    }
+}
+
+#[derive(Clone, Serialize, Deserialize)]
+pub struct ActorErrorStr {
+    common: ActorErrorCommon,
+    #[serde(with = "smart_ptr")]
+    err: Arc<String>,
+}
+
+impl ActorErrorStr {
+    pub fn actor_id(&self) -> ActorId {
+        self.common.actor_id
+    }
+
+    pub fn actor_impl(&self) -> &Arc<String> {
+        &self.common.actor_impl
+    }
+
+    pub fn state(&self) -> &Arc<String> {
+        &self.common.state
+    }
+
+    pub fn message(&self) -> &Arc<String> {
+        &self.common.message
+    }
+
+    pub fn kind(&self) -> TransKind {
+        self.common.kind
+    }
+
+    pub fn err(&self) -> &Arc<String> {
+        &self.err
+    }
+}
+
+impl Display for ActorErrorStr {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}: {}", self.common, self.err)
+    }
+}
+
+impl From<ActorError> for ActorErrorStr {
+    fn from(value: ActorError) -> Self {
+        let err = Arc::new(value.err.to_string());
+        Self {
+            common: value.common,
+            err,
+        }
+    }
+}
+
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
+pub enum PanicKind {
+    String(#[serde(with = "smart_ptr")] Arc<String>),
+    Unknown,
+}
+
+impl PanicKind {
+    fn new(err: Box<dyn Any>) -> Self {
+        if let Ok(err) = err.downcast::<String>() {
+            let string = *err;
+            Self::String(Arc::new(string))
+        } else {
+            Self::Unknown
+        }
+    }
+}
+
+impl Display for PanicKind {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::String(string) => f.write_str(string.as_str()),
+            Self::Unknown => f.write_str("Unknown error, panic object was not a String."),
+        }
+    }
+}
+
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
+pub struct ActorPanic {
+    actor_id: ActorId,
+    err: PanicKind,
+}
+
+impl ActorPanic {
+    pub(super) fn new(actor_id: ActorId, err: Box<dyn Any>) -> Self {
+        Self {
+            actor_id,
+            err: PanicKind::new(err),
+        }
+    }
+
+    pub fn actor_id(&self) -> ActorId {
+        self.actor_id
+    }
+
+    pub fn err(&self) -> Option<&str> {
+        if let PanicKind::String(string) = &self.err {
+            Some(string.as_str())
+        } else {
+            None
+        }
+    }
+}
+
+impl Display for ActorPanic {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let actor_id = &self.actor_id;
+        let err = &self.err;
+        write!(f, "Actor {actor_id} panicked: {err}")
+    }
+}
+
+/// Represents the ways an actor can exit abnormally, either by returning an error or panicking.
+pub(super) enum ActorFault {
+    Error(ActorError),
+    Panic(ActorPanic),
+}
+
+impl ActorFault {
+    pub(crate) fn actor_id(&self) -> ActorId {
+        match self {
+            Self::Error(actor_error) => actor_error.actor_id(),
+            Self::Panic(ActorPanic { actor_id, .. }) => *actor_id,
+        }
+    }
+}
+
+impl Display for ActorFault {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::Error(err) => err.fmt(f),
+            Self::Panic(panic) => panic.fmt(f),
+        }
+    }
+}
+
+#[derive(Clone, Serialize, Deserialize)]
+/// Represents the ways an actor can exit.
+pub enum ActorExit {
+    /// The actor exited normally.
+    Ok,
+    /// The actor exited with an error.
+    Error(ActorErrorStr),
+    /// The actor panicked.
+    Panic(ActorPanic),
+}
+
+impl From<ActorFault> for ActorExit {
+    fn from(value: ActorFault) -> Self {
+        match value {
+            ActorFault::Error(error) => Self::Error(error.into()),
+            ActorFault::Panic(panic) => Self::Panic(panic),
+        }
+    }
+}
+
 /// Represents the terminal state of an actor, where it stops processing messages and halts.
 pub struct End;
 
@@ -219,6 +436,220 @@ pub trait CallMsg: Serialize + DeserializeOwned + Send + Sync {
 /// Trait for messages which expect exactly zero replies.
 pub trait SendMsg: CallMsg {}
 
+pub trait MsgEnum: CallMsg + Named {}
+
+impl<T: CallMsg + Named> MsgEnum for T {}
+
 /// A type used to express when a reply is not expected for a message type.
 #[derive(Serialize, Deserialize)]
 pub enum NoReply {}
+
+/// Messages sent by the runtime to actors.
+pub enum ControlMsg {
+    OwnerExited(ActorExit),
+    OwnedExited { name: ActorName, exit: ActorExit },
+}
+
+impl Named for ControlMsg {
+    fn name(&self) -> Arc<String> {
+        static OWNER_EXITED_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("OwnerExited".into()));
+        static OWNED_EXITED_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("OwnedExited".into()));
+        match self {
+            Self::OwnerExited(_) => OWNER_EXITED_NAME.clone(),
+            Self::OwnedExited { .. } => OWNED_EXITED_NAME.clone(),
+        }
+    }
+}
+
+/// Wrapper around a message type `T` which indicates who the message is from and, if the message
+/// was dispatched with `call`, provides a channel to reply to it.
+pub enum Envelope<T: MsgEnum> {
+    Call {
+        msg: T,
+        reply: Option<oneshot::Sender<T::Reply>>,
+    },
+    Send {
+        msg: T,
+        from: ActorName,
+    },
+    Control(ControlMsg),
+}
+
+impl<T: MsgEnum> Envelope<T> {
+    /// Creates a new envelope containing the given message which does not expect a reply.
+    pub(super) fn new_send(from: ActorName, msg: T) -> Self {
+        Self::Send { from, msg }
+    }
+
+    /// Creates a new envelope containing the given message which expects exactly one reply.
+    pub(super) fn new_call(msg: T) -> (Self, oneshot::Receiver<T::Reply>) {
+        let (tx, rx) = oneshot::channel::<T::Reply>();
+        let envelope = Self::Call {
+            reply: Some(tx),
+            msg,
+        };
+        (envelope, rx)
+    }
+
+    /// Returns the name of the actor which sent this message.
+    pub fn from(&self) -> Option<&ActorName> {
+        match self {
+            Self::Send { from, .. } => Some(from),
+            _ => None,
+        }
+    }
+
+    /// Returns a reference to the message in this envelope.
+    pub fn msg(&self) -> Option<&T> {
+        match self {
+            Self::Call { msg, .. } | Self::Send { msg, .. } => Some(msg),
+            _ => None,
+        }
+    }
+
+    pub const fn name(&self) -> &'static str {
+        match self {
+            Self::Call { .. } => "Call",
+            Self::Send { .. } => "Send",
+            Self::Control { .. } => "Control",
+        }
+    }
+
+    pub fn msg_name(&self) -> Arc<String> {
+        match self {
+            Self::Call { msg, .. } | Self::Send { msg, .. } => msg.name(),
+            Self::Control(msg) => msg.name(),
+        }
+    }
+
+    /// Sends a reply to this message.
+    ///
+    /// If this message is not expecting a reply, or if this message has already been replied to,
+    /// then an error is returned.
+    pub fn reply(&mut self, reply: T::Reply) -> Result<()> {
+        match self {
+            Self::Call { reply: tx, .. } => {
+                if let Some(tx) = tx.take() {
+                    tx.send(reply).map_err(|_| bterr!("Failed to send reply."))
+                } else {
+                    Err(bterr!("Reply has already been sent."))
+                }
+            }
+            _ => Err(bterr!("Can't reply to {} messages.", self.name())),
+        }
+    }
+
+    /// Returns true if this message expects a reply and it has not already been replied to.
+    pub fn needs_reply(&self) -> bool {
+        matches!(self, Self::Call { .. })
+    }
+}
+
+pub(super) type FutureResult = Pin<Box<dyn Send + Future<Output = Result<()>>>>;
+
+pub struct ActorHandle {
+    name: ActorName,
+    handle: AbortHandle,
+    sender: Box<dyn Send + Sync + Any>,
+    deliverer: Box<dyn Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult>,
+    ctrl_deliverer: Box<dyn Send + Sync + Fn(ControlMsg) -> FutureResult>,
+    owner: Option<ActorName>,
+    owns: Vec<ActorName>,
+}
+
+impl ActorHandle {
+    pub(super) fn new<T, F, G>(
+        name: ActorName,
+        handle: AbortHandle,
+        sender: mpsc::Sender<Envelope<T>>,
+        deliverer: F,
+        ctrl_deliverer: G,
+        owner: Option<ActorName>,
+    ) -> Self
+    where
+        T: 'static + MsgEnum,
+        F: 'static + Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult,
+        G: 'static + Send + Sync + Fn(ControlMsg) -> FutureResult,
+    {
+        Self {
+            name,
+            handle,
+            sender: Box::new(sender),
+            deliverer: Box::new(deliverer),
+            ctrl_deliverer: Box::new(ctrl_deliverer),
+            owner,
+            owns: Vec::new(),
+        }
+    }
+
+    fn sender<T: 'static + MsgEnum>(&self) -> Result<&mpsc::Sender<Envelope<T>>> {
+        self.sender
+            .downcast_ref::<mpsc::Sender<Envelope<T>>>()
+            .ok_or_else(|| bterr!("Attempt to send message as the wrong type."))
+    }
+
+    pub fn name(&self) -> &ActorName {
+        &self.name
+    }
+
+    /// Sends a message to the actor represented by this handle.
+    pub async fn send<T: 'static + MsgEnum>(&self, from: ActorName, msg: T) -> Result<()> {
+        let sender = self.sender()?;
+        sender
+            .send(Envelope::new_send(from, msg))
+            .await
+            .map_err(|_| bterr!("failed to enqueue message"))?;
+        Ok(())
+    }
+
+    pub async fn call_through<T: 'static + MsgEnum>(&self, msg: T) -> Result<T::Reply> {
+        let sender = self.sender()?;
+        let (envelope, rx) = Envelope::new_call(msg);
+        sender
+            .send(envelope)
+            .await
+            .map_err(|_| bterr!("failed to enqueue call"))?;
+        let reply = rx.await?;
+        Ok(reply)
+    }
+
+    pub(crate) async fn send_control(&self, msg: ControlMsg) -> Result<()> {
+        (self.ctrl_deliverer)(msg).await
+    }
+
+    pub fn abort(&self) {
+        self.handle.abort();
+    }
+
+    pub(super) async fn deliver(&self, envelope: WireEnvelope<'_>) -> Result<()> {
+        (self.deliverer)(envelope).await
+    }
+
+    pub(crate) fn take_owner(&mut self) -> Option<ActorName> {
+        self.owner.take()
+    }
+
+    pub(crate) fn owns_mut(&mut self) -> &mut Vec<ActorName> {
+        &mut self.owns
+    }
+}
+
+impl Drop for ActorHandle {
+    fn drop(&mut self) {
+        self.abort();
+    }
+}
+
+/// Expands to an implementation of an `actor_impl` method which returns reference to a statically
+/// allocated name.
+#[macro_export]
+macro_rules! actor_name {
+    ($name:literal) => {
+        fn actor_impl() -> ::std::sync::Arc<String> {
+            use ::once_cell::sync::Lazy;
+            use ::std::sync::Arc;
+            static NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new($name.into()));
+            NAME.clone()
+        }
+    };
+}

+ 193 - 111
crates/btrun/tests/runtime_tests.rs

@@ -6,8 +6,8 @@ use btrun::*;
 
 use btlib::Result;
 use btproto::protocol;
-use lazy_static::lazy_static;
 use log;
+use once_cell::sync::Lazy;
 use serde::{Deserialize, Serialize};
 use std::{
     future::{ready, Future, Ready},
@@ -121,56 +121,101 @@ mod ping_pong {
             End(End),
         }
 
+        impl<S> Named for ServerState<S> {
+            fn name(&self) -> Arc<String> {
+                static SERVER_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("Server".into()));
+                static END_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("End".into()));
+                match self {
+                    Self::Server(_) => SERVER_NAME.clone(),
+                    Self::End(_) => END_NAME.clone(),
+                }
+            }
+        }
+
         async fn server_loop<Init, F>(
             _runtime: &'static Runtime,
             make_init: F,
             mut mailbox: Mailbox<PingProtocolMsgs>,
-            _act_id: ActorId,
-        ) where
+            actor_id: ActorId,
+        ) -> ActorResult
+        where
             Init: 'static + Server,
             F: 'static + Send + Sync + FnOnce() -> Init,
         {
             let mut state = ServerState::Server(make_init());
             while let Some(envelope) = mailbox.recv().await {
-                let (msg, msg_kind) = envelope.split();
-                state = match (state, msg) {
-                    (ServerState::Server(listening_state), PingProtocolMsgs::Ping(msg)) => {
-                        match listening_state.handle_ping(msg).await {
-                            TransResult::Ok((new_state, reply)) => match msg_kind {
-                                EnvelopeKind::Call { reply: replier } => {
+                state = match envelope {
+                    Envelope::Call {
+                        msg,
+                        reply: replier,
+                        ..
+                    } => match (state, msg) {
+                        (ServerState::Server(listening_state), PingProtocolMsgs::Ping(msg)) => {
+                            match listening_state.handle_ping(msg).await {
+                                TransResult::Ok((new_state, reply)) => {
                                     let replier =
                                         replier.expect("The reply has already been sent.");
                                     if let Err(_) = replier.send(PingProtocolMsgs::PingReply(reply))
                                     {
-                                        panic!("Failed to send Ping reply.");
+                                        return Err(ActorError::new(
+                                            bterr!("Failed to send Ping reply."),
+                                            ActorErrorCommon {
+                                                actor_id,
+                                                actor_impl: Init::actor_impl(),
+                                                state: Init::state_name(),
+                                                message: PingProtocolMsgKinds::Ping.name(),
+                                                kind: TransKind::Receive,
+                                            },
+                                        ));
                                     }
                                     ServerState::End(new_state)
                                 }
-                                _ => panic!("'Ping' was expected to be a Call message."),
-                            },
-                            TransResult::Abort { from, err } => {
-                                log::warn!("Aborted transition from the {} while handling the {} message: {}", "Server", "Ping", err);
-                                ServerState::Server(from)
-                            }
-                            TransResult::Fatal { err } => {
-                                panic!("Fatal error while handling Ping message in Server state: {err}");
+                                TransResult::Abort { from, err } => {
+                                    log::warn!("Aborted transition from the {} while handling the {} message: {}", "Server", "Ping", err);
+                                    ServerState::Server(from)
+                                }
+                                TransResult::Fatal { err } => {
+                                    return Err(ActorError::new(
+                                        err,
+                                        ActorErrorCommon {
+                                            actor_id,
+                                            actor_impl: Init::actor_impl(),
+                                            state: Init::state_name(),
+                                            message: PingProtocolMsgKinds::Ping.name(),
+                                            kind: TransKind::Receive,
+                                        },
+                                    ));
+                                }
                             }
                         }
+                        (state, _) => state,
+                    },
+                    envelope => {
+                        return Err(ActorError::new(
+                            bterr!("Unexpected envelope type: {}", envelope.name()),
+                            ActorErrorCommon {
+                                actor_id,
+                                actor_impl: Init::actor_impl(),
+                                state: state.name(),
+                                message: envelope.msg_name(),
+                                kind: TransKind::Receive,
+                            },
+                        ))
                     }
-                    (state, _) => state,
                 };
 
                 if let ServerState::End(_) = state {
                     break;
                 }
             }
+            Ok(actor_id)
         }
 
         rt.register::<PingProtocolMsgs, _>(id, move |runtime| {
             let make_init = make_init.clone();
             let fut = async move {
                 let actor_impl = runtime
-                    .spawn(move |_, mailbox, act_id| {
+                    .spawn(None, move |mailbox, act_id, runtime| {
                         server_loop(runtime, make_init, mailbox, act_id)
                     })
                     .await;
@@ -202,9 +247,7 @@ mod ping_pong {
     }
 
     impl Client for ClientState {
-        fn actor_impl() -> String {
-            "client".into()
-        }
+        actor_name!("ping_client");
 
         type OnSendPingFut = impl Future<Output = TransResult<Self, (End, PingReply)>>;
         fn on_send_ping(self, _msg: &mut Ping) -> Self::OnSendPingFut {
@@ -225,9 +268,7 @@ mod ping_pong {
     }
 
     impl Server for ServerState {
-        fn actor_impl() -> String {
-            "server".into()
-        }
+        actor_name!("ping_server");
 
         type HandlePingFut = impl Future<Output = TransResult<Self, (End, PingReply)>>;
         fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
@@ -313,6 +354,8 @@ mod client_callback {
 
     use super::*;
 
+    use btlib::bterr;
+    use once_cell::sync::Lazy;
     use std::{panic::panic_any, time::Duration};
     use tokio::{sync::oneshot, time::timeout};
 
@@ -342,9 +385,7 @@ mod client_callback {
     }
 
     impl Unregistered for UnregisteredState {
-        fn actor_impl() -> String {
-            "client".into()
-        }
+        actor_name!("callback_client");
 
         type OnSendRegisterRegistered = RegisteredState;
         type OnSendRegisterFut = Ready<TransResult<Self, Self::OnSendRegisterRegistered>>;
@@ -372,9 +413,7 @@ mod client_callback {
     }
 
     impl Listening for ListeningState {
-        fn actor_impl() -> String {
-            "server".into()
-        }
+        actor_name!("callback_server");
 
         type HandleRegisterListening = ListeningState;
         type HandleRegisterWorking = WorkingState;
@@ -397,9 +436,7 @@ mod client_callback {
     }
 
     impl Working for WorkingState {
-        fn actor_impl() -> String {
-            "worker".into()
-        }
+        actor_name!("callback_worker");
 
         type OnSendCompletedFut = Ready<TransResult<Self, (End, Completed)>>;
         fn on_send_completed(self) -> Self::OnSendCompletedFut {
@@ -416,12 +453,16 @@ mod client_callback {
         End(End),
     }
 
-    impl<Init: Unregistered> ClientState<Init> {
-        pub fn name(&self) -> &'static str {
+    impl<Init: Unregistered> Named for ClientState<Init> {
+        fn name(&self) -> Arc<String> {
+            static UNREGISTERED_NAME: Lazy<Arc<String>> =
+                Lazy::new(|| Arc::new("Unregistered".into()));
+            static REGISTERED_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("Registered".into()));
+            static END_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("End".into()));
             match self {
-                Self::Unregistered(_) => "Unregistered",
-                Self::Registered(_) => "Registered",
-                Self::End(_) => "End",
+                Self::Unregistered(_) => UNREGISTERED_NAME.clone(),
+                Self::Registered(_) => REGISTERED_NAME.clone(),
+                Self::End(_) => END_NAME.clone(),
             }
         }
     }
@@ -473,38 +514,53 @@ mod client_callback {
         let state = Arc::new(Mutex::new(Some(ClientState::Unregistered(init))));
         let name = {
             let state = state.clone();
-            runtime.spawn(move |_, mut mailbox, _act_id| async move {
+            runtime.spawn(None, move |mut mailbox, actor_id, _| async move {
                 while let Some(envelope) = mailbox.recv().await {
                     let mut guard = state.lock().await;
                     let state = guard.take()
                         .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
-                    let (msg, _kind) = envelope.split();
-                    let new_state = match (state, msg) {
-                        (ClientState::Registered(curr_state), ClientCallbackMsgs::Completed(msg)) => {
-                            match curr_state.handle_completed(msg).await {
-                                TransResult::Ok(next) =>  ClientState::<Init>::End(next),
-                                TransResult::Abort { from, err } => {
-                                    log::warn!("Aborted transition from the {} state while handling the {} message: {}", "Registered", "Completed", err);
-                                    ClientState::Registered(from)
+                    let new_state = match envelope {
+                        Envelope::Send { msg, .. } => {
+                            match (state, msg) {
+                                (ClientState::Registered(curr_state), ClientCallbackMsgs::Completed(msg)) => {
+                                    match curr_state.handle_completed(msg).await {
+                                        TransResult::Ok(next) =>  ClientState::<Init>::End(next),
+                                        TransResult::Abort { from, err } => {
+                                            log::warn!("Aborted transition from the {} state while handling the {} message: {}", "Registered", "Completed", err);
+                                            ClientState::Registered(from)
+                                        }
+                                        TransResult::Fatal { err } => {
+                                            panic_any(ActorError::new(
+                                                err,
+                                                ActorErrorCommon {
+                                                actor_id,
+                                                actor_impl: Init::actor_impl(),
+                                                state: Init::OnSendRegisterRegistered::state_name(),
+                                                message: ClientCallbackMsgKinds::Completed.name(),
+                                                kind: TransKind::Receive,
+                                            }));
+                                        }
+                                    }
                                 }
-                                TransResult::Fatal { err } => {
-                                    panic_any(ActorPanic {
-                                        actor_impl: Init::actor_impl(),
-                                        state: "Registered",
-                                        message: "Completed",
-                                        kind: TransKind::Receive,
-                                        err
-                                    });
+                                (state, msg) => {
+                                    log::error!("Unexpected message {} in state {}.", msg.name(), state.name());
+                                    state
                                 }
                             }
                         }
-                        (state, msg) => {
-                            log::error!("Unexpected message {} in state {}.", msg.name(), state.name());
-                            state
-                        }
+                        envelope => return Err(ActorError::new(
+                            bterr!("Unexpected envelope type: {}", envelope.name()),
+                            ActorErrorCommon {
+                            actor_id,
+                            actor_impl: Init::actor_impl(),
+                            state: state.name(),
+                            message: envelope.msg_name(),
+                            kind: TransKind::Receive,
+                        }))
                     };
                     *guard = Some(new_state);
                 }
+                Ok(actor_id)
             }).await
         };
         ClientHandle {
@@ -527,10 +583,12 @@ mod client_callback {
             Listening(S),
         }
 
-        impl<S: Listening> ServerState<S> {
-            fn name(&self) -> &'static str {
+        impl<S: Listening> Named for ServerState<S> {
+            fn name(&self) -> Arc<String> {
+                static LISTENING_NAME: Lazy<Arc<String>> =
+                    Lazy::new(|| Arc::new("Listening".into()));
                 match self {
-                    Self::Listening(_) => "Listening",
+                    Self::Listening(_) => LISTENING_NAME.clone(),
                 }
             }
         }
@@ -539,51 +597,66 @@ mod client_callback {
             runtime: &'static Runtime,
             make_init: F,
             mut mailbox: Mailbox<ClientCallbackMsgs>,
-            _act_id: ActorId,
-        ) where
+            actor_id: ActorId,
+        ) -> ActorResult
+        where
             Init: 'static + Listening<HandleRegisterListening = Init>,
             F: 'static + Send + Sync + Fn() -> Init,
         {
             let mut state = ServerState::Listening(make_init());
             while let Some(envelope) = mailbox.recv().await {
-                let (msg, msg_kind) = envelope.split();
-                let new_state = match (state, msg) {
-                    (ServerState::Listening(curr_state), ClientCallbackMsgs::Register(msg)) => {
-                        match curr_state.handle_register(msg).await {
-                            TransResult::Ok((new_state, working_state)) => {
-                                if let EnvelopeKind::Send { from, .. } = msg_kind {
+                let new_state = match envelope {
+                    Envelope::Send { msg, from, .. } => match (state, msg) {
+                        (ServerState::Listening(curr_state), ClientCallbackMsgs::Register(msg)) => {
+                            match curr_state.handle_register(msg).await {
+                                TransResult::Ok((new_state, working_state)) => {
                                     start_worker(working_state, from, runtime).await;
-                                } else {
-                                    log::error!("Expected Register to be a Send message.");
+                                    ServerState::Listening(new_state)
+                                }
+                                TransResult::Abort { from, err } => {
+                                    log::warn!("Aborted transition from the {} state while handling the {} message: {}", "Listening", "Register", err);
+                                    ServerState::Listening(from)
+                                }
+                                TransResult::Fatal { err } => {
+                                    let err = ActorError::new(
+                                        err,
+                                        ActorErrorCommon {
+                                            actor_id,
+                                            actor_impl: Init::actor_impl(),
+                                            state: Init::state_name(),
+                                            message: ClientCallbackMsgKinds::Register.name(),
+                                            kind: TransKind::Receive,
+                                        },
+                                    );
+                                    panic_any(format!("{err}"));
                                 }
-                                ServerState::Listening(new_state)
-                            }
-                            TransResult::Abort { from, err } => {
-                                log::warn!("Aborted transition from the {} state while handling the {} message: {}", "Listening", "Register", err);
-                                ServerState::Listening(from)
-                            }
-                            TransResult::Fatal { err } => {
-                                panic_any(ActorPanic {
-                                    actor_impl: Init::actor_impl(),
-                                    state: "Listening",
-                                    message: "Register",
-                                    kind: TransKind::Receive,
-                                    err,
-                                });
                             }
                         }
-                    }
-                    (state, msg) => {
-                        log::error!(
-                            "Unexpected message {} in state {}.",
-                            msg.name(),
-                            state.name()
-                        );
-                        state
+                        (state, msg) => {
+                            log::error!(
+                                "Unexpected message {} in state {}.",
+                                msg.name(),
+                                state.name()
+                            );
+                            state
+                        }
+                    },
+                    envelope => {
+                        return Err(ActorError::new(
+                            bterr!("Unexpected envelope type: {}", envelope.name()),
+                            ActorErrorCommon {
+                                actor_id,
+                                actor_impl: Init::actor_impl(),
+                                state: state.name(),
+                                message: envelope.msg_name(),
+                                kind: TransKind::Receive,
+                            },
+                        ))
                     }
                 };
                 state = new_state;
             }
+            Ok(actor_id)
         }
 
         runtime
@@ -592,7 +665,7 @@ mod client_callback {
                 let fut = async move {
                     let make_init = make_init.clone();
                     let actor_impl = runtime
-                        .spawn(move |_, mailbox, act_id| {
+                        .spawn(None, move |mailbox, act_id, runtime| {
                             server_loop(runtime, make_init, mailbox, act_id)
                         })
                         .await;
@@ -616,30 +689,39 @@ mod client_callback {
         }
 
         runtime
-            .spawn::<ClientCallbackMsgs, _, _>(move |_, _, act_id| async move {
+            .spawn::<ClientCallbackMsgs, _, _>(None, move |_, actor_id, _| async move {
                 let msg = match init.on_send_completed().await {
                     TransResult::Ok((End, msg)) => msg,
                     TransResult::Abort { err, .. } | TransResult::Fatal { err } => {
-                        panic_any(ActorPanic {
-                            actor_impl: Init::actor_impl(),
-                            state: "Working",
-                            message: "Completed",
-                            kind: TransKind::Send,
+                        let err = ActorError::new(
                             err,
-                        })
+                            ActorErrorCommon {
+                                actor_id,
+                                actor_impl: Init::actor_impl(),
+                                state: Init::state_name(),
+                                message: ClientCallbackMsgKinds::Completed.name(),
+                                kind: TransKind::Send,
+                            },
+                        );
+                        panic_any(format!("{err}"))
                     }
                 };
-                let from = runtime.actor_name(act_id);
+                let from = runtime.actor_name(actor_id);
                 let msg = ClientCallbackMsgs::Completed(msg);
                 runtime.send(owned, from, msg).await.unwrap_or_else(|err| {
-                    panic_any(ActorPanic {
-                        actor_impl: Init::actor_impl(),
-                        state: "Working",
-                        message: "Completed",
-                        kind: TransKind::Send,
+                    let err = ActorError::new(
                         err,
-                    });
+                        ActorErrorCommon {
+                            actor_id,
+                            actor_impl: Init::actor_impl(),
+                            state: Init::state_name(),
+                            message: ClientCallbackMsgKinds::Completed.name(),
+                            kind: TransKind::Send,
+                        },
+                    );
+                    panic_any(format!("{err}"));
                 });
+                Ok(actor_id)
             })
             .await
     }