Browse Source

Improved error handling in btrun.

Matthew Carr 1 year ago
parent
commit
5d2f5b3b90

+ 1 - 2
crates/btfs/src/lib.rs

@@ -1,6 +1,5 @@
-use btlib::Result;
 use btproto::protocol;
-use btrun::{ActorName, CallMsg, NoReply, SendMsg};
+use btrun::model::{ActorName, CallMsg, NoReply, SendMsg};
 use btsector::FileId;
 use serde::{Deserialize, Serialize};
 

+ 25 - 8
crates/btproto/src/generation.rs

@@ -24,7 +24,7 @@ impl ProtocolModel {
             quote! {}
         } else {
             quote! {
-                impl ::btrun::SendMsg for #enum_name {}
+                impl ::btrun::model::SendMsg for #enum_name {}
             }
         };
         let proto_name = &self.def().name_def.name;
@@ -44,7 +44,7 @@ impl ProtocolModel {
                 }
             }
 
-            impl ::btrun::CallMsg for #enum_name {
+            impl ::btrun::model::CallMsg for #enum_name {
                 type Reply = Self;
             }
 
@@ -53,14 +53,29 @@ impl ProtocolModel {
     }
 
     fn generate_state_traits(&self) -> TokenStream {
-        let traits = self
-            .states_iter()
-            .map(|state| (state.name(), state.methods().values()));
+        let traits = self.states_iter().map(|state| {
+            (
+                state.name(),
+                state.methods().values(),
+                self.actor_lookup()
+                    .actor_with_init_state(state.name())
+                    .is_some(),
+            )
+        });
         let mut tokens = TokenStream::new();
-        for (trait_ident, methods) in traits {
+        for (trait_ident, methods, is_init_state) in traits {
             let method_tokens = methods.map(|x| x.generate_tokens());
+            let actor_impl_method = if is_init_state {
+                quote! {
+                    #[doc = "The name of the implementation for the actor this state is a part of."]
+                    fn actor_impl() -> String;
+                }
+            } else {
+                quote! {}
+            };
             quote! {
-                pub trait #trait_ident : Send + Sync {
+                pub trait #trait_ident : Send + Sync + Sized {
+                    #actor_impl_method
                     #( #method_tokens )*
                 }
             }
@@ -80,7 +95,9 @@ impl MethodModel {
         let future_name = self.future();
         quote! {
             #( #output_decls )*
-            type #future_name: Send + ::std::future::Future<Output = Result<( #( #output_types ),* )>>;
+            type #future_name: Send + ::std::future::Future<
+                Output = ::btrun::model::TransResult<Self, ( #( #output_types ),* )>
+            >;
             fn #method_ident(self #( , #msg_args )*) -> Self::#future_name;
         }
     }

+ 0 - 8
crates/btproto/src/lib.rs

@@ -24,14 +24,6 @@ macro_rules! unwrap_or_compile_err {
 /// Generates types for the actors participating in a messaging protocol.
 ///
 /// ## Usage Restrictions
-/// A type named `Result` which accepts a single type parameter must be in scope where you call this
-/// macro. This allows you to define the error type used by the generated traits by defining a
-/// type alias:
-/// ```
-/// struct MyError;
-/// type Result<T> = std::result::Result<T, MyError>;
-/// ```
-///
 /// You must also ensure all message types referenced by the protocol are in scope.
 ///
 /// ## Grammar

+ 26 - 6
crates/btproto/src/model.rs

@@ -4,7 +4,7 @@ use std::{
     rc::Rc,
 };
 
-use btrun::End;
+use btrun::model::End;
 use proc_macro2::{Ident, Span, TokenStream};
 use quote::{format_ident, quote, ToTokens};
 
@@ -18,6 +18,7 @@ use crate::{
 pub(crate) struct ProtocolModel {
     def: Protocol,
     msg_lookup: MsgLookup,
+    actor_lookup: ActorLookup,
     actors: HashMap<Rc<Ident>, ActorModel>,
 }
 
@@ -89,6 +90,7 @@ impl ProtocolModel {
         Ok(Self {
             def,
             msg_lookup,
+            actor_lookup,
             actors,
         })
     }
@@ -101,6 +103,10 @@ impl ProtocolModel {
         &self.msg_lookup
     }
 
+    pub(crate) fn actor_lookup(&self) -> &ActorLookup {
+        &self.actor_lookup
+    }
+
     pub(crate) fn actors_iter(&self) -> impl Iterator<Item = &ActorModel> {
         self.actors.values()
     }
@@ -401,7 +407,7 @@ impl OutputModel {
                 let state_trait = def.state_trait.as_ref();
                 if state_trait == End::ident() {
                     let end_ident = format_ident!("{}", End::ident());
-                    (None, Some(quote! { ::btrun::#end_ident }))
+                    (None, Some(quote! { ::btrun::model::#end_ident }))
                 } else {
                     let type_name = format_ident!("{type_prefix}{}", state_trait);
                     (
@@ -419,7 +425,7 @@ impl OutputModel {
                 let type_name = if *part_of_client {
                     if *is_call {
                         Some(quote! {
-                            <#msg_type as ::btrun::CallMsg>::Reply
+                            <#msg_type as ::btrun::model::CallMsg>::Reply
                         })
                     } else {
                         None
@@ -471,6 +477,7 @@ pub(crate) struct ActorLookup {
     parents: HashMap<Rc<Ident>, HashSet<Rc<Ident>>>,
     /// A map from an actor name to the set of actors names which it spawns.
     children: HashMap<Rc<Ident>, HashSet<Rc<Ident>>>,
+    actors_by_init_state: HashMap<Rc<Ident>, Rc<Ident>>,
 }
 
 impl ActorLookup {
@@ -483,13 +490,19 @@ impl ActorLookup {
         let mut actors_by_state = HashMap::new();
         let mut parents = HashMap::new();
         let mut children = HashMap::new();
+        let mut actors_by_init_state = HashMap::new();
         for actor_def in actor_defs {
             let mut states = HashSet::new();
+            let actor_name = &actor_def.actor;
+            let mut first = true;
             for state in actor_def.states.as_ref().iter() {
+                if first {
+                    actors_by_init_state.insert(state.clone(), actor_name.clone());
+                    first = false;
+                }
                 states.insert(state.clone());
                 actors_by_state.insert(state.clone(), actor_def.actor.clone());
             }
-            let actor_name = &actor_def.actor;
             actor_states.insert(actor_name.clone(), states);
             parents.insert(actor_name.clone(), HashSet::new());
             children.insert(actor_name.clone(), HashSet::new());
@@ -523,6 +536,7 @@ impl ActorLookup {
             actors_by_state,
             parents,
             children,
+            actors_by_init_state,
         })
     }
 
@@ -549,6 +563,12 @@ impl ActorLookup {
             .get(actor_name)
             .unwrap_or_else(|| panic!("children: {}", Self::UNKNOWN_ACTOR_ERR))
     }
+
+    /// Returns the name of the actor for which the given state name is the initial state. If the
+    /// given name is not the initial state of any actor, [None] is returned.
+    pub(crate) fn actor_with_init_state(&self, init_state: &Ident) -> Option<&Rc<Ident>> {
+        self.actors_by_init_state.get(init_state)
+    }
 }
 
 pub(crate) struct MsgLookup {
@@ -653,7 +673,7 @@ impl MsgInfo {
                     MessageReplyPart::REPLY_IDENT
                 ));
                 let parent = self.msg_name.as_ref();
-                reply.msg_type = Rc::new(quote! { <#parent as ::btrun::CallMsg>::Reply });
+                reply.msg_type = Rc::new(quote! { <#parent as ::btrun::model::CallMsg>::Reply });
                 Box::new(reply)
             })
         } else {
@@ -682,7 +702,7 @@ impl MsgInfo {
     }
 
     /// The type of this message. If this message is not a reply, this is just `msg_name`. If it is
-    /// a reply, it is `<#msg_name as ::btrun::CallMsg>::Reply`.
+    /// a reply, it is `<#msg_name as ::btrun::model::CallMsg>::Reply`.
     pub(crate) fn msg_type(&self) -> &Rc<TokenStream> {
         &self.msg_type
     }

+ 1 - 1
crates/btproto/src/validation.rs

@@ -2,7 +2,7 @@ use std::{collections::HashSet, hash::Hash};
 
 use proc_macro2::{Ident, Span};
 
-use btrun::End;
+use btrun::model::End;
 
 use crate::{
     error::{self, MaybeErr},

+ 46 - 19
crates/btproto/tests/protocol_tests.rs

@@ -2,9 +2,8 @@
 
 use std::future::{ready, Ready};
 
-use btlib::Result;
 use btproto::protocol;
-use btrun::{CallMsg, End};
+use btrun::model::{CallMsg, End, TransResult};
 use serde::{Deserialize, Serialize};
 
 #[derive(Serialize, Deserialize)]
@@ -28,7 +27,7 @@ fn minimal_syntax() {
 
     protocol! {
         named MinimalTest;
-        let states = [Server];
+        let server = [Server];
         let client = [Client];
         Client -> End, >service(Server)!Msg;
         Server?Msg -> End;
@@ -43,18 +42,26 @@ fn minimal_syntax() {
     struct ServerState;
 
     impl Server for ServerState {
-        type HandleMsgFut = Ready<Result<End>>;
+        fn actor_impl() -> String {
+            "server".into()
+        }
+
+        type HandleMsgFut = Ready<TransResult<Self, End>>;
         fn handle_msg(self, _msg: Msg) -> Self::HandleMsgFut {
-            ready(Ok(End))
+            ready(TransResult::Ok(End))
         }
     }
 
     struct ClientState;
 
     impl Client for ClientState {
-        type OnSendMsgFut = Ready<Result<End>>;
+        fn actor_impl() -> String {
+            "client".into()
+        }
+
+        type OnSendMsgFut = Ready<TransResult<Self, End>>;
         fn on_send_msg(self, _msg: &mut Msg) -> Self::OnSendMsgFut {
-            ready(Ok(End))
+            ready(TransResult::Ok(End))
         }
     }
 }
@@ -79,20 +86,28 @@ fn reply() {
     struct ListeningState;
 
     impl Listening for ListeningState {
+        fn actor_impl() -> String {
+            "server".into()
+        }
+
         type HandlePingListening = Self;
-        type HandlePingFut = Ready<Result<(Self, <Ping as CallMsg>::Reply)>>;
+        type HandlePingFut = Ready<TransResult<Self, (Self, <Ping as CallMsg>::Reply)>>;
         fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
-            ready(Ok((self, ())))
+            ready(TransResult::Ok((self, ())))
         }
     }
 
     struct ClientState;
 
     impl Client for ClientState {
+        fn actor_impl() -> String {
+            "client".into()
+        }
+
         type OnSendPingClient = Self;
-        type OnSendPingFut = Ready<Result<(Self, <Ping as CallMsg>::Reply)>>;
+        type OnSendPingFut = Ready<TransResult<Self, (Self, <Ping as CallMsg>::Reply)>>;
         fn on_send_ping(self, _ping: &mut Ping) -> Self::OnSendPingFut {
-            ready(Ok((self, ())))
+            ready(TransResult::Ok((self, ())))
         }
     }
 }
@@ -125,39 +140,51 @@ fn client_callback() {
     struct UnregisteredState;
 
     impl Unregistered for UnregisteredState {
+        fn actor_impl() -> String {
+            "client".into()
+        }
+
         type OnSendRegisterRegistered = RegisteredState;
-        type OnSendRegisterFut = Ready<Result<Self::OnSendRegisterRegistered>>;
+        type OnSendRegisterFut = Ready<TransResult<Self, Self::OnSendRegisterRegistered>>;
         fn on_send_register(self, _arg: &mut Register) -> Self::OnSendRegisterFut {
-            ready(Ok(RegisteredState))
+            ready(TransResult::Ok(RegisteredState))
         }
     }
 
     struct RegisteredState;
 
     impl Registered for RegisteredState {
-        type HandleCompletedFut = Ready<Result<End>>;
+        type HandleCompletedFut = Ready<TransResult<Self, End>>;
         fn handle_completed(self, _arg: Completed) -> Self::HandleCompletedFut {
-            ready(Ok(End))
+            ready(TransResult::Ok(End))
         }
     }
 
     struct ListeningState;
 
     impl Listening for ListeningState {
+        fn actor_impl() -> String {
+            "server".into()
+        }
+
         type HandleRegisterListening = ListeningState;
         type HandleRegisterWorking = WorkingState;
-        type HandleRegisterFut = Ready<Result<(ListeningState, WorkingState)>>;
+        type HandleRegisterFut = Ready<TransResult<Self, (ListeningState, WorkingState)>>;
         fn handle_register(self, _arg: Register) -> Self::HandleRegisterFut {
-            ready(Ok((self, WorkingState)))
+            ready(TransResult::Ok((self, WorkingState)))
         }
     }
 
     struct WorkingState;
 
     impl Working for WorkingState {
-        type OnSendCompletedFut = Ready<Result<(End, Completed)>>;
+        fn actor_impl() -> String {
+            "worker".into()
+        }
+
+        type OnSendCompletedFut = Ready<TransResult<Self, (End, Completed)>>;
         fn on_send_completed(self) -> Self::OnSendCompletedFut {
-            ready(Ok((End, Completed)))
+            ready(TransResult::Ok((End, Completed)))
         }
     }
 }

+ 71 - 0
crates/btrun/src/kernel.rs

@@ -0,0 +1,71 @@
+//! This module contains the code necessary to run and supervise actors.
+
+use std::{future::Future, pin::Pin};
+
+use log::{debug, error};
+use tokio::{
+    select,
+    sync::{mpsc, oneshot},
+    task::{AbortHandle, JoinSet},
+};
+
+use crate::ActorPanic;
+
+pub(super) type BoxedFuture = Pin<Box<dyn Send + Future<Output = ()>>>;
+
+pub(super) struct SpawnReq {
+    task: BoxedFuture,
+    sender: oneshot::Sender<AbortHandle>,
+}
+
+impl SpawnReq {
+    pub(super) fn new<Fut>(task: Fut) -> (Self, oneshot::Receiver<AbortHandle>)
+    where
+        Fut: 'static + Send + Future<Output = ()>,
+    {
+        let (sender, receiver) = oneshot::channel();
+        let req = Self {
+            task: Box::pin(task),
+            sender,
+        };
+        (req, receiver)
+    }
+}
+
+/// The kernel is responsible for spawning and supervising actors.
+pub(super) async fn kernel(mut tasks: mpsc::Receiver<SpawnReq>) {
+    let mut running = JoinSet::<()>::new();
+
+    loop {
+        select! {
+            option = tasks.recv() => {
+                match option {
+                    Some(SpawnReq { task, sender }) => {
+                        let handle = running.spawn(task);
+                        if sender.send(handle).is_err() {
+                            error!("SpawnReq sender dropped the channel. Aborting actor.");
+                        }
+                    }
+                    None => panic!("The Runtime was dropped!")
+                };
+            }
+            Some(Err(join_error)) = running.join_next() => {
+                match join_error.try_into_panic() {
+                    Ok(panic_obj) => {
+                        if let Some(actor_panic) = panic_obj.downcast_ref::<ActorPanic>() {
+                            error!("{actor_panic}")
+                        } else if let Some(message) = panic_obj.downcast_ref::<String>() {
+                            error!("Actor panicked in an unknown state: {message}");
+                        } else {
+                            error!("Actor panicked for an unknown reason.");
+                        }
+                    }
+                    Err(join_error) => {
+                        debug!("Actor was aborted: {join_error}");
+                    }
+                };
+
+            }
+        }
+    }
+}

+ 257 - 203
crates/btrun/src/lib.rs

@@ -13,14 +13,18 @@ use std::{
 };
 
 use btlib::{bterr, crypto::Creds, error::StringError, BlockPath, Result};
-use btserde::{field_helpers::smart_ptr, from_slice, to_vec, write_to};
+use btserde::{from_slice, to_vec, write_to};
 use bttp::{DeserCallback, MsgCallback, Receiver, Replier, Transmitter};
-use serde::{de::DeserializeOwned, Deserialize, Serialize};
+use kernel::{kernel, SpawnReq};
+use serde::{Deserialize, Serialize};
 use tokio::{
     sync::{mpsc, oneshot, Mutex, RwLock},
-    task::JoinHandle,
+    task::AbortHandle,
 };
-use uuid::Uuid;
+
+mod kernel;
+pub mod model;
+use model::*;
 
 /// Declares a new [Runtime] which listens for messages at the given IP address and uses the given
 /// [Creds]. Runtimes are intended to be created once in a process's lifetime and continue running
@@ -29,9 +33,9 @@ use uuid::Uuid;
 macro_rules! declare_runtime {
     ($name:ident, $ip_addr:expr, $creds:expr) => {
         ::lazy_static::lazy_static! {
-            static ref $name: &'static ::btrun::Runtime = {
+            static ref $name: &'static $crate::Runtime = {
                 ::lazy_static::lazy_static! {
-                    static ref RUNTIME: ::btrun::Runtime =  ::btrun::Runtime::_new($creds).unwrap();
+                    static ref RUNTIME: $crate::Runtime = $crate::Runtime::_new($creds).unwrap();
                     static ref RECEIVER: ::bttp::Receiver = _new_receiver($ip_addr, $creds, &*RUNTIME);
                 }
                 // By dereferencing RECEIVER we ensure it is started.
@@ -63,12 +67,17 @@ pub type Mailbox<T> = mpsc::Receiver<Envelope<T>>;
 /// be ready when the reply has been received.
 pub struct Runtime {
     path: Arc<BlockPath>,
-    handles: RwLock<HashMap<Uuid, ActorHandle>>,
+    handles: RwLock<HashMap<ActorId, ActorHandle>>,
     peers: RwLock<HashMap<Arc<BlockPath>, Transmitter>>,
     registry: RwLock<HashMap<ServiceId, ServiceRecord>>,
+    kernel_sender: mpsc::Sender<SpawnReq>,
 }
 
 impl Runtime {
+    /// The size of the buffer to use for the channel between [Runtime] and [kernel] used for
+    /// spawning tasks.
+    const SPAWN_REQ_BUF_SZ: usize = 16;
+
     ///  This method is not intended to be called directly by downstream crates. Use the macro
     /// [declare_runtime] to create a [Runtime].
     ///
@@ -76,11 +85,14 @@ impl Runtime {
     #[doc(hidden)]
     pub fn _new<C: 'static + Send + Sync + Creds>(creds: Arc<C>) -> Result<Runtime> {
         let path = Arc::new(creds.bind_path()?);
+        let (sender, receiver) = mpsc::channel(Self::SPAWN_REQ_BUF_SZ);
+        tokio::task::spawn(kernel(receiver));
         Ok(Runtime {
             path,
             handles: RwLock::new(HashMap::new()),
             peers: RwLock::new(HashMap::new()),
             registry: RwLock::new(HashMap::new()),
+            kernel_sender: sender,
         })
     }
 
@@ -101,16 +113,16 @@ impl Runtime {
         from: ActorName,
         msg: T,
     ) -> Result<()> {
-        if to.path == self.path {
+        if to.path().as_ref() == self.path.as_ref() {
             let guard = self.handles.read().await;
-            if let Some(handle) = guard.get(&to.act_id) {
+            if let Some(handle) = guard.get(&to.act_id()) {
                 handle.send(from, msg).await
             } else {
                 Err(bterr!("invalid actor name"))
             }
         } else {
             let guard = self.peers.read().await;
-            if let Some(peer) = guard.get(&to.path) {
+            if let Some(peer) = guard.get(to.path()) {
                 let buf = to_vec(&msg)?;
                 let wire_msg = WireMsg {
                     to,
@@ -119,9 +131,7 @@ impl Runtime {
                 };
                 peer.send(wire_msg).await
             } else {
-                // TODO: Use the filesystem to discover the address of the recipient and connect to
-                // it.
-                todo!()
+                todo!("Discover the network location of the recipient runtime and connect to it.")
             }
         }
     }
@@ -158,16 +168,16 @@ impl Runtime {
         from: ActorName,
         msg: T,
     ) -> Result<T::Reply> {
-        if to.path == self.path {
+        if to.path().as_ref() == self.path.as_ref() {
             let guard = self.handles.read().await;
-            if let Some(handle) = guard.get(&to.act_id) {
+            if let Some(handle) = guard.get(&to.act_id()) {
                 handle.call_through(msg).await
             } else {
                 Err(bterr!("invalid actor name"))
             }
         } else {
             let guard = self.peers.read().await;
-            if let Some(peer) = guard.get(&to.path) {
+            if let Some(peer) = guard.get(to.path()) {
                 let buf = to_vec(&msg)?;
                 let wire_msg = WireMsg {
                     to,
@@ -208,7 +218,7 @@ impl Runtime {
         bterr!("Service is not registered: '{id}'")
     }
 
-    async fn service_provider(&'static self, to: &ServiceAddr) -> Result<Uuid> {
+    async fn service_provider(&'static self, to: &ServiceAddr) -> Result<ActorId> {
         let actor_id = {
             let registry = self.registry.read().await;
             if let Some(record) = registry.get(to.service_id()) {
@@ -227,7 +237,7 @@ impl Runtime {
                 if record.actor_ids.is_empty() {
                     let spawner = record.spawner.as_ref();
                     let actor_name = spawner(self).await?;
-                    let actor_id = actor_name.act_id;
+                    let actor_id = actor_name.act_id();
                     record.actor_ids.push(actor_id);
                     actor_id
                 } else {
@@ -245,20 +255,20 @@ impl Runtime {
     where
         Msg: 'static + CallMsg,
         Fut: 'static + Send + Future<Output = ()>,
-        F: FnOnce(&'static Runtime, Mailbox<Msg>, Uuid) -> Fut,
+        F: FnOnce(&'static Runtime, Mailbox<Msg>, ActorId) -> Fut,
     {
         let mut guard = self.handles.write().await;
         let act_id = {
-            let mut act_id = Uuid::new_v4();
+            let mut act_id = ActorId::new();
             while guard.contains_key(&act_id) {
-                act_id = Uuid::new_v4();
+                act_id = ActorId::new();
             }
             act_id
         };
         let act_name = self.actor_name(act_id);
         let (tx, rx) = mpsc::channel::<Envelope<Msg>>(MAILBOX_LIMIT);
         // The deliverer closure is responsible for deserializing messages received over the wire
-        // and delivering them to the actor's mailbox, and sending replies to call messages.
+        // and delivering them to the actor's mailbox, as well as sending replies to call messages.
         let deliverer = {
             let buffer = Arc::new(Mutex::new(Vec::<u8>::new()));
             let tx = tx.clone();
@@ -297,7 +307,14 @@ impl Runtime {
                 fut
             }
         };
-        let handle = tokio::task::spawn(activator(self, rx, act_id));
+        let (req, receiver) = SpawnReq::new(activator(self, rx, act_id));
+        self.kernel_sender
+            .send(req)
+            .await
+            .unwrap_or_else(|err| panic!("The kernel has panicked: {err}"));
+        let handle = receiver
+            .await
+            .unwrap_or_else(|err| panic!("Kernel failed to send abort handle: {err}"));
         let actor_handle = ActorHandle::new(handle, tx, deliverer);
         guard.insert(act_id, actor_handle);
         act_name
@@ -326,9 +343,40 @@ impl Runtime {
         }
     }
 
-    pub async fn take_service(&self, id: &ServiceId) -> Option<ServiceRecord> {
-        let mut registry = self.registry.write().await;
-        registry.remove(id)
+    /// Removes the registration for the service with the given ID.
+    ///
+    /// If a vector reference is given in `service_providers`, the service providers which
+    /// are part of the deregistered service are appended to it. Otherwise, their
+    /// handles are dropped and their tasks are aborted.
+    ///
+    /// A [RuntimeError::BadServiceId] error is returned if there is no service registration with
+    /// the given ID in this runtime.
+    pub async fn deregister(
+        &self,
+        id: &ServiceId,
+        service_providers: Option<&mut Vec<ActorHandle>>,
+    ) -> Result<()> {
+        let record = {
+            let mut registry = self.registry.write().await;
+            if let Some(record) = registry.remove(id) {
+                record
+            } else {
+                return Err(RuntimeError::BadServiceId(id.clone()).into());
+            }
+        };
+        let mut handles = self.handles.write().await;
+        let removed = record
+            .actor_ids
+            .into_iter()
+            .flat_map(|act_id| handles.remove(&act_id));
+        // If a vector was provided, we put all the removed service providers in it. Otherwise
+        // we just drop them.
+        if let Some(service_providers) = service_providers {
+            service_providers.extend(removed);
+        } else {
+            for _ in removed {}
+        }
+        Ok(())
     }
 
     /// Returns the [ActorHandle] for the actor with the given name.
@@ -340,9 +388,9 @@ impl Runtime {
     /// returned when the handle is dropped), and no further messages will be delivered to it by
     /// this runtime.
     pub async fn take(&self, name: &ActorName) -> Result<ActorHandle> {
-        if name.path == self.path {
+        if name.path().as_ref() == self.path.as_ref() {
             let mut guard = self.handles.write().await;
-            if let Some(handle) = guard.remove(&name.act_id) {
+            if let Some(handle) = guard.remove(&name.act_id()) {
                 Ok(handle)
             } else {
                 Err(RuntimeError::BadActorName(name.clone()).into())
@@ -353,7 +401,7 @@ impl Runtime {
     }
 
     /// Returns the name of the actor in this runtime with the given actor ID.
-    pub fn actor_name(&self, act_id: Uuid) -> ActorName {
+    pub fn actor_name(&self, act_id: ActorId) -> ActorName {
         ActorName::new(self.path.clone(), act_id)
     }
 }
@@ -364,28 +412,13 @@ impl Drop for Runtime {
     }
 }
 
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub enum RuntimeError {
-    BadActorName(ActorName),
-}
-
-impl Display for RuntimeError {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match self {
-            Self::BadActorName(name) => write!(f, "bad actor name: {name}"),
-        }
-    }
-}
-
-impl std::error::Error for RuntimeError {}
-
 /// Closure type used to spawn new service providers.
-pub type Spawner =
+type Spawner =
     Box<dyn Send + Sync + Fn(&'static Runtime) -> Pin<Box<dyn Future<Output = Result<ActorName>>>>>;
 
-pub struct ServiceRecord {
+struct ServiceRecord {
     spawner: Spawner,
-    actor_ids: Vec<Uuid>,
+    actor_ids: Vec<ActorId>,
 }
 
 impl ServiceRecord {
@@ -403,38 +436,32 @@ impl ServiceRecord {
     }
 }
 
-/// Represents the terminal state of an actor, where it stops processing messages and halts.
-pub struct End;
-
-impl End {
-    /// Returns the identifier for this type which is expected in protocol definitions.
-    pub fn ident() -> &'static str {
-        stringify!(End)
-    }
-}
-
-#[allow(dead_code)]
-/// Delivered to an actor implementation when it starts up.
-pub struct Activate {
-    /// A reference to the `Runtime` which is running this actor.
-    rt: &'static Runtime,
-    /// The ID assigned to this actor.
-    act_id: Uuid,
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum RuntimeError {
+    BadActorName(ActorName),
+    BadServiceId(ServiceId),
 }
 
-impl Activate {
-    pub fn new(rt: &'static Runtime, act_id: Uuid) -> Self {
-        Self { rt, act_id }
+impl Display for RuntimeError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::BadActorName(name) => write!(f, "bad actor name: {name}"),
+            Self::BadServiceId(service_id) => {
+                write!(f, "service ID is not registered: {service_id}")
+            }
+        }
     }
 }
 
+impl std::error::Error for RuntimeError {}
+
 /// Deserializes replies sent over the wire.
-pub struct ReplyCallback<T> {
+struct ReplyCallback<T> {
     _phantom: PhantomData<T>,
 }
 
 impl<T: CallMsg> ReplyCallback<T> {
-    pub fn new() -> Self {
+    fn new() -> Self {
         Self {
             _phantom: PhantomData,
         }
@@ -500,7 +527,7 @@ impl RuntimeCallback {
 
     async fn deliver_local(&self, msg: WireMsg<'_>, replier: Option<Replier>) -> Result<()> {
         let guard = self.rt.handles.read().await;
-        if let Some(handle) = guard.get(&msg.to.act_id) {
+        if let Some(handle) = guard.get(&msg.to.act_id()) {
             let envelope = if let Some(replier) = replier {
                 WireEnvelope::Call { msg, replier }
             } else {
@@ -545,135 +572,20 @@ impl MsgCallback for RuntimeCallback {
     }
 }
 
-/// A unique identifier for a particular service.
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
-pub struct ServiceId(#[serde(with = "smart_ptr")] Arc<String>);
-
-impl ServiceId {
-    pub fn new(value: Arc<String>) -> Self {
-        Self(value)
-    }
-}
-
-impl AsRef<str> for ServiceId {
-    fn as_ref(&self) -> &str {
-        self.0.as_str()
-    }
-}
-
-impl<T: Into<String>> From<T> for ServiceId {
-    fn from(value: T) -> Self {
-        Self(Arc::new(value.into()))
-    }
-}
-
-impl Display for ServiceId {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.write_str(self.as_ref())
-    }
-}
-
-/// A unique identifier for a service.
-///
-/// A service is a collection of actors in the same directory which provide some functionality.
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
-pub struct ServiceName {
-    /// The path to the directory containing the service.
-    #[serde(with = "smart_ptr")]
-    path: Arc<BlockPath>,
-    /// The id of the service.
-    service_id: ServiceId,
-}
-
-impl ServiceName {
-    pub fn new(path: Arc<BlockPath>, service_id: ServiceId) -> Self {
-        Self { path, service_id }
-    }
-}
-
-/// Indicates the set of service providers a message is addressed to.
-///
-/// The message could be delivered to any of the service providers in this set, at random.
-pub struct ServiceAddr {
-    /// The [ServiceName] to address the message to.
-    name: ServiceName,
-    #[allow(dead_code)]
-    /// Indicates if the message should be routed towards the root of the tree or away from it.
-    rootward: bool,
-}
-
-impl ServiceAddr {
-    pub fn new(name: ServiceName, rootward: bool) -> Self {
-        Self { name, rootward }
-    }
-
-    pub fn path(&self) -> &Arc<BlockPath> {
-        &self.name.path
-    }
-
-    pub fn service_id(&self) -> &ServiceId {
-        &self.name.service_id
-    }
-}
-
-/// A unique identifier for a specific actor activation.
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
-pub struct ActorName {
-    /// The path to the directory containing this actor.
-    #[serde(with = "smart_ptr")]
-    path: Arc<BlockPath>,
-    /// A unique identifier for an actor activation. Even as an actor transitions to different types
-    /// as it handles messages, this value does not change. Thus this value can be used to trace an
-    /// actor through a series of state transitions.
-    act_id: Uuid,
-}
-
-impl ActorName {
-    pub fn new(path: Arc<BlockPath>, act_id: Uuid) -> Self {
-        Self { path, act_id }
-    }
-
-    pub fn path(&self) -> &Arc<BlockPath> {
-        &self.path
-    }
-
-    pub fn act_id(&self) -> Uuid {
-        self.act_id
-    }
-}
-
-impl Display for ActorName {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{}@{}", self.act_id, self.path)
-    }
-}
-
-/// Trait for messages which expect exactly one reply.
-pub trait CallMsg: Serialize + DeserializeOwned + Send + Sync {
-    /// The reply type expected for this message.
-    type Reply: Serialize + DeserializeOwned + Send + Sync;
-}
-
-/// Trait for messages which expect exactly zero replies.
-pub trait SendMsg: CallMsg {}
-
-/// A type used to express when a reply is not expected for a message type.
-#[derive(Serialize, Deserialize)]
-pub enum NoReply {}
-
 /// The maximum number of messages which can be kept in an actor's mailbox.
 const MAILBOX_LIMIT: usize = 32;
 
 /// The type of messages sent over the wire between runtimes.
 #[derive(Serialize, Deserialize)]
-pub struct WireMsg<'a> {
+struct WireMsg<'a> {
     to: ActorName,
     from: ActorName,
     payload: &'a [u8],
 }
 
 impl<'a> WireMsg<'a> {
-    pub fn new(to: ActorName, from: ActorName, payload: &'a [u8]) -> Self {
+    #[allow(dead_code)]
+    fn new(to: ActorName, from: ActorName, payload: &'a [u8]) -> Self {
         Self { to, from, payload }
     }
 }
@@ -685,7 +597,7 @@ impl<'a> bttp::CallMsg<'a> for WireMsg<'a> {
 impl<'a> bttp::SendMsg<'a> for WireMsg<'a> {}
 
 #[derive(Serialize, Deserialize)]
-pub enum WireReply<'a> {
+enum WireReply<'a> {
     Ok(&'a [u8]),
     Err(&'a str),
 }
@@ -796,19 +708,19 @@ impl<T: CallMsg> Envelope<T> {
 type FutureResult = Pin<Box<dyn Send + Future<Output = Result<()>>>>;
 
 pub struct ActorHandle {
-    handle: Option<JoinHandle<()>>,
+    handle: AbortHandle,
     sender: Box<dyn Send + Sync + Any>,
     deliverer: Box<dyn Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult>,
 }
 
 impl ActorHandle {
-    fn new<T, F>(handle: JoinHandle<()>, sender: mpsc::Sender<Envelope<T>>, deliverer: F) -> Self
+    fn new<T, F>(handle: AbortHandle, sender: mpsc::Sender<Envelope<T>>, deliverer: F) -> Self
     where
         T: 'static + CallMsg,
         F: 'static + Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult,
     {
         Self {
-            handle: Some(handle),
+            handle,
             sender: Box::new(sender),
             deliverer: Box::new(deliverer),
         }
@@ -841,17 +753,8 @@ impl ActorHandle {
         Ok(reply)
     }
 
-    pub async fn returned(&mut self) -> Result<()> {
-        if let Some(handle) = self.handle.take() {
-            handle.await?;
-        }
-        Ok(())
-    }
-
-    pub fn abort(&mut self) {
-        if let Some(handle) = self.handle.take() {
-            handle.abort();
-        }
+    pub fn abort(&self) {
+        self.handle.abort();
     }
 }
 
@@ -860,3 +763,154 @@ impl Drop for ActorHandle {
         self.abort();
     }
 }
+
+/// Sets up variable declarations and logging configuration to facilitate testing with a [Runtime].
+#[macro_export]
+macro_rules! test_setup {
+    () => {
+        const RUNTIME_ADDR: ::std::net::IpAddr =
+            ::std::net::IpAddr::V4(::std::net::Ipv4Addr::new(127, 0, 0, 1));
+        lazy_static! {
+            static ref RUNTIME_CREDS: ::std::sync::Arc<::btlib::crypto::ConcreteCreds> = {
+                let test_store = &::btlib_tests::TEST_STORE;
+                ::btlib::crypto::CredStore::node_creds(test_store).unwrap()
+            };
+        }
+        declare_runtime!(RUNTIME, RUNTIME_ADDR, RUNTIME_CREDS.clone());
+
+        lazy_static! {
+            /// A tokio async runtime.
+            ///
+            /// When the `#[tokio::test]` attribute is used on a test, a new current thread runtime
+            /// is created for each test
+            /// (source: https://docs.rs/tokio/latest/tokio/attr.test.html#current-thread-runtime).
+            /// This creates a problem, because the first test thread to access the `RUNTIME` static
+            /// will initialize its `Receiver` in its runtime, which will stop running at the end of
+            /// the test. Hence subsequent tests will not be able to send remote messages to this
+            /// `Runtime`.
+            ///
+            /// By creating a single async runtime which is used by all of the tests, we can avoid this
+            /// problem.
+            static ref ASYNC_RT: tokio::runtime::Runtime = ::tokio::runtime::Builder
+                ::new_current_thread()
+                .enable_all()
+                .build()
+                .unwrap();
+        }
+
+        /// The log level to use when running tests.
+        const LOG_LEVEL: &str = "warn";
+
+        #[::ctor::ctor]
+        #[allow(non_snake_case)]
+        fn ctor() {
+            ::std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL));
+            let mut builder = ::env_logger::Builder::from_default_env();
+            ::btlib::log::BuilderExt::btformat(&mut builder).init();
+        }
+    };
+}
+
+#[cfg(test)]
+pub mod test {
+    use super::*;
+
+    use btlib::crypto::{CredStore, CredsPriv};
+    use btlib_tests::TEST_STORE;
+    use bttp::BlockAddr;
+    use lazy_static::lazy_static;
+    use serde::{Deserialize, Serialize};
+
+    use crate::CallMsg;
+
+    test_setup!();
+
+    #[derive(Serialize, Deserialize)]
+    struct EchoMsg(String);
+
+    impl CallMsg for EchoMsg {
+        type Reply = EchoMsg;
+    }
+
+    async fn echo(
+        _rt: &'static Runtime,
+        mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>,
+        _act_id: ActorId,
+    ) {
+        while let Some(envelope) = mailbox.recv().await {
+            let (msg, kind) = envelope.split();
+            match kind {
+                EnvelopeKind::Call { reply } => {
+                    let replier =
+                        reply.unwrap_or_else(|| panic!("The reply has already been sent."));
+                    if let Err(_) = replier.send(msg) {
+                        panic!("failed to send reply");
+                    }
+                }
+                _ => panic!("Expected EchoMsg to be a Call Message."),
+            }
+        }
+    }
+
+    #[test]
+    fn local_call() {
+        ASYNC_RT.block_on(async {
+            const EXPECTED: &str = "hello";
+            let name = RUNTIME.spawn(echo).await;
+            let from = ActorName::new(name.path().clone(), ActorId::new());
+
+            let reply = RUNTIME
+                .call(name.clone(), from, EchoMsg(EXPECTED.into()))
+                .await
+                .unwrap();
+
+            assert_eq!(EXPECTED, reply.0);
+
+            RUNTIME.take(&name).await.unwrap();
+        })
+    }
+
+    /// Tests the `num_running` method.
+    ///
+    /// This test uses its own runtime and so can use the `#[tokio::test]` attribute.
+    #[tokio::test]
+    async fn num_running() {
+        declare_runtime!(
+            LOCAL_RT,
+            // This needs to be different from the address where `RUNTIME` is listening.
+            IpAddr::from([127, 0, 0, 2]),
+            TEST_STORE.node_creds().unwrap()
+        );
+        assert_eq!(0, LOCAL_RT.num_running().await);
+        let name = LOCAL_RT.spawn(echo).await;
+        assert_eq!(1, LOCAL_RT.num_running().await);
+        LOCAL_RT.take(&name).await.unwrap();
+        assert_eq!(0, LOCAL_RT.num_running().await);
+    }
+
+    #[test]
+    fn remote_call() {
+        ASYNC_RT.block_on(async {
+            const EXPECTED: &str = "hello";
+            let actor_name = RUNTIME.spawn(echo).await;
+            let bind_path = Arc::new(RUNTIME_CREDS.bind_path().unwrap());
+            let block_addr = Arc::new(BlockAddr::new(RUNTIME_ADDR, bind_path));
+            let transmitter = Transmitter::new(block_addr, RUNTIME_CREDS.clone())
+                .await
+                .unwrap();
+            let buf = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap();
+            let wire_msg =
+                WireMsg::new(actor_name.clone(), RUNTIME.actor_name(ActorId::new()), &buf);
+
+            let reply = transmitter
+                .call(wire_msg, ReplyCallback::<EchoMsg>::new())
+                .await
+                .unwrap()
+                .unwrap();
+
+            assert_eq!(EXPECTED, reply.0);
+
+            RUNTIME.take(&actor_name).await.unwrap();
+        });
+    }
+}

+ 224 - 0
crates/btrun/src/model.rs

@@ -0,0 +1,224 @@
+//! This module contains types used to model the actor system implemented by the runtime.
+
+use std::{fmt::Display, sync::Arc};
+
+use btlib::BlockPath;
+use btserde::field_helpers::smart_ptr;
+use serde::{de::DeserializeOwned, Deserialize, Serialize};
+use uuid::Uuid;
+
+/// Represents the result of an actor state transition, which can be one of the following:
+/// * `Success`: The transition succeeded and the new state is contained in the result.
+/// * `Abort`: The transition failed and the previous state along with an error describing
+///   the failure is in the result.
+/// * `Fatal`: The transition failed and the actor cannot recover from this failure. An error
+///   describing the failure is contained in the result.
+pub enum TransResult<From, To> {
+    /// Represents a successful transition.
+    Ok(To),
+    /// Represents an aborted transition.
+    Abort { from: From, err: btlib::Error },
+    /// Represents a failed transition which kills the actor which attempted it.
+    Fatal { err: btlib::Error },
+}
+
+/// Specifies a kind of transition, either a `Send` or a `Receive`.
+pub enum TransKind {
+    /// A transition which doesn't receive any message, but sends one or more.
+    Send,
+    /// A transition which receives a message.
+    Receive,
+}
+
+impl TransKind {
+    const fn verb(&self) -> &'static str {
+        match self {
+            Self::Send => "sending",
+            Self::Receive => "receiving",
+        }
+    }
+}
+
+/// A struct which conveys information about where an actor panic occurred to the kernel.
+pub struct ActorPanic {
+    /// The name of the actor implementation which panicked.
+    pub actor_impl: String,
+    /// The name of the state the actor was in.
+    pub state: &'static str,
+    /// The name of the message the actor was handling, or the name of the first message it was
+    /// trying to send.
+    pub message: &'static str,
+    /// The kind of transition which was being attempted.
+    pub kind: TransKind,
+    /// An error describing why the panic occurred.
+    pub err: btlib::Error,
+}
+
+impl Display for ActorPanic {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "Actor {} panicked in state {} while {} a message of type {}: {}",
+            self.actor_impl,
+            self.state,
+            self.kind.verb(),
+            self.message,
+            self.err
+        )
+    }
+}
+
+/// Represents the terminal state of an actor, where it stops processing messages and halts.
+pub struct End;
+
+impl End {
+    /// Returns the identifier for this type which is expected in protocol definitions.
+    pub fn ident() -> &'static str {
+        stringify!(End)
+    }
+}
+
+/// A unique identifier for a particular service.
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
+pub struct ServiceId(#[serde(with = "smart_ptr")] Arc<String>);
+
+impl ServiceId {
+    pub fn new(value: Arc<String>) -> Self {
+        Self(value)
+    }
+}
+
+impl AsRef<str> for ServiceId {
+    fn as_ref(&self) -> &str {
+        self.0.as_str()
+    }
+}
+
+impl<T: Into<String>> From<T> for ServiceId {
+    fn from(value: T) -> Self {
+        Self(Arc::new(value.into()))
+    }
+}
+
+impl Display for ServiceId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str(self.as_ref())
+    }
+}
+
+/// A unique identifier for a service.
+///
+/// A service is a collection of actors in the same directory which provide some functionality.
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
+pub struct ServiceName {
+    /// The path to the directory containing the service.
+    #[serde(with = "smart_ptr")]
+    path: Arc<BlockPath>,
+    /// The id of the service.
+    service_id: ServiceId,
+}
+
+impl ServiceName {
+    pub fn new(path: Arc<BlockPath>, service_id: ServiceId) -> Self {
+        Self { path, service_id }
+    }
+}
+
+/// Indicates the set of service providers a message is addressed to.
+///
+/// The message could be delivered to any of the service providers in this set, at random.
+pub struct ServiceAddr {
+    /// The [ServiceName] to address the message to.
+    name: ServiceName,
+    #[allow(dead_code)]
+    /// Indicates if the message should be routed towards the root of the tree or away from it.
+    rootward: bool,
+}
+
+impl ServiceAddr {
+    pub fn new(name: ServiceName, rootward: bool) -> Self {
+        Self { name, rootward }
+    }
+
+    pub fn path(&self) -> &Arc<BlockPath> {
+        &self.name.path
+    }
+
+    pub fn service_id(&self) -> &ServiceId {
+        &self.name.service_id
+    }
+}
+
+/// An identifier for an actor which is unique in a given runtime.
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
+pub struct ActorId(Uuid);
+
+impl ActorId {
+    pub fn new() -> Self {
+        Self(Uuid::new_v4())
+    }
+
+    /// Returns an actor ID that can't have messages delivered to it.
+    pub fn undeliverable() -> Self {
+        Self(Uuid::from_bytes([0; 16]))
+    }
+}
+
+impl Default for ActorId {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Copy for ActorId {}
+
+impl Display for ActorId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.0.fmt(f)
+    }
+}
+
+/// A unique identifier for a specific actor activation.
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
+pub struct ActorName {
+    /// The path to the directory containing this actor.
+    #[serde(with = "smart_ptr")]
+    path: Arc<BlockPath>,
+    /// A unique identifier for an actor activation. Even as an actor transitions to different types
+    /// as it handles messages, this value does not change. Thus this value can be used to trace an
+    /// actor through a series of state transitions.
+    act_id: ActorId,
+}
+
+impl ActorName {
+    pub fn new(path: Arc<BlockPath>, act_id: ActorId) -> Self {
+        Self { path, act_id }
+    }
+
+    pub fn path(&self) -> &Arc<BlockPath> {
+        &self.path
+    }
+
+    pub fn act_id(&self) -> ActorId {
+        self.act_id
+    }
+}
+
+impl Display for ActorName {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}@{}", self.act_id, self.path)
+    }
+}
+
+/// Trait for messages which expect exactly one reply.
+pub trait CallMsg: Serialize + DeserializeOwned + Send + Sync {
+    /// The reply type expected for this message.
+    type Reply: Serialize + DeserializeOwned + Send + Sync;
+}
+
+/// Trait for messages which expect exactly zero replies.
+pub trait SendMsg: CallMsg {}
+
+/// A type used to express when a reply is not expected for a message type.
+#[derive(Serialize, Deserialize)]
+pub enum NoReply {}

+ 145 - 206
crates/btrun/tests/runtime_tests.rs

@@ -1,155 +1,23 @@
 #![feature(impl_trait_in_assoc_type)]
 
+use btrun::model::*;
+use btrun::test_setup;
 use btrun::*;
 
-use btlib::{
-    crypto::{ConcreteCreds, CredStore, CredsPriv},
-    log::BuilderExt,
-    Result,
-};
-use btlib_tests::TEST_STORE;
+use btlib::Result;
 use btproto::protocol;
-use btserde::to_vec;
-use bttp::{BlockAddr, Transmitter};
-use ctor::ctor;
 use lazy_static::lazy_static;
 use log;
 use serde::{Deserialize, Serialize};
 use std::{
     future::{ready, Future, Ready},
-    net::{IpAddr, Ipv4Addr},
     sync::{
         atomic::{AtomicU8, Ordering},
         Arc,
     },
 };
-use tokio::{runtime::Builder, sync::mpsc};
-use uuid::Uuid;
-
-const RUNTIME_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
-lazy_static! {
-    static ref RUNTIME_CREDS: Arc<ConcreteCreds> = TEST_STORE.node_creds().unwrap();
-}
-declare_runtime!(RUNTIME, RUNTIME_ADDR, RUNTIME_CREDS.clone());
-
-lazy_static! {
-    /// A tokio async runtime.
-    ///
-    /// When the `#[tokio::test]` attribute is used on a test, a new current thread runtime
-    /// is created for each test
-    /// (source: https://docs.rs/tokio/latest/tokio/attr.test.html#current-thread-runtime).
-    /// This creates a problem, because the first test thread to access the `RUNTIME` static
-    /// will initialize its `Receiver` in its runtime, which will stop running at the end of
-    /// the test. Hence subsequent tests will not be able to send remote messages to this
-    /// `Runtime`.
-    ///
-    /// By creating a single async runtime which is used by all of the tests, we can avoid this
-    /// problem.
-    static ref ASYNC_RT: tokio::runtime::Runtime = Builder::new_current_thread()
-        .enable_all()
-        .build()
-        .unwrap();
-}
-
-/// The log level to use when running tests.
-const LOG_LEVEL: &str = "warn";
-
-#[ctor]
-fn ctor() {
-    std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL));
-    env_logger::Builder::from_default_env().btformat().init();
-}
-
-#[derive(Serialize, Deserialize)]
-struct EchoMsg(String);
-
-impl CallMsg for EchoMsg {
-    type Reply = EchoMsg;
-}
-
-async fn echo(
-    _rt: &'static Runtime,
-    mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>,
-    _act_id: Uuid,
-) {
-    while let Some(envelope) = mailbox.recv().await {
-        let (msg, kind) = envelope.split();
-        match kind {
-            EnvelopeKind::Call { reply } => {
-                let replier = reply.unwrap_or_else(|| panic!("The reply has already been sent."));
-                if let Err(_) = replier.send(msg) {
-                    panic!("failed to send reply");
-                }
-            }
-            _ => panic!("Expected EchoMsg to be a Call Message."),
-        }
-    }
-}
-
-#[test]
-fn local_call() {
-    ASYNC_RT.block_on(async {
-        const EXPECTED: &str = "hello";
-        let name = RUNTIME.spawn(echo).await;
-        let from = ActorName::new(name.path().clone(), Uuid::default());
-
-        let reply = RUNTIME
-            .call(name.clone(), from, EchoMsg(EXPECTED.into()))
-            .await
-            .unwrap();
-
-        assert_eq!(EXPECTED, reply.0);
-
-        RUNTIME.take(&name).await.unwrap();
-    })
-}
-
-#[test]
-fn remote_call() {
-    ASYNC_RT.block_on(async {
-        const EXPECTED: &str = "hello";
-        let actor_name = RUNTIME.spawn(echo).await;
-        let bind_path = Arc::new(RUNTIME_CREDS.bind_path().unwrap());
-        let block_addr = Arc::new(BlockAddr::new(RUNTIME_ADDR, bind_path));
-        let transmitter = Transmitter::new(block_addr, RUNTIME_CREDS.clone())
-            .await
-            .unwrap();
-        let buf = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap();
-        let wire_msg = WireMsg::new(
-            actor_name.clone(),
-            RUNTIME.actor_name(Uuid::default()),
-            &buf,
-        );
-
-        let reply = transmitter
-            .call(wire_msg, ReplyCallback::<EchoMsg>::new())
-            .await
-            .unwrap()
-            .unwrap();
 
-        assert_eq!(EXPECTED, reply.0);
-
-        RUNTIME.take(&actor_name).await.unwrap();
-    });
-}
-
-/// Tests the `num_running` method.
-///
-/// This test uses its own runtime and so can use the `#[tokio::test]` attribute.
-#[tokio::test]
-async fn num_running() {
-    declare_runtime!(
-        LOCAL_RT,
-        // This needs to be different from the address where `RUNTIME` is listening.
-        IpAddr::from([127, 0, 0, 2]),
-        TEST_STORE.node_creds().unwrap()
-    );
-    assert_eq!(0, LOCAL_RT.num_running().await);
-    let name = LOCAL_RT.spawn(echo).await;
-    assert_eq!(1, LOCAL_RT.num_running().await);
-    LOCAL_RT.take(&name).await.unwrap();
-    assert_eq!(0, LOCAL_RT.num_running().await);
-}
+test_setup!();
 
 mod ping_pong {
     use super::*;
@@ -167,16 +35,12 @@ mod ping_pong {
     }
     //
     // In words, the protocol is described as follows.
-    // 1. The ClientInit state receives the Activate message. It returns the SentPing state and a
-    //    Ping message to be sent to the Listening state.
-    // 2. The ServerInit state receives the Activate message. It returns the Listening state.
-    // 3. When the Listening state receives the Ping message it returns the End state and a
+    // 1. When the Listening state receives the Ping message it returns the End state and a
     //    Ping::Reply message to be sent to the SentPing state.
-    // 4. When the SentPing state receives the Ping::Reply message it returns the End state.
+    // 2. When the SentPing state receives the Ping::Reply message it returns the End state.
     //
     // The End state represents an end to the session described by the protocol. When an actor
     // transitions to the End state its function returns.
-    // The generated actor implementation is the sender of the Activate message.
     // When a state is expecting a Reply message, an error occurs if the message is not received
     // in a timely manner.
 
@@ -206,15 +70,21 @@ mod ping_pong {
                 .take()
                 .ok_or_else(|| bterr!("State was not returned."))?;
             let (new_state, result) = match state {
-                PingClientState::Client(state) => {
-                    let (new_state, _) = state.on_send_ping(&mut msg).await?;
-                    let new_state = PingClientState::End(new_state);
-                    let result = self
-                        .runtime
-                        .call_service(service, PingProtocolMsgs::Ping(msg))
-                        .await;
-                    (new_state, result)
-                }
+                PingClientState::Client(state) => match state.on_send_ping(&mut msg).await {
+                    TransResult::Ok((new_state, _)) => {
+                        let new_state = PingClientState::End(new_state);
+                        let result = self
+                            .runtime
+                            .call_service(service, PingProtocolMsgs::Ping(msg))
+                            .await;
+                        (new_state, result)
+                    }
+                    TransResult::Abort { from, err } => {
+                        let new_state = PingClientState::Client(from);
+                        (new_state, Err(err))
+                    }
+                    TransResult::Fatal { err } => return Err(err),
+                },
                 state => {
                     let result = Err(bterr!("Can't send Ping in state {}.", state.name()));
                     (state, result)
@@ -255,7 +125,7 @@ mod ping_pong {
             _runtime: &'static Runtime,
             make_init: F,
             mut mailbox: Mailbox<PingProtocolMsgs>,
-            _act_id: Uuid,
+            _act_id: ActorId,
         ) where
             Init: 'static + Server,
             F: 'static + Send + Sync + FnOnce() -> Init,
@@ -265,16 +135,26 @@ mod ping_pong {
                 let (msg, msg_kind) = envelope.split();
                 state = match (state, msg) {
                     (ServerState::Server(listening_state), PingProtocolMsgs::Ping(msg)) => {
-                        let (new_state, reply) = listening_state.handle_ping(msg).await.unwrap();
-                        match msg_kind {
-                            EnvelopeKind::Call { reply: replier } => {
-                                let replier = replier.expect("The reply has already been sent.");
-                                if let Err(_) = replier.send(PingProtocolMsgs::PingReply(reply)) {
-                                    panic!("Failed to send Ping reply.");
+                        match listening_state.handle_ping(msg).await {
+                            TransResult::Ok((new_state, reply)) => match msg_kind {
+                                EnvelopeKind::Call { reply: replier } => {
+                                    let replier =
+                                        replier.expect("The reply has already been sent.");
+                                    if let Err(_) = replier.send(PingProtocolMsgs::PingReply(reply))
+                                    {
+                                        panic!("Failed to send Ping reply.");
+                                    }
+                                    ServerState::End(new_state)
                                 }
-                                ServerState::End(new_state)
+                                _ => panic!("'Ping' was expected to be a Call message."),
+                            },
+                            TransResult::Abort { from, err } => {
+                                log::warn!("Aborted transition from the {} while handling the {} message: {}", "Server", "Ping", err);
+                                ServerState::Server(from)
+                            }
+                            TransResult::Fatal { err } => {
+                                panic!("Fatal error while handling Ping message in Server state: {err}");
                             }
-                            _ => panic!("'Ping' was expected to be a Call message."),
                         }
                     }
                     (state, _) => state,
@@ -289,12 +169,12 @@ mod ping_pong {
         rt.register::<PingProtocolMsgs, _>(id, move |runtime| {
             let make_init = make_init.clone();
             let fut = async move {
-                let actor_name = runtime
+                let actor_impl = runtime
                     .spawn(move |_, mailbox, act_id| {
                         server_loop(runtime, make_init, mailbox, act_id)
                     })
                     .await;
-                Ok(actor_name)
+                Ok(actor_impl)
             };
             Box::pin(fut)
         })
@@ -322,10 +202,14 @@ mod ping_pong {
     }
 
     impl Client for ClientState {
-        type OnSendPingFut = impl Future<Output = Result<(End, PingReply)>>;
+        fn actor_impl() -> String {
+            "client".into()
+        }
+
+        type OnSendPingFut = impl Future<Output = TransResult<Self, (End, PingReply)>>;
         fn on_send_ping(self, _msg: &mut Ping) -> Self::OnSendPingFut {
             self.counter.fetch_sub(1, Ordering::SeqCst);
-            ready(Ok((End, PingReply)))
+            ready(TransResult::Ok((End, PingReply)))
         }
     }
 
@@ -341,10 +225,14 @@ mod ping_pong {
     }
 
     impl Server for ServerState {
-        type HandlePingFut = impl Future<Output = Result<(End, PingReply)>>;
+        fn actor_impl() -> String {
+            "server".into()
+        }
+
+        type HandlePingFut = impl Future<Output = TransResult<Self, (End, PingReply)>>;
         fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
             self.counter.fetch_sub(1, Ordering::SeqCst);
-            ready(Ok((End, PingReply)))
+            ready(TransResult::Ok((End, PingReply)))
         }
     }
 
@@ -370,7 +258,7 @@ mod ping_pong {
 
             assert_eq!(0, counter.load(Ordering::SeqCst));
 
-            RUNTIME.take_service(&service_id).await.unwrap();
+            RUNTIME.deregister(&service_id, None).await.unwrap();
         });
     }
 }
@@ -425,7 +313,7 @@ mod client_callback {
 
     use super::*;
 
-    use std::time::Duration;
+    use std::{panic::panic_any, time::Duration};
     use tokio::{sync::oneshot, time::timeout};
 
     #[derive(Serialize, Deserialize)]
@@ -454,10 +342,14 @@ mod client_callback {
     }
 
     impl Unregistered for UnregisteredState {
+        fn actor_impl() -> String {
+            "client".into()
+        }
+
         type OnSendRegisterRegistered = RegisteredState;
-        type OnSendRegisterFut = Ready<Result<Self::OnSendRegisterRegistered>>;
+        type OnSendRegisterFut = Ready<TransResult<Self, Self::OnSendRegisterRegistered>>;
         fn on_send_register(self, _arg: &mut Register) -> Self::OnSendRegisterFut {
-            ready(Ok(RegisteredState {
+            ready(TransResult::Ok(RegisteredState {
                 sender: self.sender,
             }))
         }
@@ -468,10 +360,10 @@ mod client_callback {
     }
 
     impl Registered for RegisteredState {
-        type HandleCompletedFut = Ready<Result<End>>;
+        type HandleCompletedFut = Ready<TransResult<Self, End>>;
         fn handle_completed(self, arg: Completed) -> Self::HandleCompletedFut {
             self.sender.send(arg.value).unwrap();
-            ready(Ok(End))
+            ready(TransResult::Ok(End))
         }
     }
 
@@ -480,12 +372,16 @@ mod client_callback {
     }
 
     impl Listening for ListeningState {
+        fn actor_impl() -> String {
+            "server".into()
+        }
+
         type HandleRegisterListening = ListeningState;
         type HandleRegisterWorking = WorkingState;
-        type HandleRegisterFut = Ready<Result<(ListeningState, WorkingState)>>;
+        type HandleRegisterFut = Ready<TransResult<Self, (ListeningState, WorkingState)>>;
         fn handle_register(self, arg: Register) -> Self::HandleRegisterFut {
             let multiple = self.multiple;
-            ready(Ok((
+            ready(TransResult::Ok((
                 self,
                 WorkingState {
                     factor: arg.factor,
@@ -501,10 +397,14 @@ mod client_callback {
     }
 
     impl Working for WorkingState {
-        type OnSendCompletedFut = Ready<Result<(End, Completed)>>;
+        fn actor_impl() -> String {
+            "worker".into()
+        }
+
+        type OnSendCompletedFut = Ready<TransResult<Self, (End, Completed)>>;
         fn on_send_completed(self) -> Self::OnSendCompletedFut {
             let value = self.multiple * self.factor;
-            ready(Ok((End, Completed { value })))
+            ready(TransResult::Ok((End, Completed { value })))
         }
     }
 
@@ -539,16 +439,26 @@ mod client_callback {
                 .take()
                 .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
             let new_state = match state {
-                ClientState::Unregistered(state) => {
-                    let new_state = state.on_send_register(&mut msg).await?;
-                    let msg = ClientCallbackMsgs::Register(msg);
-                    self.runtime
-                        .send_service(to, self.name.clone(), msg)
-                        .await?;
-                    // QUESTION: Should `on_send_register` be required to return the previous state
-                    // if it encounters an error?
-                    ClientState::Registered(new_state)
-                }
+                ClientState::Unregistered(state) => match state.on_send_register(&mut msg).await {
+                    TransResult::Ok(new_state) => {
+                        let msg = ClientCallbackMsgs::Register(msg);
+                        self.runtime
+                            .send_service(to, self.name.clone(), msg)
+                            .await?;
+                        ClientState::Registered(new_state)
+                    }
+                    TransResult::Abort { from, err } => {
+                        log::warn!(
+                            "Aborted transition from the {} state: {}",
+                            "Unregistered",
+                            err
+                        );
+                        ClientState::Unregistered(from)
+                    }
+                    TransResult::Fatal { err } => {
+                        return Err(err);
+                    }
+                },
                 state => state,
             };
             *guard = Some(new_state);
@@ -572,15 +482,24 @@ mod client_callback {
                     let new_state = match (state, msg) {
                         (ClientState::Registered(curr_state), ClientCallbackMsgs::Completed(msg)) => {
                             match curr_state.handle_completed(msg).await {
-                                Ok(next) =>  ClientState::<Init>::End(next),
-                                Err(err) => {
-                                    log::error!("Failed to handle 'Completed' message in 'Registered' state: {err}");
-                                    panic!("We can't transition to a new state because we gave ownership away and that method failed!")
+                                TransResult::Ok(next) =>  ClientState::<Init>::End(next),
+                                TransResult::Abort { from, err } => {
+                                    log::warn!("Aborted transition from the {} state while handling the {} message: {}", "Registered", "Completed", err);
+                                    ClientState::Registered(from)
+                                }
+                                TransResult::Fatal { err } => {
+                                    panic_any(ActorPanic {
+                                        actor_impl: Init::actor_impl(),
+                                        state: "Registered",
+                                        message: "Completed",
+                                        kind: TransKind::Receive,
+                                        err
+                                    });
                                 }
                             }
                         }
                         (state, msg) => {
-                            log::error!("Unexpected message '{}' in state '{}'.", msg.name(), state.name());
+                            log::error!("Unexpected message {} in state {}.", msg.name(), state.name());
                             state
                         }
                     };
@@ -620,7 +539,7 @@ mod client_callback {
             runtime: &'static Runtime,
             make_init: F,
             mut mailbox: Mailbox<ClientCallbackMsgs>,
-            _act_id: Uuid,
+            _act_id: ActorId,
         ) where
             Init: 'static + Listening<HandleRegisterListening = Init>,
             F: 'static + Send + Sync + Fn() -> Init,
@@ -631,7 +550,7 @@ mod client_callback {
                 let new_state = match (state, msg) {
                     (ServerState::Listening(curr_state), ClientCallbackMsgs::Register(msg)) => {
                         match curr_state.handle_register(msg).await {
-                            Ok((new_state, working_state)) => {
+                            TransResult::Ok((new_state, working_state)) => {
                                 if let EnvelopeKind::Send { from, .. } = msg_kind {
                                     start_worker(working_state, from, runtime).await;
                                 } else {
@@ -639,15 +558,24 @@ mod client_callback {
                                 }
                                 ServerState::Listening(new_state)
                             }
-                            Err(err) => {
-                                log::error!("Failed to handle the Register message: {err}");
-                                todo!("Need to recover the previous state from err.")
+                            TransResult::Abort { from, err } => {
+                                log::warn!("Aborted transition from the {} state while handling the {} message: {}", "Listening", "Register", err);
+                                ServerState::Listening(from)
+                            }
+                            TransResult::Fatal { err } => {
+                                panic_any(ActorPanic {
+                                    actor_impl: Init::actor_impl(),
+                                    state: "Listening",
+                                    message: "Register",
+                                    kind: TransKind::Receive,
+                                    err,
+                                });
                             }
                         }
                     }
                     (state, msg) => {
                         log::error!(
-                            "Unexpected message '{}' in state '{}'.",
+                            "Unexpected message {} in state {}.",
                             msg.name(),
                             state.name()
                         );
@@ -663,12 +591,12 @@ mod client_callback {
                 let make_init = make_init.clone();
                 let fut = async move {
                     let make_init = make_init.clone();
-                    let actor_name = runtime
+                    let actor_impl = runtime
                         .spawn(move |_, mailbox, act_id| {
                             server_loop(runtime, make_init, mailbox, act_id)
                         })
                         .await;
-                    Ok(actor_name)
+                    Ok(actor_impl)
                 };
                 Box::pin(fut)
             })
@@ -690,17 +618,28 @@ mod client_callback {
         runtime
             .spawn::<ClientCallbackMsgs, _, _>(move |_, _, act_id| async move {
                 let msg = match init.on_send_completed().await {
-                    Ok((End, msg)) => msg,
-                    Err(err) => {
-                        log::error!("Failed to send Completed message: {err}");
-                        return;
+                    TransResult::Ok((End, msg)) => msg,
+                    TransResult::Abort { err, .. } | TransResult::Fatal { err } => {
+                        panic_any(ActorPanic {
+                            actor_impl: Init::actor_impl(),
+                            state: "Working",
+                            message: "Completed",
+                            kind: TransKind::Send,
+                            err,
+                        })
                     }
                 };
                 let from = runtime.actor_name(act_id);
                 let msg = ClientCallbackMsgs::Completed(msg);
-                if let Err(err) = runtime.send(owned, from, msg).await {
-                    log::error!("Failed to send Completed message: {err}");
-                }
+                runtime.send(owned, from, msg).await.unwrap_or_else(|err| {
+                    panic_any(ActorPanic {
+                        actor_impl: Init::actor_impl(),
+                        state: "Working",
+                        message: "Completed",
+                        kind: TransKind::Send,
+                        err,
+                    });
+                });
             })
             .await
     }

+ 2 - 2
crates/btsector/src/lib.rs

@@ -1,8 +1,8 @@
 //! Types which define the protocol used by the sector layer.
 
-use btlib::{crypto::merkle_stream::VariantMerkleTree, BlockMeta, Inode, Result};
+use btlib::{crypto::merkle_stream::VariantMerkleTree, BlockMeta, Inode};
 use btproto::protocol;
-use btrun::CallMsg;
+use btrun::model::CallMsg;
 
 use serde::{Deserialize, Serialize};