Ver Fonte

Started implementing code generation for services and workers.

Matthew Carr há 1 ano atrás
pai
commit
c5dcdd2886

+ 4 - 5
crates/btlib/src/error.rs

@@ -5,19 +5,18 @@
 //! one-off string errors. `btensure` takes a boolean expression as its first argument, and its
 //! remaining arguments are the same as those to `bterr`.
 
-use anyhow::anyhow;
 use serde::{Deserialize, Serialize};
 use std::{fmt::Display, io};
 
-use crate::Decompose;
+use crate::{anyhow, Decompose};
 
 /// Creates a new [Error], which contains a stacktrace captured at the point where this macro
 /// was evaluated.
 #[macro_export]
 macro_rules! bterr {
-    ($msg:literal $(,)?) => { $crate::Error::new(anyhow::anyhow!($msg)) };
-    ($err:expr $(,)?) => { $crate::Error::new(anyhow::anyhow!($err)) };
-    ($fmt:expr, $($arg:tt)*) => { $crate::Error::new(anyhow::anyhow!($fmt, $($arg)*)) };
+    ($msg:literal $(,)?) => { $crate::Error::new($crate::anyhow!($msg)) };
+    ($err:expr $(,)?) => { $crate::Error::new($crate::anyhow!($err)) };
+    ($fmt:expr, $($arg:tt)*) => { $crate::Error::new($crate::anyhow!($fmt, $($arg)*)) };
 }
 
 /// Ensures that an expression evaluates to true, and if it does'nt, returns an error.

+ 2 - 0
crates/btlib/src/lib.rs

@@ -29,6 +29,8 @@ extern crate static_assertions;
 #[cfg(test)]
 extern crate lazy_static;
 
+pub use anyhow::anyhow;
+
 use ::log::error;
 use btserde::{read_from, write_to};
 use fuse_backend_rs::abi::fuse_abi::{stat64, Attr};

+ 2 - 0
crates/btproto/src/error.rs

@@ -87,4 +87,6 @@ pub(crate) mod msgs {
         "Only a single reply can be sent in response to any message.";
     pub(crate) const UNOBSERVABLE_STATE: &str =
         "This client state is not allowed because it only receives replies. Such a state cannot be observed.";
+    pub(crate) const CLIENT_USED_IN_SERVICE: &str =
+        "A client state cannot be used inside `service()`.";
 }

+ 359 - 3
crates/btproto/src/generation.rs

@@ -5,13 +5,16 @@ use quote::{format_ident, quote, ToTokens};
 
 use crate::{
     case_convert::CaseConvert,
-    model::{MethodModel, ProtocolModel},
+    model::{
+        ActorKind, ActorModel, MethodModel, MsgInfo, OutputKind, ProtocolModel, TypeParamInfo,
+    },
 };
 
 impl ToTokens for ProtocolModel {
     fn to_tokens(&self, tokens: &mut TokenStream) {
         tokens.extend(self.generate_message_enum());
         tokens.extend(self.generate_state_traits());
+        tokens.extend(self.generate_spawn_functions());
     }
 }
 
@@ -25,8 +28,8 @@ impl ProtocolModel {
             .collect();
         let msg_types = msg_lookup.msg_iter().map(|msg| msg.msg_type());
         let all_replies = msg_lookup.msg_iter().all(|msg| msg.is_reply());
-        let enum_name = format_ident!("{}Msgs", self.def().name_def.name);
-        let enum_kinds_name = format_ident!("{}MsgKinds", self.def().name_def.name);
+        let enum_name = self.msg_enum_ident();
+        let enum_kinds_name = self.msg_enum_kinds_ident();
         let name_decl_vec: Vec<(Ident, TokenStream)> = variants
             .iter()
             .map(|variant| {
@@ -122,6 +125,7 @@ impl ProtocolModel {
                     #actor_impl_method
                     #( #method_tokens )*
 
+                    #[doc = "The name of this state."]
                     fn state_name() -> ::std::sync::Arc<String> {
                         use ::btrun::model::Lazy;
                         use ::std::sync::Arc;
@@ -136,6 +140,358 @@ impl ProtocolModel {
         }
         tokens
     }
+
+    fn generate_spawn_functions(&self) -> TokenStream {
+        let mut output = TokenStream::default();
+        for actor in self.actors_iter() {
+            output.extend(self.generate_spawn_function(actor));
+        }
+        output
+    }
+
+    fn generate_spawn_function(&self, actor: &ActorModel) -> TokenStream {
+        match actor.kind() {
+            ActorKind::Service => self.generate_service_spawn_function(actor),
+            ActorKind::Worker => self.generate_worker_spawn_function(actor),
+            ActorKind::Client => TokenStream::default(),
+        }
+    }
+
+    fn generate_worker_spawn_function(&self, actor: &ActorModel) -> TokenStream {
+        let function_name = actor.spawn_function_ident();
+        let TypeParamInfo {
+            type_params,
+            constraints,
+        } = self.type_param_info_for(actor.name());
+        let init_state_type_param = actor.init_state().type_param();
+        let state_enum_decl = self.generate_state_enum(actor);
+        let init_state_var = self.init_state_var();
+        let server_loop = self.generate_loop(actor);
+        quote! {
+            async fn #function_name<#( #type_params ),*>(
+                runtime: &'static ::btrun::Runtime,
+                owner_name: ::btrun::model::ActorName,
+                #init_state_var: #init_state_type_param,
+            ) -> ::btlib::Result<::btrun::model::ActorName>
+            where
+                #( #constraints ),*
+            {
+                #state_enum_decl
+
+                let actor = #server_loop ;
+                runtime.spawn(None, actor).await
+            }
+        }
+    }
+
+    fn generate_service_spawn_function(&self, actor: &ActorModel) -> TokenStream {
+        let function_name = format_ident!("register_{}", actor.def().actor.as_ref());
+        let TypeParamInfo {
+            type_params,
+            constraints,
+        } = self.type_param_info_for(actor.name());
+        let init_state_type_param = actor.init_state().type_param();
+        let state_enum_decl = self.generate_state_enum(actor);
+        let msg_enum = self.msg_enum_ident();
+        let init_state_var = self.init_state_var();
+        let server_loop = self.generate_loop(actor);
+        quote! {
+            async fn #function_name<#( #type_params ),*, F>(
+                runtime: &'static ::btrun::Runtime,
+                service_id: ::btrun::model::ServiceId,
+                make_init: F,
+            ) -> ::btlib::Result<::btrun::model::ServiceName>
+            where
+                F: 'static + Send + Sync + Clone + Fn() -> #init_state_type_param,
+                #( #constraints ),*
+            {
+                #state_enum_decl
+
+                runtime.register::<#msg_enum, _>(
+                    service_id,
+                    move |runtime: &'static ::btrun::Runtime| {
+                        let make_init = make_init.clone();
+                        let fut = async move {
+                            let #init_state_var = make_init();
+                            let server_loop = #server_loop ;
+                            // This shouldn't panic because we passed in None as the owner.
+                            let actor_impl = runtime.spawn(None, server_loop).await.unwrap();
+                            Ok(actor_impl)
+                        };
+                        ::std::boxed::Box::pin(fut)
+                    }
+                ).await
+            }
+        }
+    }
+
+    fn generate_loop(&self, actor: &ActorModel) -> TokenStream {
+        let init_state = actor.init_state().name();
+        let state_enum_ident = actor.state_enum_ident();
+        let msg_enum = self.msg_enum_ident();
+        let init_state_var = self.init_state_var();
+        let end_ident = self.end_ident();
+        let mailbox = self.mailbox_param();
+        let actor_id = self.actor_id_param();
+        let runtime = self.runtime_param();
+        let from = self.from_ident();
+        let reply = self.reply_ident();
+        let msg = self.msg_ident();
+        let actor_name = self.actor_name_ident();
+        let call_transitions = self.generate_call_transitions(actor);
+        let send_transitions = self.generate_send_transitions(actor);
+        let control_transitions = self.generate_control_transitions(actor);
+        quote! {
+            move |
+                mut #mailbox: ::btrun::Mailbox<#msg_enum>,
+                #actor_id: ::btrun::model::ActorId,
+                #runtime: &'static ::btrun::Runtime
+            | async move {
+                use ::btlib::bterr;
+                use ::btrun::{
+                    log,
+                    model::{
+                        Envelope, ControlMsg, Named, TransResult, ActorError, ActorErrorPayload,
+                        TransKind,
+                    }
+                };
+                let #actor_name = #runtime . actor_name(#actor_id);
+                let mut state = #state_enum_ident :: #init_state (#init_state_var);
+                while let Some(envelope) = #mailbox.recv().await {
+                    let new_state = match envelope {
+                        Envelope::Call { #msg, #from, mut #reply, .. } => #call_transitions
+                        Envelope::Send { #msg, #from, .. } => #send_transitions
+                        Envelope::Control(#msg) => #control_transitions
+                    };
+                    state = new_state;
+                    if let #state_enum_ident::#end_ident(_) = &state {
+                        break;
+                    }
+                }
+                Ok(#actor_id)
+            }
+        }
+    }
+
+    fn generate_call_transitions(&self, actor: &ActorModel) -> TokenStream {
+        self.generate_transitions(actor, |msg_info| msg_info.is_call())
+    }
+
+    fn generate_send_transitions(&self, actor: &ActorModel) -> TokenStream {
+        self.generate_transitions(actor, |msg_info| !msg_info.is_call())
+    }
+
+    fn generate_transitions(
+        &self,
+        actor: &ActorModel,
+        filter: impl Fn(&MsgInfo) -> bool,
+    ) -> TokenStream {
+        let state_enum_ident = actor.state_enum_ident();
+        let msg_enum_ident = self.msg_enum_ident();
+        let init_state_type_param = actor.init_state().type_param();
+        let msg_enum_kinds = self.msg_enum_kinds_ident();
+        let actor_id = self.actor_id_param();
+        let transitions = actor.states().values().flat_map(|state| {
+            state.methods().values().filter(|method| {
+                if let Some(in_msg) = method.def().in_msg() {
+                    let msg_info = self.msg_lookup().lookup(in_msg);
+                    filter(msg_info)
+                } else {
+                    false
+                }
+            })
+            .map(|method| {
+                let state_type_param = state.type_param();
+                let mut output_iter = method.outputs().iter();
+                let next_state = output_iter.next()
+                    .unwrap_or_else(|| panic!("There are no outputs for method {} in state {}.", method.name(), state.name()));
+                let next_state_name = if let OutputKind::State { def, .. } = next_state.kind() {
+                    def.state_trait.as_ref()
+                } else {
+                    panic!("First output of {} method was not a state.", method.name());
+                };
+                let next_state_var = next_state.var_name();
+                let out_states = output_iter
+                    .flat_map(|output| {
+                        if let OutputKind::State { def, .. } = output.kind() {
+                            Some((output.var_name(), def))
+                        } else {
+                            None
+                        }
+                    })
+                    .map(|(var_name, def)| {
+                        let spawning_actor = self.actor_lookup().actor_with_state(&def.state_trait);
+                        let spawning_model = self.actors().get(spawning_actor)
+                            .unwrap_or_else(|| panic!("There was no actor named {spawning_actor}."));
+                        let spawn_function = spawning_model.spawn_function_ident()
+                            .unwrap_or_else(|| panic!("Actor {spawning_actor} of kind {:?} has no spawn function.", spawning_model.kind()));
+                        let from = self.from_ident();
+                        let runtime = self.runtime_param();
+                        let method_name = method.name();
+                        quote! {
+                            if let Err(err) = #spawn_function(#runtime, #from, #var_name).await {
+                                log::error!(
+                                    "Failed to spawn {} actor after the {} method: {err}",
+                                    stringify!(#spawning_actor),
+                                    stringify!(#method_name)
+                                )
+                            }
+                        }
+                    });
+                let out_msgs = method.outputs().iter()
+                    .flat_map(|output| {
+                        if let OutputKind::Msg { def, .. } = output.kind() {
+                            Some((output.var_name(), def))
+                        } else {
+                            None
+                        }
+                    })
+                    .map(|(var_name, dest)| {
+                        if dest.msg.is_reply() {
+                            let reply = self.reply_ident();
+                            let msg_type = &dest.msg.msg_type;
+                            let reply_variant = self.msg_lookup().lookup(&dest.msg).msg_name();
+                            let error_msg = format!("Failed to send {} reply.", msg_type);
+                            quote! {
+                                if let Some(mut reply) = #reply.take() {
+                                    if let Err(_) = reply.send(#msg_enum_ident :: #reply_variant (#var_name)) {
+                                        return Err(ActorError::new(
+                                            bterr!(#error_msg),
+                                            ActorErrorPayload {
+                                                actor_id: #actor_id,
+                                                actor_impl: #init_state_type_param :: actor_impl(),
+                                                state: #state_type_param :: state_name(),
+                                                message: #msg_enum_kinds :: #msg_type .name(),
+                                                kind: TransKind::Receive,
+                                            }
+                                        ));
+                                    }
+                                } else {
+                                    log::error!(
+                                        "Reply to {} message has already been sent.",
+                                        #msg_enum_kinds :: #msg_type .name()
+                                    );
+                                }
+                            }
+                        } else {
+                            todo!("Send message to an owned state or to a service.");
+                        }
+                    });
+                let method_name = method.name();
+                let state_name = state.name();
+                let msg_name = method.msg_received_input().unwrap().msg_name();
+                let out_vars = method.outputs().iter().map(|output| output.var_name());
+                quote! {
+                    (#state_enum_ident :: #state_name(state), #msg_enum_ident :: #msg_name(msg)) => {
+                        match state.#method_name(msg).await {
+                            TransResult::Ok(( #( #out_vars ),* )) => {
+                                #( #out_states )*
+                                #( #out_msgs )*
+                                #state_enum_ident :: #next_state_name (#next_state_var)
+                            }
+                            TransResult::Abort { from, err, .. } => {
+                                log::warn!(
+                                    "Aborted transition from the {} state while handling the {} message: {}",
+                                    stringify!(#state_name),
+                                    stringify!(#msg_name),
+                                    err
+                                );
+                                #state_enum_ident :: #state_name(from)
+                            }
+                            TransResult::Fatal { err, .. } => {
+                                return Err(ActorError::new(
+                                    err,
+                                    ActorErrorPayload {
+                                        actor_id: #actor_id,
+                                        actor_impl: #init_state_type_param :: actor_impl(),
+                                        state: #state_type_param :: state_name(),
+                                        message: #msg_enum_kinds :: #msg_name . name(),
+                                        kind: TransKind::Receive
+                                    }
+                                ));
+                            }
+                        }
+                    },
+                }
+            })
+        });
+        quote! {
+            match (state, msg) {
+                #( #transitions )*
+                (state, msg) => {
+                    log::error!(
+                        "Unexpected message {} in state {}.", msg.name(), state.name()
+                    );
+                    state
+                }
+            }
+        }
+    }
+
+    fn generate_control_transitions(&self, _actor: &ActorModel) -> TokenStream {
+        quote! {
+            todo!()
+        }
+    }
+
+    fn generate_state_enum(&self, actor: &ActorModel) -> TokenStream {
+        let enum_ident = actor.state_enum_ident();
+        let end_ident = self.end_ident();
+        let pairs: Vec<_> = actor
+            .states()
+            .values()
+            .map(|state| {
+                let trait_type = state.name();
+                let type_param = format_ident!("T{trait_type}");
+                (trait_type, Some(type_param))
+            })
+            .chain(std::iter::once((end_ident, None)))
+            .collect();
+        let type_param_decls = pairs
+            .iter()
+            .filter(|(_, type_param)| type_param.is_some())
+            .map(|(trait_type, type_param)| quote! { #type_param: #trait_type });
+        let decl_type_params = quote! { < #( #type_param_decls ),* > };
+        let variants = pairs
+            .iter()
+            .filter(|(_, type_param)| type_param.is_some())
+            .map(|(trait_type, type_param)| quote! { #trait_type(#type_param) })
+            .chain(std::iter::once(
+                quote! { #end_ident(::btrun::model::#end_ident) },
+            ));
+        let type_params = pairs.iter().flat_map(|(_, type_param)| type_param);
+        let name_pairs: Vec<_> = pairs
+            .iter()
+            .map(|(trait_type, _)| {
+                let trait_type_string = trait_type.to_string();
+                let name_ident = format_ident!("{}_NAME", trait_type_string.pascal_to_snake().to_uppercase());
+                let name_decl = quote! {
+                    static #name_ident: ::btrun::model::Lazy<::std::sync::Arc<String>> =
+                        ::btrun::model::Lazy::new(|| ::std::sync::Arc::new(#trait_type_string.into()));
+                };
+                let match_branch = quote! {
+                    Self::#trait_type(_) => #name_ident.clone(),
+                };
+                (name_decl, match_branch)
+            })
+            .collect();
+        let name_decls = name_pairs.iter().map(|(name_decl, _)| name_decl);
+        let name_match_branches = name_pairs.iter().map(|(_, match_branch)| match_branch);
+        quote! {
+            enum #enum_ident #decl_type_params {
+                #( #variants ),*
+            }
+
+            impl #decl_type_params ::btrun::model::Named for #enum_ident<#( #type_params ),*> {
+                fn name(&self) -> ::std::sync::Arc<String> {
+                    #( #name_decls )*
+                    match self {
+                        #( #name_match_branches )*
+                    }
+                }
+            }
+        }
+    }
 }
 
 impl MethodModel {

+ 376 - 38
crates/btproto/src/model.rs

@@ -1,5 +1,5 @@
 use std::{
-    collections::{HashMap, HashSet},
+    collections::{HashMap, HashSet, LinkedList},
     hash::Hash,
     rc::Rc,
 };
@@ -11,8 +11,8 @@ use quote::{format_ident, quote, ToTokens};
 use crate::{
     case_convert::CaseConvert,
     error,
-    parsing::MessageReplyPart,
     parsing::{ActorDef, Dest, GetSpan, Message, Protocol, State, Transition},
+    parsing::{DestinationState, MessageReplyPart},
 };
 
 pub(crate) struct ProtocolModel {
@@ -20,6 +20,28 @@ pub(crate) struct ProtocolModel {
     msg_lookup: MsgLookup,
     actor_lookup: ActorLookup,
     actors: HashMap<Rc<Ident>, ActorModel>,
+    /// The name of the message enum used by this protocol.
+    msg_enum_name: Ident,
+    /// The name of the message kinds enum used by this protocol.
+    msg_enum_kinds_name: Ident,
+    /// The [Ident] containing [End::ident].
+    end_ident: Ident,
+    /// The name of the [btrun::Mailbox] parameter in an actor closure.
+    mailbox_param: Ident,
+    /// The name of the [btrun::ActorId] parameter in an actor closure.
+    actor_id_param: Ident,
+    /// The name of the `&'static `[Runtime] parameter in an actor closure.
+    runtime_param: Ident,
+    /// The name of the variable used to hold the `msg` field from a [btrun::Envelope].
+    msg_ident: Ident,
+    /// The name of the variable used to hold the `reply` field from a [btrun::Envelope].
+    reply_ident: Ident,
+    /// The name of the variable used to hold the `from` field of a [btrun::Envelope].
+    from_ident: Ident,
+    /// The name of the local variable in an actor closure used to hold the actor's name.
+    actor_name_ident: Ident,
+    /// The identifier for the variable holding the initial state of an actor.
+    init_state_var: Ident,
 }
 
 impl ProtocolModel {
@@ -79,22 +101,43 @@ impl ProtocolModel {
                     .cloned();
                 (state_name.clone(), transitions)
             });
-            let actor = ActorModel::new(
-                actor_def.clone(),
-                &msg_lookup,
-                *is_client.get(&actor_def.actor).unwrap(),
-                transitions_by_state,
-            )?;
+            let is_client = *is_client.get(&actor_def.actor).unwrap();
+            let is_service = actor_lookup.service_providers.contains(&actor_def.actor);
+            let kind = ActorKind::new(is_client, is_service).ok_or_else(|| {
+                syn::Error::new(actor_def.actor.span(), error::msgs::CLIENT_USED_IN_SERVICE)
+            })?;
+            let actor =
+                ActorModel::new(actor_def.clone(), &msg_lookup, kind, transitions_by_state)?;
             actors.insert(actor_name.clone(), actor);
         }
+
         Ok(Self {
+            msg_enum_name: Self::make_msg_enum_name(&def),
+            msg_enum_kinds_name: Self::make_msg_enum_kinds_name(&def),
             def,
             msg_lookup,
             actor_lookup,
             actors,
+            end_ident: format_ident!("{}", End::ident()),
+            mailbox_param: format_ident!("mailbox"),
+            actor_id_param: format_ident!("actor_id"),
+            runtime_param: format_ident!("runtime"),
+            msg_ident: format_ident!("msg"),
+            reply_ident: format_ident!("reply"),
+            from_ident: format_ident!("from"),
+            actor_name_ident: format_ident!("actor_name"),
+            init_state_var: format_ident!("init"),
         })
     }
 
+    fn make_msg_enum_name(def: &Protocol) -> Ident {
+        format_ident!("{}Msgs", def.name_def.name)
+    }
+
+    fn make_msg_enum_kinds_name(def: &Protocol) -> Ident {
+        format_ident!("{}MsgKinds", def.name_def.name)
+    }
+
     pub(crate) fn def(&self) -> &Protocol {
         &self.def
     }
@@ -107,6 +150,10 @@ impl ProtocolModel {
         &self.actor_lookup
     }
 
+    pub(crate) fn actors(&self) -> &HashMap<Rc<Ident>, ActorModel> {
+        &self.actors
+    }
+
     pub(crate) fn actors_iter(&self) -> impl Iterator<Item = &ActorModel> {
         self.actors.values()
     }
@@ -126,25 +173,160 @@ impl ProtocolModel {
         self.methods_iter()
             .flat_map(|method| method.outputs().iter())
     }
+
+    pub(crate) fn msg_enum_ident(&self) -> &Ident {
+        &self.msg_enum_name
+    }
+
+    pub(crate) fn msg_enum_kinds_ident(&self) -> &Ident {
+        &self.msg_enum_kinds_name
+    }
+
+    pub(crate) fn end_ident(&self) -> &Ident {
+        &self.end_ident
+    }
+
+    pub(crate) fn mailbox_param(&self) -> &Ident {
+        &self.mailbox_param
+    }
+
+    pub(crate) fn actor_id_param(&self) -> &Ident {
+        &self.actor_id_param
+    }
+
+    pub(crate) fn runtime_param(&self) -> &Ident {
+        &self.runtime_param
+    }
+
+    #[allow(clippy::wrong_self_convention)]
+    pub(crate) fn from_ident(&self) -> &Ident {
+        &self.from_ident
+    }
+
+    pub(crate) fn reply_ident(&self) -> &Ident {
+        &self.reply_ident
+    }
+
+    pub(crate) fn msg_ident(&self) -> &Ident {
+        &self.msg_ident
+    }
+
+    pub(crate) fn actor_name_ident(&self) -> &Ident {
+        &self.actor_name_ident
+    }
+
+    pub(crate) fn init_state_var(&self) -> &Ident {
+        &self.init_state_var
+    }
+
+    fn get_actor<'a>(&'a self, actor_name: &Ident) -> &'a ActorModel {
+        self.actors
+            .get(actor_name)
+            .unwrap_or_else(|| panic!("Invalid actor name: '{actor_name}'"))
+    }
+
+    fn get_state<'a>(&'a self, state_name: &Ident) -> &'a StateModel {
+        let actor_name = self.actor_lookup().actor_with_state(state_name);
+        let actor = self.get_actor(actor_name);
+        actor
+            .states()
+            .get(state_name)
+            .unwrap_or_else(|| panic!("Actor {actor_name} doesn't contain state {state_name}."))
+    }
+
+    /// Returns a struct with the type params and their associated constraints for the given actor's
+    /// spawn function.
+    pub(crate) fn type_param_info_for<'a>(&'a self, actor_name: &Ident) -> TypeParamInfo<'a> {
+        let mut type_params = Vec::<&Ident>::new();
+        let mut constraints = Vec::<TokenStream>::new();
+        // We do a breadth-first traversal over the associated types referenced by this actor,
+        // starting with it's initial state.
+        let mut visited = HashSet::<&Ident>::new();
+        let mut queue = LinkedList::<&Ident>::new();
+        queue.push_front(self.get_actor(actor_name).init_state().name());
+        while !queue.is_empty() {
+            let state_name = queue.pop_back().unwrap();
+            visited.insert(state_name);
+            let state = self.get_state(state_name);
+            let mut any = false;
+            let eqs = state
+                .out_states_and_assoc_types()
+                .map(|(out_state, assoc_type)| {
+                    any = true;
+                    if !visited.contains(out_state) {
+                        queue.push_front(out_state);
+                    }
+                    let type_param = self.get_state(out_state).type_param();
+                    quote! { #assoc_type = #type_param }
+                });
+            let constraint_list = quote! { #( #eqs ),* };
+            let constraint = if any {
+                quote! { < #constraint_list > }
+            } else {
+                constraint_list
+            };
+            let type_param = state.type_param();
+            type_params.push(type_param);
+            let state_name = state.name();
+            let constraint = quote! { #type_param: 'static + #state_name #constraint };
+            constraints.push(constraint);
+        }
+        TypeParamInfo {
+            constraints,
+            type_params,
+        }
+    }
 }
 
-pub(crate) struct ActorModel {
-    #[allow(dead_code)]
-    def: Rc<ActorDef>,
-    /// Indicates if this actor is a client.
-    ///
+pub(crate) struct TypeParamInfo<'a> {
+    pub(crate) type_params: Vec<&'a Ident>,
+    pub(crate) constraints: Vec<TokenStream>,
+}
+
+#[derive(Clone, Debug)]
+/// A categorization of different actor types based on their messaging behavior.
+pub(crate) enum ActorKind {
     /// A client is an actor which is not spawned by another actor (i.e. has no parents) and
     /// whose initial state has no transition which receives a message, or which is spawned by a
     /// client.
-    is_client: bool,
+    Client,
+    /// Any actor with a state that appears wrapped in `service()` is in this category.
+    Service,
+    /// Any actor not in either of the other two categories falls into this one.
+    Worker,
+}
+
+impl ActorKind {
+    fn new(is_client: bool, is_service: bool) -> Option<Self> {
+        match (is_client, is_service) {
+            (true, false) => Some(Self::Client),
+            (false, true) => Some(Self::Service),
+            (false, false) => Some(Self::Worker),
+            (true, true) => None,
+        }
+    }
+
+    pub(crate) fn is_client(&self) -> bool {
+        matches!(self, Self::Client)
+    }
+}
+
+impl Copy for ActorKind {}
+
+pub(crate) struct ActorModel {
+    #[allow(dead_code)]
+    def: Rc<ActorDef>,
+    kind: ActorKind,
+    state_enum_ident: Ident,
     states: HashMap<Rc<Ident>, StateModel>,
+    spawn_function_ident: Option<Ident>,
 }
 
 impl ActorModel {
     fn new<S, T>(
         def: Rc<ActorDef>,
         messages: &MsgLookup,
-        is_client: bool,
+        kind: ActorKind,
         state_iter: S,
     ) -> syn::Result<Self>
     where
@@ -156,6 +338,7 @@ impl ActorModel {
             .map(|(name, transitions)| (name, transitions.into_iter().collect()))
             .collect();
         let mut states = HashMap::new();
+        let is_client = matches!(kind, ActorKind::Client);
         for (name, transitions) in transitions.into_iter() {
             let state = StateModel::new(name.clone(), messages, transitions, is_client)?;
             if let Some(prev) = states.insert(name, state) {
@@ -165,25 +348,66 @@ impl ActorModel {
                 );
             }
         }
+
+        let actor_name = def.actor.as_ref();
         Ok(Self {
+            state_enum_ident: Self::make_state_enum_ident(actor_name),
+            spawn_function_ident: Self::make_spawn_function_ident(kind, actor_name),
             def,
-            is_client,
+            kind,
             states,
         })
     }
 
-    pub(crate) fn is_client(&self) -> bool {
-        self.is_client
+    fn make_state_enum_ident(actor_name: &Ident) -> Ident {
+        format_ident!("{}State", actor_name.to_string().snake_to_pascal())
+    }
+
+    fn make_spawn_function_ident(kind: ActorKind, actor_name: &Ident) -> Option<Ident> {
+        // Services are registered, not spawned, so they have no spawn function.
+        if let ActorKind::Service = kind {
+            None
+        } else {
+            Some(format_ident!("spawn_{actor_name}"))
+        }
+    }
+
+    pub(crate) fn def(&self) -> &ActorDef {
+        &self.def
+    }
+
+    pub(crate) fn name(&self) -> &Ident {
+        &self.def.actor
+    }
+
+    pub(crate) fn kind(&self) -> ActorKind {
+        self.kind
     }
 
     pub(crate) fn states(&self) -> &HashMap<Rc<Ident>, StateModel> {
         &self.states
     }
+
+    pub(crate) fn init_state(&self) -> &StateModel {
+        // It's a syntax error to have an IdentArray with no states in it, so this unwrap
+        // shouldn't panic.
+        let init = self.def.states.as_ref().first().unwrap();
+        self.states.get(init).unwrap()
+    }
+
+    pub(crate) fn state_enum_ident(&self) -> &Ident {
+        &self.state_enum_ident
+    }
+
+    pub(crate) fn spawn_function_ident(&self) -> Option<&Ident> {
+        self.spawn_function_ident.as_ref()
+    }
 }
 
 pub(crate) struct StateModel {
     name: Rc<Ident>,
     methods: HashMap<Rc<Ident>, MethodModel>,
+    type_param: Ident,
 }
 
 impl StateModel {
@@ -207,7 +431,15 @@ impl StateModel {
                 ));
             }
         }
-        Ok(Self { name, methods })
+        Ok(Self {
+            type_param: Self::make_generic_type_param(name.as_ref()),
+            name,
+            methods,
+        })
+    }
+
+    fn make_generic_type_param(name: &Ident) -> Ident {
+        format_ident!("T{name}")
     }
 
     pub(crate) fn name(&self) -> &Ident {
@@ -217,6 +449,22 @@ impl StateModel {
     pub(crate) fn methods(&self) -> &HashMap<Rc<Ident>, MethodModel> {
         &self.methods
     }
+
+    pub(crate) fn type_param(&self) -> &Ident {
+        &self.type_param
+    }
+
+    fn out_states_and_assoc_types(&self) -> impl Iterator<Item = (&Ident, &Ident)> {
+        self.methods().values().flat_map(|method| {
+            method.outputs().iter().flat_map(|output| {
+                output
+                    .kind()
+                    .state_trait()
+                    .map(|ptr| ptr.as_ref())
+                    .zip(output.assoc_type())
+            })
+        })
+    }
 }
 
 impl GetSpan for StateModel {
@@ -272,7 +520,7 @@ impl MethodModel {
 
     fn new_inputs(def: &Transition, messages: &MsgLookup, part_of_client: bool) -> Vec<InputModel> {
         let mut inputs = Vec::new();
-        let arg_kind = if def.is_client() {
+        let arg_kind = if def.not_receiving() {
             InputKind::ByMutRef
         } else {
             InputKind::ByValue
@@ -298,6 +546,22 @@ impl MethodModel {
         inputs
     }
 
+    /// Returns the input associated with the message this method is handling, or [None] if this
+    /// method is not handling a message.
+    pub(crate) fn msg_received_input(&self) -> Option<&InputModel> {
+        if self.def.in_msg().is_some() {
+            let input_model = self.inputs().get(0).unwrap_or_else(|| {
+                panic!(
+                    "Method {} had no inputs despite handling a message.",
+                    self.name()
+                )
+            });
+            Some(input_model)
+        } else {
+            None
+        }
+    }
+
     fn new_outputs(
         def: &Transition,
         type_prefix: &str,
@@ -365,19 +629,26 @@ impl Copy for InputKind {}
 #[cfg_attr(test, derive(Debug))]
 pub(crate) struct InputModel {
     name: Ident,
+    msg_name: Rc<Ident>,
     arg_type: Rc<TokenStream>,
     arg_kind: InputKind,
 }
 
 impl InputModel {
-    fn new(type_name: Rc<Ident>, arg_type: Rc<TokenStream>, arg_kind: InputKind) -> Self {
-        let name = format_ident!("{}_arg", type_name.to_string().pascal_to_snake());
+    fn new(msg_name: Rc<Ident>, arg_type: Rc<TokenStream>, arg_kind: InputKind) -> Self {
+        let name = format_ident!("{}_arg", msg_name.pascal_to_snake());
         Self {
             name,
+            msg_name,
             arg_type,
             arg_kind,
         }
     }
+
+    /// Returns the name of the message this input is for.
+    pub(crate) fn msg_name(&self) -> &Ident {
+        self.msg_name.as_ref()
+    }
 }
 
 impl ToTokens for InputModel {
@@ -394,25 +665,27 @@ impl ToTokens for InputModel {
 
 #[cfg_attr(test, derive(Debug))]
 pub(crate) struct OutputModel {
+    kind: OutputKind,
     type_name: Option<TokenStream>,
+    assoc_type: Option<Ident>,
     decl: Option<TokenStream>,
-    #[allow(dead_code)]
-    kind: OutputKind,
+    var_name: Ident,
 }
 
 impl OutputModel {
     fn new(kind: OutputKind, type_prefix: &str) -> Self {
-        let (decl, type_name) = match &kind {
+        let (decl, type_name, assoc_type) = match &kind {
             OutputKind::State { def, .. } => {
                 let state_trait = def.state_trait.as_ref();
                 if state_trait == End::ident() {
                     let end_ident = format_ident!("{}", End::ident());
-                    (None, Some(quote! { ::btrun::model::#end_ident }))
+                    (None, Some(quote! { ::btrun::model::#end_ident }), None)
                 } else {
-                    let type_name = format_ident!("{type_prefix}{}", state_trait);
+                    let assoc_type = format_ident!("{type_prefix}{}", state_trait);
                     (
-                        Some(quote! { type  #type_name: #state_trait; }),
-                        Some(quote! { Self::#type_name }),
+                        Some(quote! { type  #assoc_type: #state_trait; }),
+                        Some(quote! { Self::#assoc_type }),
+                        Some(assoc_type),
                     )
                 }
             }
@@ -433,13 +706,15 @@ impl OutputModel {
                 } else {
                     Some(quote! { #msg_type })
                 };
-                (None, type_name)
+                (None, type_name, None)
             }
         };
         Self {
+            var_name: kind.var_name(),
             type_name,
             decl,
             kind,
+            assoc_type,
         }
     }
 
@@ -450,6 +725,18 @@ impl OutputModel {
     pub(crate) fn decl(&self) -> Option<&TokenStream> {
         self.decl.as_ref()
     }
+
+    pub(crate) fn kind(&self) -> &OutputKind {
+        &self.kind
+    }
+
+    pub(crate) fn assoc_type(&self) -> Option<&Ident> {
+        self.assoc_type.as_ref()
+    }
+
+    pub(crate) fn var_name(&self) -> &Ident {
+        &self.var_name
+    }
 }
 
 #[cfg_attr(test, derive(Debug))]
@@ -466,6 +753,24 @@ pub(crate) enum OutputKind {
     },
 }
 
+impl OutputKind {
+    fn var_name(&self) -> Ident {
+        let ident = match self {
+            Self::State { def, .. } => def.state_trait.as_ref(),
+            Self::Msg { def, .. } => def.msg.msg_type.as_ref(),
+        };
+        format_ident!("{}_out", ident.pascal_to_snake())
+    }
+
+    pub(crate) fn state_trait(&self) -> Option<&Rc<Ident>> {
+        if let Self::State { def, .. } = self {
+            Some(&def.state_trait)
+        } else {
+            None
+        }
+    }
+}
+
 /// A type used to query information about actors, states, and their relationships.
 pub(crate) struct ActorLookup {
     /// A map from an actor name to the set of states which are part of that actor.
@@ -477,7 +782,10 @@ pub(crate) struct ActorLookup {
     parents: HashMap<Rc<Ident>, HashSet<Rc<Ident>>>,
     /// A map from an actor name to the set of actors names which it spawns.
     children: HashMap<Rc<Ident>, HashSet<Rc<Ident>>>,
+    /// A map from the initial state of an actor to the actor.
     actors_by_init_state: HashMap<Rc<Ident>, Rc<Ident>>,
+    /// The set of actors which are service providers in this protocol.
+    service_providers: HashSet<Rc<Ident>>,
 }
 
 impl ActorLookup {
@@ -486,6 +794,7 @@ impl ActorLookup {
         A: IntoIterator<Item = &'a ActorDef>,
         T: IntoIterator<Item = &'a Transition>,
     {
+        // First we gather all the information we can by iterating over the actor definitions.
         let mut actor_states = HashMap::new();
         let mut actors_by_state = HashMap::new();
         let mut parents = HashMap::new();
@@ -508,15 +817,25 @@ impl ActorLookup {
             children.insert(actor_name.clone(), HashSet::new());
         }
 
+        // Then, we gather information by iterating over the transitions.
+        let mut transitions_to = HashMap::new();
+        let mut service_providers = HashSet::new();
         for transition in transitions {
-            let in_state = transition.in_state.state_trait.as_ref();
+            let in_state = &transition.in_state.state_trait;
             let parent = actors_by_state
                 .get(in_state)
                 .ok_or_else(|| syn::Error::new(in_state.span(), error::msgs::UNDECLARED_STATE))?;
-            // The first output state is skipped because the current actor is transitioning to it,
-            // its not creating a new actor.
-            for out_state in transition.out_states.as_ref().iter().skip(1) {
-                let out_state = out_state.state_trait.as_ref();
+            for (index, out_state) in transition.out_states.as_ref().iter().enumerate() {
+                let out_state = &out_state.state_trait;
+                transitions_to
+                    .entry(in_state.clone())
+                    .or_insert_with(HashSet::new)
+                    .insert(out_state.clone());
+                // The first output state is skipped because the current actor is transitioning to
+                // it, its not creating a new actor.
+                if 0 == index {
+                    continue;
+                }
                 let child = actors_by_state.get(out_state).ok_or_else(|| {
                     syn::Error::new(out_state.span(), error::msgs::UNDECLARED_STATE)
                 })?;
@@ -529,6 +848,15 @@ impl ActorLookup {
                     .or_insert_with(HashSet::new)
                     .insert(child.clone());
             }
+            for dest in transition.out_msgs.as_ref().iter() {
+                if let DestinationState::Service(service) = &dest.state {
+                    let dest_state = &service.state_trait;
+                    let actor_name = actors_by_state.get(dest_state).ok_or_else(|| {
+                        syn::Error::new(dest_state.span(), error::msgs::UNDECLARED_STATE)
+                    })?;
+                    service_providers.insert(actor_name.clone());
+                }
+            }
         }
 
         Ok(Self {
@@ -537,6 +865,7 @@ impl ActorLookup {
             parents,
             children,
             actors_by_init_state,
+            service_providers,
         })
     }
 
@@ -552,6 +881,15 @@ impl ActorLookup {
             .unwrap_or_else(|| panic!("actor_states: {}", Self::UNKNOWN_ACTOR_ERR))
     }
 
+    /// Returns the name of the actor containing the given state.
+    ///
+    /// **Panics**, if called with an unknown state name.
+    pub(crate) fn actor_with_state(&self, state_name: &Ident) -> &Rc<Ident> {
+        self.actors_by_state
+            .get(state_name)
+            .unwrap_or_else(|| panic!("can't find state {state_name}"))
+    }
+
     pub(crate) fn parents(&self, actor_name: &Ident) -> &HashSet<Rc<Ident>> {
         self.parents
             .get(actor_name)
@@ -961,7 +1299,7 @@ mod tests {
         let actual = ProtocolModel::new(input).unwrap();
 
         let server = actual.actors.get(&format_ident!("server")).unwrap();
-        assert!(!server.is_client());
+        assert!(!server.kind().is_client());
     }
 
     #[test]
@@ -971,7 +1309,7 @@ mod tests {
         let actual = ProtocolModel::new(input).unwrap();
 
         let client = actual.actors.get(&format_ident!("client")).unwrap();
-        assert!(client.is_client());
+        assert!(client.kind().is_client());
     }
 
     #[test]
@@ -1023,6 +1361,6 @@ mod tests {
         let actual = ProtocolModel::new(input).unwrap();
 
         let worker = actual.actors.get(&format_ident!("worker")).unwrap();
-        assert!(!worker.is_client());
+        assert!(!worker.kind().is_client());
     }
 }

+ 3 - 2
crates/btproto/src/parsing.rs

@@ -220,8 +220,8 @@ pub(crate) struct ActorDef {
     pub(crate) states: IdentArray,
 }
 
-#[cfg(test)]
 impl ActorDef {
+    #[cfg(test)]
     pub(crate) fn new<T, I>(actor: &str, state_names: T) -> Self
     where
         T: IntoIterator<IntoIter = I>,
@@ -334,7 +334,8 @@ impl Transition {
         self.in_msg.as_ref().map(|(_, msg)| msg)
     }
 
-    pub(crate) fn is_client(&self) -> bool {
+    /// Returns true if and only if this [Transition] is not receiving a message.
+    pub(crate) fn not_receiving(&self) -> bool {
         self.in_msg.is_none()
     }
 }

+ 53 - 22
crates/btproto/src/validation.rs

@@ -130,7 +130,7 @@ impl ProtocolModel {
                     };
                     let msg_info = self.msg_lookup().lookup(&dest.msg);
                     outgoing.insert(MsgEndpoint::new(dest_state, msg_info));
-                    if actor.is_client() {
+                    if actor.kind().is_client() {
                         if let Some(reply) = msg_info.reply() {
                             incoming.insert(MsgEndpoint::new(&transition.in_state, reply));
                         }
@@ -244,7 +244,7 @@ impl ProtocolModel {
     /// be observed, because the methods which sent the original messages will return their replies.
     fn no_unobservable_states(&self) -> MaybeErr {
         self.actors_iter()
-            .filter(|actor| actor.is_client())
+            .filter(|actor| actor.kind().is_client())
             .flat_map(|actor| actor.states().values())
             .filter(|state| {
                 state.methods().values().all(|method| {
@@ -441,16 +441,36 @@ mod tests {
     fn receivers_and_senders_matched_unmatched_sender_err() {
         let input = ProtocolModel::new(Protocol::new(
             NameDef::new("Unbalanced"),
-            [ActorDef::new("actor", ["Init"])],
-            [Transition::new(
-                State::new("Init", []),
-                None,
-                [State::new("Init", [])],
-                [Dest::new(
-                    DestinationState::Service(State::new("Init", [])),
-                    Message::new("Msg", false, []),
-                )],
-            )],
+            [
+                ActorDef::new("client", ["Init"]),
+                ActorDef::new("server", ["Server"]),
+            ],
+            [
+                Transition::new(
+                    State::new("Init", []),
+                    None,
+                    [State::new("Init", [])],
+                    [Dest::new(
+                        DestinationState::Service(State::new("Server", [])),
+                        Message::new("UnexpectedMsg", false, []),
+                    )],
+                ),
+                Transition::new(
+                    State::new("Init", []),
+                    None,
+                    [State::new("Init", [])],
+                    [Dest::new(
+                        DestinationState::Service(State::new("Server", [])),
+                        Message::new("ExpectedMsg", false, []),
+                    )],
+                ),
+                Transition::new(
+                    State::new("Server", []),
+                    Some(Message::new("ExpectedMsg", false, [])),
+                    [State::new("End", [])],
+                    [],
+                ),
+            ],
         ))
         .unwrap();
 
@@ -564,16 +584,27 @@ mod tests {
     fn no_undeliverable_msgs_service_ok() {
         let input = ProtocolModel::new(Protocol::new(
             NameDef::new("Undeliverable"),
-            [ActorDef::new("actor", ["Client", "Server"])],
-            [Transition::new(
-                State::new("Client", []),
-                None,
-                [State::new("Client", [])],
-                [Dest::new(
-                    DestinationState::Service(State::new("Server", [])),
-                    Message::new("Msg", false, []),
-                )],
-            )],
+            [
+                ActorDef::new("client", ["Client"]),
+                ActorDef::new("server", ["Server"]),
+            ],
+            [
+                Transition::new(
+                    State::new("Client", []),
+                    None,
+                    [State::new("Client", [])],
+                    [Dest::new(
+                        DestinationState::Service(State::new("Server", [])),
+                        Message::new("Msg", false, []),
+                    )],
+                ),
+                Transition::new(
+                    State::new("Server", []),
+                    Some(Message::new("Msg", false, [])),
+                    [State::new("End", [])],
+                    [],
+                ),
+            ],
         ))
         .unwrap();
 

+ 1 - 2
crates/btrun/src/kernel.rs

@@ -121,8 +121,7 @@ mod tests {
 
     use crate::{
         tests::{EchoMsg, ASYNC_RT, RUNTIME},
-        ActorError, ActorErrorPayload, ActorName, ActorResult, Envelope, Mailbox, Named,
-        TransKind,
+        ActorError, ActorErrorPayload, ActorName, ActorResult, Envelope, Mailbox, Named, TransKind,
     };
 
     #[derive(Debug)]

+ 43 - 4
crates/btrun/src/lib.rs

@@ -22,6 +22,7 @@ use tokio::{
 };
 
 pub use bttp::Receiver;
+pub use log;
 mod kernel;
 use kernel::{kernel, FaultResult};
 pub mod model;
@@ -187,7 +188,7 @@ impl Runtime {
         if to.path().as_ref() == self.path.as_ref() {
             let guard = self.handles.read().await;
             if let Some(handle) = guard.get(&to.actor_id()) {
-                handle.call_through(msg).await
+                handle.call_through(from, msg).await
             } else {
                 Err(bterr!("invalid actor name"))
             }
@@ -211,13 +212,14 @@ impl Runtime {
     pub async fn call_service<T: 'static + MsgEnum>(
         &'static self,
         to: ServiceAddr,
+        from: ActorName,
         msg: T,
     ) -> Result<T::Reply> {
         if to.path().as_ref() == self.path.as_ref() {
             let actor_id = self.service_provider(&to).await?;
             let handles = self.handles.read().await;
             if let Some(handle) = handles.get(&actor_id) {
-                handle.call_through(msg).await
+                handle.call_through(from, msg).await
             } else {
                 panic!(
                     "Service record '{}' had a non-existent actor with ID '{}'.",
@@ -408,8 +410,8 @@ impl Runtime {
     }
 
     /// Returns the name of the actor in this runtime with the given actor ID.
-    pub fn actor_name(&self, act_id: ActorId) -> ActorName {
-        ActorName::new(self.path.clone(), act_id)
+    pub fn actor_name(&self, actor_id: ActorId) -> ActorName {
+        ActorName::new(self.path.clone(), actor_id)
     }
 }
 
@@ -443,6 +445,43 @@ impl ServiceRecord {
     }
 }
 
+#[derive(Serialize, Deserialize)]
+pub struct ExitNow;
+
+impl Named for ExitNow {
+    fn name(&self) -> Arc<String> {
+        static NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("ExitNow".into()));
+        NAME.clone()
+    }
+}
+
+impl CallMsg for ExitNow {
+    type Reply = NoReply;
+}
+
+impl SendMsg for ExitNow {}
+
+/// An actor that does nothing, and simply exits when it receives a message `Send` message in its
+/// mailbox.
+pub async fn do_nothing_actor(
+    mut mailbox: Mailbox<ExitNow>,
+    actor_id: ActorId,
+    _runtime: &'static Runtime,
+) -> ActorResult {
+    while let Some(envelope) = mailbox.recv().await {
+        match envelope {
+            Envelope::Send { .. } => break,
+            Envelope::Call { .. } => continue,
+            Envelope::Control(msg) => {
+                if let ControlMsg::OwnerExited(_) = msg {
+                    break;
+                }
+            }
+        }
+    }
+    Ok(actor_id)
+}
+
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum RuntimeError {
     BadActorName(ActorName),

+ 11 - 4
crates/btrun/src/model.rs

@@ -473,6 +473,7 @@ impl Named for ControlMsg {
 pub enum Envelope<T: MsgEnum> {
     Call {
         msg: T,
+        from: ActorName,
         reply: Option<oneshot::Sender<T::Reply>>,
     },
     Send {
@@ -490,9 +491,10 @@ impl<T: MsgEnum> Envelope<T> {
 
     /// Creates a new envelope containing the given message which expects exactly one reply.
     /// The receiver end of the reply channel is also returned.
-    pub(super) fn new_call(msg: T) -> (Self, oneshot::Receiver<T::Reply>) {
+    pub(super) fn new_call(from: ActorName, msg: T) -> (Self, oneshot::Receiver<T::Reply>) {
         let (tx, rx) = oneshot::channel::<T::Reply>();
         let envelope = Self::Call {
+            from,
             reply: Some(tx),
             msg,
         };
@@ -604,6 +606,7 @@ impl ActorHandle {
         let buffer = Arc::new(Mutex::new(Vec::<u8>::new()));
         let closure = move |envelope: WireEnvelope| {
             let (wire_msg, replier) = envelope.into_parts();
+            let from = wire_msg.from;
             let result = from_slice(wire_msg.payload);
             let buffer = buffer.clone();
             let sender = sender.clone();
@@ -611,7 +614,7 @@ impl ActorHandle {
             let fut: FutureResult = Box::pin(async move {
                 let msg = result?;
                 if let Some(mut replier) = replier {
-                    let (envelope, rx) = Envelope::new_call(msg);
+                    let (envelope, rx) = Envelope::new_call(from, msg);
                     sender.send(envelope).await.map_err(|_| {
                         bterr!("failed to deliver message. Recipient may have halted.")
                     })?;
@@ -678,9 +681,13 @@ impl ActorHandle {
     }
 
     /// Sends a call message to this actor and awaits the reply.
-    pub async fn call_through<T: 'static + MsgEnum>(&self, msg: T) -> Result<T::Reply> {
+    pub async fn call_through<T: 'static + MsgEnum>(
+        &self,
+        from: ActorName,
+        msg: T,
+    ) -> Result<T::Reply> {
         let sender = self.sender()?;
-        let (envelope, rx) = Envelope::new_call(msg);
+        let (envelope, rx) = Envelope::new_call(from, msg);
         sender
             .send(envelope)
             .await

+ 57 - 40
crates/btrun/tests/runtime_tests.rs

@@ -60,6 +60,7 @@ mod ping_pong {
 
     struct ClientHandle<T: Client> {
         state: Option<PingClientState<T>>,
+        client_name: ActorName,
         runtime: &'static Runtime,
     }
 
@@ -75,7 +76,11 @@ mod ping_pong {
                         let new_state = PingClientState::End(new_state);
                         let result = self
                             .runtime
-                            .call_service(service, PingProtocolMsgs::Ping(msg))
+                            .call_service(
+                                service,
+                                self.client_name.clone(),
+                                PingProtocolMsgs::Ping(msg),
+                            )
                             .await;
                         (new_state, result)
                     }
@@ -102,12 +107,17 @@ mod ping_pong {
         }
     }
 
-    async fn spawn_client<T: Client>(init: T, runtime: &'static Runtime) -> ClientHandle<T> {
+    async fn spawn_client_manual<T: Client>(init: T, runtime: &'static Runtime) -> ClientHandle<T> {
         let state = Some(PingClientState::Client(init));
-        ClientHandle { state, runtime }
+        let client_name = runtime.spawn(None, do_nothing_actor).await.unwrap();
+        ClientHandle {
+            state,
+            client_name,
+            runtime,
+        }
     }
 
-    async fn register_server<Init, F>(
+    async fn register_server_manual<Init, F>(
         make_init: F,
         rt: &'static Runtime,
         id: ServiceId,
@@ -153,8 +163,11 @@ mod ping_pong {
                         (ServerState::Server(listening_state), PingProtocolMsgs::Ping(msg)) => {
                             match listening_state.handle_ping(msg).await {
                                 TransResult::Ok((new_state, reply)) => {
-                                    let replier =
-                                        replier.expect("The reply has already been sent.");
+                                    let replier = replier
+                                        .ok_or_else(|| bterr!("Reply has already been sent."))
+                                        .unwrap();
+                                    //let replier =
+                                    //    replier.expect("The reply has already been sent.");
                                     if let Err(_) = replier.send(PingProtocolMsgs::PingReply(reply))
                                     {
                                         return Err(ActorError::new(
@@ -290,11 +303,12 @@ mod ping_pong {
                     let server_counter = service_counter.clone();
                     ServerState::new(server_counter)
                 };
-                register_server(make_init, &RUNTIME, service_id.clone())
+                register_server_manual(make_init, &RUNTIME, service_id.clone())
                     .await
                     .unwrap()
             };
-            let mut client_handle = spawn_client(ClientState::new(counter.clone()), &RUNTIME).await;
+            let mut client_handle =
+                spawn_client_manual(ClientState::new(counter.clone()), &RUNTIME).await;
             let service_addr = ServiceAddr::new(service_name, true);
             client_handle.send_ping(Ping, service_addr).await.unwrap();
 
@@ -508,7 +522,7 @@ mod client_callback {
         }
     }
 
-    async fn spawn_client<Init>(init: Init, runtime: &'static Runtime) -> ClientHandle<Init>
+    async fn spawn_client_manual<Init>(init: Init, runtime: &'static Runtime) -> ClientHandle<Init>
     where
         Init: 'static + Unregistered,
     {
@@ -571,7 +585,7 @@ mod client_callback {
         }
     }
 
-    async fn register_server<Init, F>(
+    async fn register_server_manual<Init, F>(
         make_init: F,
         runtime: &'static Runtime,
         service_id: ServiceId,
@@ -580,8 +594,8 @@ mod client_callback {
         Init: 'static + Listening<HandleRegisterListening = Init>,
         F: 'static + Send + Sync + Clone + Fn() -> Init,
     {
-        enum ServerState<S: Listening> {
-            Listening(S),
+        enum ServerState<Init: Listening> {
+            Listening(Init),
         }
 
         impl<S: Listening> Named for ServerState<S> {
@@ -611,7 +625,7 @@ mod client_callback {
                         (ServerState::Listening(curr_state), ClientCallbackMsgs::Register(msg)) => {
                             match curr_state.handle_register(msg).await {
                                 TransResult::Ok((new_state, working_state)) => {
-                                    start_worker(working_state, from, runtime).await;
+                                    start_worker_manual(working_state, from, runtime).await;
                                     ServerState::Listening(new_state)
                                 }
                                 TransResult::Abort { from, err } => {
@@ -678,7 +692,7 @@ mod client_callback {
             .await
     }
 
-    async fn start_worker<Init>(
+    async fn start_worker_manual<Init>(
         init: Init,
         owned: ActorName,
         runtime: &'static Runtime,
@@ -691,10 +705,28 @@ mod client_callback {
         }
 
         runtime
-            .spawn::<ClientCallbackMsgs, _, _>(None, move |_, actor_id, _| async move {
-                let msg = match init.on_send_completed().await {
-                    TransResult::Ok((End, msg)) => msg,
-                    TransResult::Abort { err, .. } | TransResult::Fatal { err } => {
+            .spawn::<ClientCallbackMsgs, _, _>(
+                Some(owned.clone()),
+                move |_, actor_id, _| async move {
+                    let msg = match init.on_send_completed().await {
+                        TransResult::Ok((End, msg)) => msg,
+                        TransResult::Abort { err, .. } | TransResult::Fatal { err } => {
+                            let err = ActorError::new(
+                                err,
+                                ActorErrorPayload {
+                                    actor_id,
+                                    actor_impl: Init::actor_impl(),
+                                    state: Init::state_name(),
+                                    message: ClientCallbackMsgKinds::Completed.name(),
+                                    kind: TransKind::Send,
+                                },
+                            );
+                            panic_any(format!("{err}"))
+                        }
+                    };
+                    let from = runtime.actor_name(actor_id);
+                    let msg = ClientCallbackMsgs::Completed(msg);
+                    runtime.send(owned, from, msg).await.unwrap_or_else(|err| {
                         let err = ActorError::new(
                             err,
                             ActorErrorPayload {
@@ -705,26 +737,11 @@ mod client_callback {
                                 kind: TransKind::Send,
                             },
                         );
-                        panic_any(format!("{err}"))
-                    }
-                };
-                let from = runtime.actor_name(actor_id);
-                let msg = ClientCallbackMsgs::Completed(msg);
-                runtime.send(owned, from, msg).await.unwrap_or_else(|err| {
-                    let err = ActorError::new(
-                        err,
-                        ActorErrorPayload {
-                            actor_id,
-                            actor_impl: Init::actor_impl(),
-                            state: Init::state_name(),
-                            message: ClientCallbackMsgKinds::Completed.name(),
-                            kind: TransKind::Send,
-                        },
-                    );
-                    panic_any(format!("{err}"));
-                });
-                Ok(actor_id)
-            })
+                        panic_any(format!("{err}"));
+                    });
+                    Ok(actor_id)
+                },
+            )
             .await
             .unwrap()
     }
@@ -736,12 +753,12 @@ mod client_callback {
             let service_id = ServiceId::from(SERVICE_ID);
             let service_name = {
                 let make_init = move || ListeningState { multiple: 2 };
-                register_server(make_init, &RUNTIME, service_id.clone())
+                register_server_manual(make_init, &RUNTIME, service_id.clone())
                     .await
                     .unwrap()
             };
             let (sender, receiver) = oneshot::channel();
-            let client_handle = spawn_client(UnregisteredState { sender }, &RUNTIME).await;
+            let client_handle = spawn_client_manual(UnregisteredState { sender }, &RUNTIME).await;
             let service_addr = ServiceAddr::new(service_name, false);
             client_handle
                 .send_register(service_addr, Register { factor: 21 })