瀏覽代碼

Got both of the runtime tests to pass with the generated code.

Matthew Carr 1 年之前
父節點
當前提交
5f46436994

+ 0 - 3
crates/btfs/src/lib.rs

@@ -36,9 +36,6 @@ protocol! {
 
     FileHandle[Opened] -> FileHandle[Opened], >Opened!FileOp;
     Opened?FileOp -> Opened, >FileHandle!FileOp::Reply;
-
-    FileHandle[Opened] -> End, >Opened!Close;
-    Opened?Close -> End;
 }
 
 #[derive(Serialize, Deserialize)]

+ 10 - 2
crates/btproto/src/error.rs

@@ -64,7 +64,7 @@ pub(crate) fn assert_ok<T, E: Into<syn::Result<T>>>(maybe_err: E) {
 pub(crate) fn assert_err<T, E: Into<syn::Result<T>>>(maybe_err: E, expected_msg: &str) {
     let result: syn::Result<T> = maybe_err.into();
     assert!(result.is_err());
-    assert_eq!(expected_msg, result.err().unwrap().to_string());
+    assert_eq!(result.err().unwrap().to_string(), expected_msg);
 }
 
 /// User-visible compile time error messages.
@@ -80,7 +80,7 @@ pub(crate) mod msgs {
     pub(crate) const UNMATCHED_OUTGOING: &str = "No receiver found for message type.";
     pub(crate) const UNMATCHED_INCOMING: &str = "No sender found for message type.";
     pub(crate) const UNDELIVERABLE: &str =
-        "Receiver must either be a service, an owned state, or an out state, or the message must be a reply.";
+        "Receiver must either be a service, an owned state, or an out state. Or, the message must be a reply.";
     pub(crate) const INVALID_REPLY: &str =
         "Replies can only be used in transitions which handle messages.";
     pub(crate) const MULTIPLE_REPLIES: &str =
@@ -89,4 +89,12 @@ pub(crate) mod msgs {
         "This client state is not allowed because it only receives replies. Such a state cannot be observed.";
     pub(crate) const CLIENT_USED_IN_SERVICE: &str =
         "A client state cannot be used inside `service()`.";
+    pub(crate) const AMBIGUOUS_TRANS: &str =
+        "Ambiguous transition definitions. A non-client state can only have a single non-receiving transition.";
+    pub(crate) const EXIT_RACE: &str =
+        "There is a race condition between an exit message and this application message. Avoid sending messages to owned states when exiting.";
+    pub(crate) const INVALID_OUT_STATE_PROVENANCE: &str =
+        "Ownership of this state cannot be given because it is not owned by this transition's input state or input message.";
+    pub(crate) const INVALID_OUT_MSG_PROVENANCE: &str =
+        "Ownership of this state cannot be given because it is not owned by this transition's input state or its input message, and it is not an output state.";
 }

+ 382 - 184
crates/btproto/src/generation.rs

@@ -1,15 +1,16 @@
 use std::collections::{HashMap, HashSet};
 
-use btrun::model::End;
+use btrun::model::{End, TransKind};
 use proc_macro2::{Ident, TokenStream};
 use quote::{format_ident, quote, ToTokens};
 
 use crate::{
     case_convert::CaseConvert,
     model::{
-        ActorKind, ActorModel, MethodModel, MsgInfo, ProtocolModel, StateModel, TypeParamInfo,
-        ValueKind, ValueModel,
+        ActorKind, ActorModel, DestKind, MethodModel, MsgInfo, ProtocolModel, StateModel,
+        TypeParamInfo, ValueKind, ValueModel,
     },
+    parsing::DestinationState,
 };
 
 impl ToTokens for ProtocolModel {
@@ -159,7 +160,7 @@ impl ProtocolModel {
                 #state_enum_decl
 
                 let actor = #server_loop ;
-                runtime.spawn(None, actor).await
+                runtime.spawn(Some(owner_name), actor).await
             }
         }
     }
@@ -285,7 +286,9 @@ impl ProtocolModel {
     }
 
     fn generate_actor_loop(&self, actor: &ActorModel) -> TokenStream {
-        let init_state = actor.init_state().name();
+        let init_state = actor.init_state();
+        let startup = self.generate_startup_method_calls(actor);
+        let init_type_param = init_state.type_param();
         let state_enum_ident = actor.state_enum_ident();
         let msg_enum = self.msg_enum_ident();
         let init_state_var = self.init_state_var();
@@ -300,15 +303,13 @@ impl ProtocolModel {
         let send_transitions = self.generate_send_transitions(actor);
         let control_transitions = self.generate_control_transitions(actor);
         let use_statements = self.use_statements();
-        quote! {
-            move |
-                mut mailbox: ::btrun::Mailbox<#msg_enum>,
-                #actor_id: ::btrun::model::ActorId,
-                #runtime: &'static ::btrun::Runtime
-            | async move {
-                #use_statements
-                let #actor_name = #runtime . actor_name(#actor_id);
-                let mut state = #state_enum_ident :: #init_state (#init_state_var);
+        let state_var = self.state_var();
+        let state_after_startup_name = actor.state_after_startup();
+        let actor_loop = if state_after_startup_name == End::ident() {
+            quote! {}
+        } else {
+            quote! {
+                let mut #state_var = #state_enum_ident::<#init_type_param>::#state_after_startup_name(#state_var);
                 while let Some(envelope) = mailbox.recv().await {
                     state = match envelope {
                         Envelope::Call { #msg, #from, mut #reply, .. } => #call_transitions
@@ -319,11 +320,67 @@ impl ProtocolModel {
                         break;
                     }
                 }
+            }
+        };
+        quote! {
+            move |
+                mut mailbox: ::btrun::Mailbox<#msg_enum>,
+                #actor_id: ::btrun::model::ActorId,
+                #runtime: &'static ::btrun::Runtime
+            | async move {
+                #use_statements
+                let #actor_name = #runtime.actor_name(#actor_id);
+                let #state_var = #init_state_var;
+                #( #startup )*
+                #actor_loop
                 Ok(#actor_id)
             }
         }
     }
 
+    fn generate_startup_method_calls<'a>(
+        &'a self,
+        actor: &'a ActorModel,
+    ) -> impl 'a + Iterator<Item = TokenStream> {
+        let state_var = self.state_var();
+        actor.startup_methods().map(move |method| {
+            let output_vars = method.output_vars().skip(1);
+            let input_vars = method.inputs().iter().map(|input| input.var_name());
+            let state_name = method.def().in_state.state_trait.as_ref();
+            let state = actor.states().get(state_name).unwrap();
+            let method_name = method.name();
+            let dispatch_msgs = method
+                .outputs()
+                .iter()
+                .flat_map(|output| self.generate_client_handle_msg_dispatch(actor, state, method, output));
+            let spawn_actors = method.outputs().iter().skip(1).flat_map(|output| {
+                if let ValueKind::State { def: _, .. } = output.kind() {
+                    Some(quote! {
+                        todo!("Spawn actor returned by startup method.");
+                    })
+                } else {
+                    None
+                }
+            });
+            let error_ident = format_ident!("err");
+            let error = self.generate_actor_error(actor, state, method, quote! { #error_ident });
+            quote! {
+                let (#state_var, #( #output_vars ),*) = {
+                    match #state_var.#method_name(#( #input_vars ),*).await {
+                        TransResult::Ok(tuple) => {
+                            tuple
+                        }
+                        TransResult::Abort { #error_ident, .. } | TransResult::Fatal { err, .. } => {
+                            return Err(#error)
+                        }
+                    }
+                };
+                #( #dispatch_msgs )*
+                #( #spawn_actors )*
+            }
+        })
+    }
+
     fn generate_call_transitions(&self, actor: &ActorModel) -> TokenStream {
         self.generate_transitions(actor, |msg_info| msg_info.is_call())
     }
@@ -385,13 +442,10 @@ impl ProtocolModel {
     ) -> TokenStream {
         let msg_enum_ident = self.msg_enum_ident();
         let msg_enum_kinds = self.msg_enum_kinds_ident();
-        let state_type_param = state.type_param();
-        let actor_id = self.actor_id_param();
         let init_state_type_param = actor.init_state().type_param();
         let state_enum_ident = actor.state_enum_ident();
 
-        let mut output_iter = method.output_values().iter();
-        let next_state = output_iter.next().unwrap_or_else(|| {
+        let next_state = method.outputs().first().unwrap_or_else(|| {
             panic!(
                 "There are no outputs for method {} in state {}.",
                 method.name(),
@@ -404,117 +458,65 @@ impl ProtocolModel {
             panic!("First output of {} method was not a state.", method.name());
         };
         let next_state_var = next_state.var_name();
-        let out_states = output_iter
+        let out_states = self.generate_out_state_spawns(actor, state, method);
+        let out_msgs = method.outputs().iter()
             .flat_map(|output| {
-                if let ValueKind::State { def, .. } = output.kind() {
-                    Some((output.var_name(), def))
+                if let ValueKind::Dest { def, .. } = output.kind() {
+                    Some((output, def))
                 } else {
                     None
                 }
             })
-            .map(|(var_name, def)| {
-                let spawning_actor = self.actor_lookup().actor_with_state(&def.state_trait);
-                let spawning_model = self
-                    .actors()
-                    .get(spawning_actor)
-                    .unwrap_or_else(|| panic!("There was no actor named {spawning_actor}."));
-                let spawn_function = spawning_model.spawn_function_ident().unwrap_or_else(|| {
-                    panic!(
-                        "Actor {spawning_actor} of kind {:?} has no spawn function.",
-                        spawning_model.kind()
-                    )
-                });
-                let from = self.from_ident();
-                let runtime = self.runtime_param();
-                let method_name = method.name();
-                quote! {
-                    if let Err(err) = #spawn_function(#runtime, #from, #var_name).await {
-                        log::error!(
-                            "Failed to spawn {} actor after the {} method: {err}",
-                            stringify!(#spawning_actor),
-                            stringify!(#method_name)
-                        )
-                    }
-                }
-            });
-        let out_msgs = method.output_values().iter()
-            .flat_map(|output| {
-                if let ValueKind::Msg { def, .. } = output.kind() {
-                    Some((output.var_name(), def))
-                } else {
-                    None
-                }
-            })
-            .map(|(var_name, msg)| {
-                if msg.is_reply() {
-                    let reply = self.reply_ident();
-                    let msg_type = &msg.msg_type;
-                    let reply_variant = self.msg_lookup().lookup(msg).msg_name();
-                    let error_msg = format!("Failed to send '{}'.", msg_type);
+            .map(|(output, def)| {
+                let msg_info = self.msg_lookup().lookup(&def.msg);
+                let msg_name = msg_info.msg_name();
+                let var_name = output.var_name();
+                if msg_info.is_reply() {
+                    let reply_ident = self.reply_ident();
+                    let reply_variant = self.msg_lookup().lookup(&def.msg).msg_name();
+                    let error_msg = format!("Failed to send '{}'.", msg_name);
+                    let inner_error = quote! { bterr!(#error_msg) };
+                    let error = self.generate_actor_error(actor, state, method, inner_error);
                     quote! {
-                        if let Some(mut reply) = #reply.take() {
+                        if let Some(mut reply) = #reply_ident.take() {
                             if let Err(_) = reply.send(#msg_enum_ident :: #reply_variant (#var_name)) {
-                                return Err(ActorError::new(
-                                    bterr!(#error_msg),
-                                    ActorErrorPayload {
-                                        actor_id: #actor_id,
-                                        actor_impl: #init_state_type_param :: actor_impl(),
-                                        state: #state_type_param :: state_name(),
-                                        message: #msg_enum_kinds :: #msg_type .name(),
-                                        kind: TransKind::Receive,
-                                    }
-                                ));
+                                return Err(#error);
                             }
                         } else {
                             log::error!(
                                 "Reply to '{}' has already been sent.",
-                                #msg_enum_kinds :: #msg_type .name()
+                                #msg_enum_kinds :: #msg_name .name()
                             );
                         }
                     }
                 } else {
-                    quote! {}
-                    //match &dest.state {
-                    //    DestinationState::Service(state) => {
-                    //        let msg = &dest.msg;
-                    //        let msg_info = self.msg_lookup().lookup(msg);
-                    //        let runtime = self.runtime_param();
-                    //        if msg_info.is_call() {
-                    //            quote! { todo!("Call a service.") }
-                    //        } else {
-                    //            quote! { todo!("Send to a service.") }
-                    //        }
-                    //    }
-                    //    DestinationState::Individual(state)
-                    //        => quote! { todo!("Send a message to an owned or owner state.") },
-                    //}
+                    match &def.state {
+                        DestinationState::Service(_state) => {
+                            quote! {
+                                todo!("dispatch message to service");
+                            }
+                        }
+                        DestinationState::Individual(state) => {
+                            match method.dest_kind(state) {
+                                DestKind::Owner => {
+                                    quote! { todo!("dispatch message to current owner"); }
+                                }
+                                DestKind::Owned(_name_var) => {
+                                    quote! { todo!("dispatch message to owned state"); }
+                                }
+                                DestKind::Sender => {
+                                    quote! { todo!("dispatch message to sender"); }
+                                }
+                            }
+                        }
+                    }
                 }
             });
         let method_name = method.name();
         let state_name = state.name();
         let out_vars = method.output_vars();
-        let (trans_kind, msg_name) = if let Some(input) = method.msg_received_input() {
-            let trans_kind = quote! { TransKind::Receive };
-            let msg_name = input.msg_type.as_ref();
-            (trans_kind, msg_name)
-        } else {
-            let trans_kind = quote! { TransKind::Send };
-            let msg_name = method.output_values().iter().flat_map(|output| {
-                if let ValueKind::Msg { def, .. } = output.kind() {
-                    Some(def.msg_type.as_ref())
-                } else {
-                    None
-                }
-            })
-            .next()
-            .unwrap_or_else(|| {
-                panic!(
-                    "Method '{}' does not receive or send any messages. It should not have passed validation.",
-                    method_name
-                )
-            });
-            (trans_kind, msg_name)
-        };
+        let error_ident = format_ident!("err");
+        let error = self.generate_actor_error(actor, state, method, quote! { #error_ident });
         quote! {
             match state.#method_name(#( #args ),*).await {
                 TransResult::Ok(( #( #out_vars ),* )) => {
@@ -531,22 +533,44 @@ impl ProtocolModel {
                     );
                     #state_enum_ident :: #state_name(from)
                 }
-                TransResult::Fatal { err, .. } => {
-                    return Err(ActorError::new(
-                        err,
-                        ActorErrorPayload {
-                            actor_id: #actor_id,
-                            actor_impl: #init_state_type_param :: actor_impl(),
-                            state: #state_type_param :: state_name(),
-                            message: #msg_enum_kinds :: #msg_name . name(),
-                            kind: #trans_kind,
-                        }
-                    ));
+                TransResult::Fatal { #error_ident, .. } => {
+                    return Err(#error);
                 }
             }
         }
     }
 
+    fn generate_actor_error(
+        &self,
+        actor: &ActorModel,
+        state: &StateModel,
+        method: &MethodModel,
+        inner_error: TokenStream,
+    ) -> TokenStream {
+        let actor_id = self.actor_id_param();
+        let init_state_type_param = actor.init_state().type_param();
+        let state_type_param = state.type_param();
+        let msg_enum_kinds = self.msg_enum_kinds_ident();
+        let attribution = method.attribution();
+        let msg_name = attribution.message().msg_type.as_ref();
+        let trans_kind = match attribution.kind() {
+            TransKind::Receive => quote! { TransKind::Receive },
+            TransKind::Send => quote! { TransKind::Send },
+        };
+        quote! {
+            ActorError::new(
+                #inner_error,
+                ActorErrorPayload {
+                    actor_id: #actor_id,
+                    actor_impl: #init_state_type_param :: actor_impl(),
+                    state: #state_type_param :: state_name(),
+                    message: #msg_enum_kinds :: #msg_name . name(),
+                    kind: #trans_kind,
+                }
+            )
+        }
+    }
+
     fn generate_control_transitions(&self, _actor: &ActorModel) -> TokenStream {
         quote! {
             todo!()
@@ -747,13 +771,6 @@ impl ProtocolModel {
         let init_type_param = init_state.type_param();
         let new_type_param = self.new_state_type_param();
 
-        let handle_method_name = method.handle_name().unwrap_or_else(|| {
-            panic!(
-                "Method '{}' in client '{}' had no handle method name.",
-                method.name(),
-                actor.name()
-            )
-        });
         let current_name = current_state.name();
         let current_type_param = current_state.type_param();
         let new_state_out = method.next_state();
@@ -865,21 +882,6 @@ impl ProtocolModel {
         };
 
         let params = method.inputs().iter().map(|input| input.as_handle_param());
-        let first_msg_type = {
-            let first_input = method
-                .inputs()
-                .get(0)
-                .unwrap_or_else(|| panic!("Method '{}' had no inputs.", method.name()));
-            if let ValueKind::Dest { msg_type, .. } = first_input.kind() {
-                msg_type
-            } else {
-                panic!(
-                    "First input to method '{}' was not a destination.",
-                    method.name()
-                )
-            }
-        };
-        let msg_enum_kinds_ident = self.msg_enum_kinds_ident();
         let actor_name_ident = self.actor_name_ident();
         let actor_id_param = self.actor_id_param();
         let struct_ident = actor.handle_struct_ident().unwrap_or_else(|| {
@@ -895,59 +897,258 @@ impl ProtocolModel {
         let runtime_param = self.runtime_param();
         let from_ident = self.from_ident();
         let actor_name = self.actor_name_ident();
+        let return_assoc_type = method
+            .outputs()
+            .iter()
+            .find(|output| matches!(output.kind(), ValueKind::Return { .. }))
+            .map(|output| {
+                output
+                    .assoc_type()
+                    .unwrap_or_else(|| panic!("Return value has no associated type."))
+            })
+            .unwrap_or_else(|| panic!("Client method has no return output."));
+        let state_var = self.state_var();
+        let guard_var = self.guard_var();
+        let give_back_current = self.give_back(actor, current_name);
+        let give_back_new = self.give_back(actor, new_name);
+        let to_var = self.to_var();
+        let dispatch_msgs =
+            self.generate_client_handle_msg_dispatches(actor, current_state, method);
+        let handle_method_name = method.handle_name().unwrap_or_else(|| {
+            panic!(
+                "Method '{}' in client '{}' had no handle method name.",
+                method.name(),
+                actor.name()
+            )
+        });
+        let method_to_call = method.name();
+        let call_args = method
+            .inputs()
+            .iter()
+            .map(|input| input.as_client_method_call(self));
+        let out_state_vars = method
+            .outputs()
+            .iter()
+            .flat_map(|output| {
+                if let ValueKind::State { .. } = output.kind() {
+                    let var_name = output.var_name();
+                    Some(quote! { #var_name, })
+                } else {
+                    None
+                }
+            })
+            .skip(1);
         quote! {
-            #[allow(unreachable_code)]
             impl<#type_constraints> #struct_ident<#init_type_param, #current_type_param> {
                 async fn #handle_method_name(
                     self,
-                    to: ::btrun::model::ServiceAddr,
+                    #to_var: ::btrun::model::ServiceAddr,
                     #( #params ),*
-                ) -> ::std::result::Result<#struct_ident<#init_type_param, #new_type_param>, ::btrun::model::ActorError> {
+                ) -> ::btrun::model::TransResult<
+                    Self,
+                    (
+                        #struct_ident<#init_type_param, #new_type_param>,
+                        #init_type_param::#return_assoc_type
+                    )
+                > {
                     #use_statements
                     let #actor_id_param = self.#actor_name_ident.actor_id();
                     let #runtime_param = self.#runtime_param;
                     let #from_ident = &self.#actor_name;
-                    {
-                        let mut guard = self.#state_field.lock().await;
-                        let state = guard
-                            .take()
-                            .ok_or_else(|| {
-                                ActorError::new(
-                                    bterr!("Client shared state was not returned."),
-                                    ActorErrorPayload {
-                                        actor_id: #actor_id_param,
-                                        actor_impl: #init_type_param :: actor_impl(),
-                                        state: #current_type_param :: state_name(),
-                                        message: #msg_enum_kinds_ident::#first_msg_type.name(),
-                                        kind: TransKind::Send,
-                                    }
-                                )
-                            })?;
-                        let new_state = match state {
-                            #state_enum_ident::#current_name(state) => {
-                                todo!()
-                            },
-                            state => {
-                                *guard = Some(state);
-                                return Err(ActorError::new(
-                                    bterr!("Client is in an unexpected state."),
-                                    ActorErrorPayload {
-                                        actor_id: #actor_id_param,
-                                        actor_impl: #init_type_param :: actor_impl(),
-                                        state: #current_type_param :: state_name(),
-                                        message: #msg_enum_kinds_ident::#first_msg_type.name(),
-                                        kind: TransKind::Send,
+                    let mut #guard_var = self.#state_field.lock().await;
+                    let state = if let Some(state) = #guard_var.take() {
+                        state
+                    } else {
+                        let err = bterr!(
+                            "Handle is no longer usable. Client shared state was not returned."
+                        );
+                        return TransResult::Fatal { err };
+                    };
+                    match state {
+                        #state_enum_ident::#current_name(#state_var) => {
+                            #( #dispatch_msgs )*
+                            let result = #state_var.#method_to_call(#( #call_args ),*).await;
+                            match result {
+                                TransResult::Ok((#state_var, #( #out_state_vars )* return_val)) => {
+                                    #give_back_new
+                                    TransResult::Ok((self.#new_state_method(), return_val))
+                                }
+                                TransResult::Abort { from: #state_var, err, .. } => {
+                                    #give_back_current
+                                    TransResult::Abort { from: self, err }
+                                }
+                                TransResult::Fatal { err, .. } => TransResult::Fatal { err },
+                            }
+                        },
+                        state => {
+                            let name = state.name();
+                            *#guard_var = Some(state);
+                            drop(#guard_var);
+                            let err = bterr!("Client is in an unexpected state: '{name}'.");
+                            TransResult::Abort { from: self, err }
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    fn generate_client_handle_msg_dispatches<'a>(
+        &'a self,
+        actor: &'a ActorModel,
+        state: &'a StateModel,
+        method: &'a MethodModel,
+    ) -> impl 'a + Iterator<Item = TokenStream> {
+        method.inputs().iter().flat_map(move |input| {
+            self.generate_client_handle_msg_dispatch(actor, state, method, input)
+        })
+    }
+
+    fn generate_client_handle_msg_dispatch(
+        &self,
+        actor: &ActorModel,
+        state: &StateModel,
+        method: &MethodModel,
+        value: &ValueModel,
+    ) -> Option<TokenStream> {
+        let msg_enum_ident = self.msg_enum_ident();
+        let runtime_param = self.runtime_param();
+        let to_var = self.to_var();
+        let from_ident = self.from_ident();
+        let give_back_current = self.give_back(actor, state.name());
+        let actor_id = self.actor_id_param();
+        if let ValueKind::Dest { def, msg_type, .. } = value.kind() {
+            let var_name = value.var_name();
+            let msg_info = self.msg_lookup().lookup(&def.msg);
+            if let Some(reply) = msg_info.reply() {
+                let reply_name = reply.msg_name();
+                Some(quote! {
+                    let #var_name = {
+                        let arg = #msg_enum_ident::#msg_type(#var_name);
+                        let result = #runtime_param
+                            .call_service(#to_var, #from_ident.clone(), arg)
+                            .await;
+                        match result {
+                            Ok(value) => {
+                                match value {
+                                    #msg_enum_ident::#reply_name(value) => value,
+                                    value => {
+                                        #give_back_current
+                                        let err = bterr!("Unexpected message of type '{}' sent in response '{}' message.", value.name(), stringify!(#msg_type));
+                                        return TransResult::Abort { from: self, err };
                                     }
-                                ))
+                                }
+                            }
+                            Err(err) => {
+                                #give_back_current
+                                return TransResult::Abort { from: self, err };
+                            }
+                        }
+                    };
+                })
+            } else {
+                match &def.state {
+                    DestinationState::Service(_) => Some(quote! {
+                        let #var_name = {
+                            let arg = #msg_enum_ident::#msg_type(#var_name);
+                            let result = #runtime_param
+                                .send_service(#to_var, #from_ident.clone(), arg)
+                                .await;
+                            match result {
+                                Ok(()) => (),
+                                Err(err) => {
+                                    #give_back_current
+                                    return TransResult::Abort { from: self, err };
+                                }
                             }
                         };
-                        *guard = Some(new_state);
-                    }
-                    Ok(self.#new_state_method())
+                    }),
+                    DestinationState::Individual(state) => match method.dest_kind(state) {
+                        DestKind::Owner => {
+                            let actor_name = actor.name();
+                            Some(quote! {
+                                {
+                                    let msg = #msg_enum_ident::#msg_type(#var_name);
+                                    if let Err(err) = #runtime_param.send_owner(#actor_id, msg).await {
+                                        log::error!("Failed to send message to owner of actor '{}': {err}", stringify!(#actor_name));
+                                    }
+                                }
+                            })
+                        }
+                        DestKind::Owned(_name_var) => {
+                            Some(quote! { todo!("Dispatch to owned state."); })
+                        }
+                        DestKind::Sender => Some(quote! { todo!("Dispatch to sender.") }),
+                    },
                 }
             }
+        } else {
+            None
         }
     }
+
+    fn generate_out_state_spawns<'a>(
+        &'a self,
+        actor: &'a ActorModel,
+        state: &'a StateModel,
+        method: &'a MethodModel,
+    ) -> impl 'a + Iterator<Item = TokenStream> {
+        method
+            .outputs()
+            .iter()
+            .skip(1)
+            .flat_map(|output| {
+                if let ValueKind::State { def, .. } = output.kind() {
+                    Some((output.var_name(), def))
+                } else {
+                    None
+                }
+            })
+            .map(|(var_name, def)| {
+                let actor_to_spawn = self.actor_lookup().actor_with_state(&def.state_trait);
+                let model_to_spawn = self
+                    .actors()
+                    .get(actor_to_spawn)
+                    .unwrap_or_else(|| panic!("There was no actor named {actor_to_spawn}."));
+                let spawn_function = model_to_spawn.spawn_function_ident().unwrap_or_else(|| {
+                    panic!(
+                        "Actor {actor_to_spawn} of kind {:?} has no spawn function.",
+                        model_to_spawn.kind()
+                    )
+                });
+                let runtime = self.runtime_param();
+                let method_name = method.name();
+                // If the state has an owner specified, then given ownership to the caller,
+                // otherwise retain ownership.
+                if def.states_array_owner().is_some() {
+                    let from = self.from_ident();
+                    let owner = quote! { #from };
+                    quote! {
+                        if let Err(err) = #spawn_function(#runtime, #owner, #var_name).await {
+                            log::error!(
+                                "Failed to spawn {} actor after the {} method: {err}",
+                                stringify!(#actor_to_spawn),
+                                stringify!(#method_name)
+                            )
+                        }
+                    }
+                } else {
+                    let actor_name = self.actor_name_ident();
+                    let owner = quote! { #actor_name.clone() };
+                    let error_ident = format_ident!("err");
+                    let error =
+                        self.generate_actor_error(actor, state, method, quote! { #error_ident });
+                    quote! {
+                        let #var_name = match #spawn_function(#runtime, #owner, #var_name).await {
+                            Ok(name) => name,
+                            Err(#error_ident) => {
+                                return Err(#error);
+                            }
+                        };
+                    }
+                }
+            })
+    }
 }
 
 impl MethodModel {
@@ -955,11 +1156,8 @@ impl MethodModel {
     fn generate_trait_def(&self) -> TokenStream {
         let method_ident = self.name().as_ref();
         let msg_args = self.inputs().iter().map(|input| input.in_method_decl());
-        let output_decls = self.output_values().iter().flat_map(|output| output.decl());
-        let output_types = self
-            .output_values()
-            .iter()
-            .flat_map(|output| output.type_name());
+        let output_decls = self.outputs().iter().flat_map(|output| output.decl());
+        let output_types = self.outputs().iter().flat_map(|output| output.type_name());
         let future_name = self.future();
         quote! {
             #( #output_decls )*

+ 1 - 1
crates/btproto/src/lib.rs

@@ -36,7 +36,7 @@ macro_rules! unwrap_or_compile_err {
 /// name_def : "named" Ident ;
 /// version_def: "version" LitInt;
 /// actor_def : "let" Ident '=' ident_array ;
-/// ident_array : '[' Ident ( ',' Ident )* ','? ']' ;
+/// ident_array : '[' '*'? Ident ( ',' Ident )* ','? ']' ;
 /// transition : state ( '?' message )?  "->" states_list ( '>' dest_list )? ;
 /// state : Ident ident_array? ;
 /// states_list : state ( ',' state )* ','? ;

+ 260 - 25
crates/btproto/src/model.rs

@@ -4,7 +4,7 @@ use std::{
     rc::Rc,
 };
 
-use btrun::model::End;
+use btrun::model::{End, TransKind};
 use proc_macro2::{Ident, Span, TokenStream};
 use quote::{format_ident, quote, quote_spanned};
 
@@ -37,6 +37,8 @@ pub(crate) struct ProtocolModel {
     new_state_type_param: Ident,
     new_state_method: Ident,
     state_var: Ident,
+    to_var: Ident,
+    guard_var: Ident,
 }
 
 impl ProtocolModel {
@@ -55,7 +57,7 @@ impl ProtocolModel {
             if !actor_lookup.parents(actor_def.actor.as_ref()).is_empty() {
                 continue;
             }
-            let init_state = actor_def.states.as_ref().first().unwrap();
+            let init_state = actor_def.states.all().next().unwrap();
             let init_state_receives_no_msgs = def
                 .transitions
                 .iter()
@@ -129,6 +131,8 @@ impl ProtocolModel {
             new_state_type_param: format_ident!("NewState"),
             new_state_method: format_ident!("new_state"),
             state_var: format_ident!("state"),
+            to_var: format_ident!("to"),
+            guard_var: format_ident!("guard"),
         })
     }
 
@@ -178,7 +182,6 @@ impl ProtocolModel {
         self.actors_iter().flat_map(|actor| actor.states().values())
     }
 
-    #[cfg(test)]
     pub(crate) fn methods_iter(&self) -> impl Iterator<Item = &MethodModel> {
         self.states_iter()
             .flat_map(|state| state.methods().values())
@@ -187,7 +190,7 @@ impl ProtocolModel {
     #[cfg(test)]
     pub(crate) fn outputs_iter(&self) -> impl Iterator<Item = &ValueModel> {
         self.methods_iter()
-            .flat_map(|method| method.output_values().iter())
+            .flat_map(|method| method.outputs().iter())
     }
 
     /// Returns the tokens for the use statements which bring the types used inside spawn functions
@@ -286,6 +289,28 @@ impl ProtocolModel {
         &self.state_var
     }
 
+    /// The identifier for the variable used to hold the service or actor address a message is
+    /// being dispatched to in a client handle method.
+    pub(crate) fn to_var(&self) -> &Ident {
+        &self.to_var
+    }
+
+    /// The identifier for the variable holding the lock guard for a client's shared state.
+    pub(crate) fn guard_var(&self) -> &Ident {
+        &self.guard_var
+    }
+
+    /// Returns the code needed to give back a client's shared state in a handle method.
+    pub(crate) fn give_back(&self, actor: &ActorModel, state_name: &Ident) -> TokenStream {
+        let guard_var = self.guard_var();
+        let state_var = self.state_var();
+        let state_enum_ident = actor.state_enum_ident();
+        quote! {
+            *#guard_var = Some(#state_enum_ident::#state_name(#state_var));
+            drop(#guard_var);
+        }
+    }
+
     /// Returns an iterator over the [Ident]s for each of the states in [actor].
     pub(crate) fn state_idents<'a>(
         &'a self,
@@ -365,7 +390,7 @@ impl ProtocolModel {
             .methods()
             .values()
             // The first output of a method is the state this state transitions to.
-            .flat_map(|method| method.output_values().first().zip(Some(method.name())))
+            .flat_map(|method| method.outputs().first().zip(Some(method.name())))
             .filter(|(output, method_name)| {
                 let state_trait = output.kind().state_trait()
                     .unwrap_or_else(|| panic!("The first output of method {method_name} in state {} was not the correct kind.", state.name()));
@@ -498,7 +523,7 @@ impl ActorModel {
     pub(crate) fn init_state(&self) -> &StateModel {
         // It's a syntax error to have an IdentArray with no states in it, so this unwrap
         // shouldn't panic.
-        let init = self.def.states.as_ref().first().unwrap();
+        let init = self.def.states.all().next().unwrap();
         self.states.get(init).unwrap()
     }
 
@@ -513,6 +538,80 @@ impl ActorModel {
     pub(crate) fn handle_struct_ident(&self) -> Option<&Ident> {
         self.handle_struct_ident.as_ref()
     }
+
+    /// Returns an iterator over the startup methods in this actor, in the order in which they
+    /// should be called. A startup method is the unique method in a state which doesn't receive a
+    /// message. Client actors do not have startup methods.
+    pub(crate) fn startup_methods(&self) -> impl Iterator<Item = &'_ MethodModel> {
+        StartupMethodsIter::new(self)
+    }
+
+    /// Returns the state this actor is in after all of its startup methods have been called.
+    pub(crate) fn state_after_startup(&self) -> &Ident {
+        if let Some(method) = self.startup_methods().last() {
+            let state_name = method.def().in_state.state_trait.as_ref();
+            let state = self.states().get(state_name)
+                .unwrap_or_else(|| {
+                    panic!(
+                        "Actor does not contain state '{}' despite the fact that method '{}' transitions to it.",
+                        self.name(),
+                        method.name(),
+                    )
+                });
+            let method = state.startup_method().unwrap();
+            if let ValueKind::State { def, .. } = method.next_state().kind() {
+                def.state_trait.as_ref()
+            } else {
+                panic!("Next state output was not the `State` kind.");
+            }
+        } else {
+            self.init_state().name()
+        }
+    }
+}
+
+struct StartupMethodsIter<'a> {
+    actor: &'a ActorModel,
+    current: Option<&'a MethodModel>,
+}
+
+impl<'a> StartupMethodsIter<'a> {
+    fn new(actor: &'a ActorModel) -> Self {
+        let current = actor.init_state().startup_method();
+        Self { current, actor }
+    }
+}
+
+impl<'a> Iterator for StartupMethodsIter<'a> {
+    type Item = &'a MethodModel;
+    fn next(&mut self) -> Option<Self::Item> {
+        if let Some(method) = self.current.take() {
+            let next_name = if let ValueKind::State { def, .. } = method.next_state().kind() {
+                def.state_trait.as_ref()
+            } else {
+                panic!(
+                    "First output of method '{}' was not the `State` kind.",
+                    method.name()
+                );
+            };
+            self.current = if next_name == End::ident() {
+                None
+            } else {
+                self.actor.states().get(next_name)
+                .unwrap_or_else(|| {
+                    panic!(
+                        "Can't find state '{next_name}' in actor '{}' even though it was the first output of method '{}'.", 
+                        self.actor.name(),
+                        method.name()
+                    )
+                })
+                .startup_method()
+            };
+            Some(method)
+        } else {
+            None
+        }
+    }
 }
 
 pub(crate) struct StateModel {
@@ -567,7 +666,7 @@ impl StateModel {
 
     fn out_states_and_assoc_types(&self) -> impl Iterator<Item = (&Ident, &Ident)> {
         self.methods().values().flat_map(|method| {
-            method.output_values().iter().flat_map(|output| {
+            method.outputs().iter().flat_map(|output| {
                 output
                     .kind()
                     .state_trait()
@@ -576,6 +675,16 @@ impl StateModel {
             })
         })
     }
+
+    /// Returns the single method in this state which is not handling any messages. If no such
+    /// method exists, or if there are multiple such methods, then [None] is returned.
+    pub(crate) fn startup_method(&self) -> Option<&MethodModel> {
+        self.methods()
+            .values()
+            .filter(|method| method.msg_received_input().is_none())
+            .fold(SetOnce::Unset, |accum, curr| accum.set(curr))
+            .into_option()
+    }
 }
 
 impl GetSpan for StateModel {
@@ -699,7 +808,7 @@ impl MethodModel {
         &self.inputs
     }
 
-    pub(crate) fn output_values(&self) -> &Vec<ValueModel> {
+    pub(crate) fn outputs(&self) -> &Vec<ValueModel> {
         &self.outputs
     }
 
@@ -714,15 +823,111 @@ impl MethodModel {
 
     /// Returns the output for the state this method transitions into.
     pub(crate) fn next_state(&self) -> &ValueModel {
-        self.output_values().get(0).unwrap()
+        self.outputs()
+            .get(0)
+            .unwrap_or_else(|| panic!("Method '{}' had no outputs.", self.name()))
     }
 
     /// Returns an iterator over the output variables returned by this method.
     pub(crate) fn output_vars(&self) -> impl Iterator<Item = &'_ Ident> {
-        self.output_values()
+        self.outputs()
             .iter()
             .flat_map(|output| output.output_var_name())
     }
+
+    /// Returns the [MethodAttribution] for this method.
+    pub(crate) fn attribution(&self) -> MethodAttribution<'_> {
+        if let Some(in_msg) = self.def().in_msg() {
+            let message = in_msg.as_ref();
+            MethodAttribution {
+                message,
+                kind: TransKind::Receive,
+            }
+        } else {
+            let message = self
+                .def()
+                .out_msgs
+                .as_ref()
+                .first()
+                .unwrap_or_else(|| {
+                    panic!(
+                        "Method '{}' is not receiving or sending any messages.",
+                        self.name()
+                    )
+                })
+                .msg
+                .as_ref();
+            MethodAttribution {
+                message,
+                kind: TransKind::Receive,
+            }
+        }
+    }
+
+    /// Returns the [DestKind] of the given [State].
+    pub(crate) fn dest_kind<'a>(&'a self, state: &'a State) -> DestKind<'a> {
+        let in_state = &self.def().in_state;
+
+        let owner = in_state.states_array_owner();
+        if Some(&state.state_trait) == owner {
+            return DestKind::Owner;
+        }
+
+        let owned = in_state
+            .states_array_owned()
+            .find(|owned| *owned == &state.state_trait);
+        if let Some(_owned) = owned {
+            todo!("Figure out how to identify the variable an owned state's name is stored in.");
+        }
+
+        let in_msg_match = self
+            .def()
+            .in_msg()
+            .into_iter()
+            .flat_map(|msg| msg.states_array.as_ref().into_iter())
+            .flat_map(|states_array| states_array.all())
+            .any(|in_msg_state| in_msg_state == &state.state_trait);
+        if in_msg_match {
+            return DestKind::Sender;
+        }
+
+        panic!(
+            "Don't know how to deliver a message to state '{}' in method '{}'.",
+            state.state_trait.as_ref(),
+            self.name(),
+        );
+    }
+}
+
+/// Represents the kind of individual (non-service) recipients a message can be sent to.
+pub(crate) enum DestKind<'a> {
+    /// The recipient is the owner of the current actor.
+    Owner,
+    /// The recipient is owned by the current actor. The identifier of the variable containing the
+    /// owned actors name is contained in this variant.
+    #[allow(dead_code)]
+    Owned(&'a Ident),
+    /// The sender of the message which is currently being processed is the recipient.
+    Sender,
+}
+
+/// Indicates the [Message] a given method is associated with, and the [TransKind] which describes
+/// how that message was being handled.
+pub(crate) struct MethodAttribution<'a> {
+    message: &'a Message,
+    kind: TransKind,
+}
+
+impl<'a> MethodAttribution<'a> {
+    /// Return the [Message] which the method is attributable to.
+    pub(crate) fn message(&'a self) -> &'a Message {
+        self.message
+    }
+
+    /// Returns the [TransKind] which indicates how the `message()` is processed by the method.
+    pub(crate) fn kind(&'a self) -> TransKind {
+        self.kind
+    }
 }
 
 impl GetSpan for MethodModel {
@@ -834,7 +1039,7 @@ impl ValueModel {
     /// If this output actually appears in the tuple returned by its method, then its variable name
     /// is returned. Otherwise, `None` is returned.
     ///
-    /// An output of a transition is not actually be an output of the corresponding trait method in
+    /// An output of a transition may not actually be an output of the corresponding trait method in
     /// the case of a method in a client actor.
     pub(crate) fn output_var_name(&self) -> Option<&Ident> {
         if self.type_name.is_some() {
@@ -846,22 +1051,23 @@ impl ValueModel {
 
     /// Returns the token for this input when it appears in a client handle method.
     pub(crate) fn as_handle_param(&self) -> TokenStream {
-        let name = &self.var_name;
+        let var_name = &self.var_name;
         if let ValueKind::Dest { msg_type, .. } = &self.kind {
-            quote_spanned! {self.span()=> mut #name: #msg_type }
+            quote_spanned! {self.span()=> mut #var_name: #msg_type }
         } else {
             quote_spanned! {self.span() => }
         }
     }
 
-    #[allow(dead_code)]
-    pub(crate) fn as_method_call(&self) -> TokenStream {
-        if let ValueKind::Msg { .. } | ValueKind::Dest { .. } = &self.kind {
-            let var_name = &self.var_name;
-            quote_spanned! {self.span()=> #var_name }
-        } else {
-            quote_spanned! {self.span()=> }
+    pub(crate) fn as_client_method_call(&self, protocol: &ProtocolModel) -> TokenStream {
+        if let ValueKind::Dest { def, .. } = &self.kind {
+            let msg_info = protocol.msg_lookup().lookup(&def.msg);
+            if msg_info.is_call() {
+                let var_name = &self.var_name;
+                return quote_spanned! {self.span()=> #var_name };
+            }
         }
+        quote_spanned! {self.span()=> }
     }
 
     pub(crate) fn in_method_decl(&self) -> TokenStream {
@@ -869,19 +1075,19 @@ impl ValueModel {
         match &self.kind {
             ValueKind::Msg { def, .. } => {
                 let msg_type = def.msg_type.as_ref();
-                quote! { #var_name: #msg_type }
+                quote_spanned! {self.span()=> #var_name: #msg_type }
             }
             ValueKind::State { .. } => quote_spanned! {self.span()=> },
             // Dest values only ever occur in the inputs of clients. In client handles, only call
             // replies are passed in.
             ValueKind::Dest { reply_type, .. } => {
                 if let Some(reply_type) = reply_type {
-                    quote! { #var_name: #reply_type }
+                    quote_spanned! {self.span()=> #var_name: #reply_type }
                 } else {
-                    quote! {}
+                    quote_spanned! {self.span()=>}
                 }
             }
-            ValueKind::Return => quote! {},
+            ValueKind::Return => quote_spanned! {self.span()=>},
         }
     }
 }
@@ -978,7 +1184,7 @@ impl ActorLookup {
             let mut states = HashSet::new();
             let actor_name = &actor_def.actor;
             let mut first = true;
-            for state in actor_def.states.as_ref().iter() {
+            for state in actor_def.states.all() {
                 if first {
                     actors_by_init_state.insert(state.clone(), actor_name.clone());
                     first = false;
@@ -1247,6 +1453,35 @@ impl Hash for MsgInfo {
     }
 }
 
+/// Represents a variable which can only be set once. An attempt to set it two or more times will
+/// cause it to lose it's value and become an error.
+pub(crate) enum SetOnce<T> {
+    Unset,
+    Set(T),
+    Error,
+}
+
+impl<T> SetOnce<T> {
+    /// Sets the value in this instance, if this instance has not already been set.
+    pub(crate) fn set(self, value: T) -> Self {
+        match self {
+            SetOnce::Unset => SetOnce::Set(value),
+            SetOnce::Set(_) => SetOnce::Error,
+            SetOnce::Error => SetOnce::Error,
+        }
+    }
+
+    /// Returns the value contained in this instance, if it has been set exactly once. Otherwise
+    /// [None] is returned.
+    pub(crate) fn into_option(self) -> Option<T> {
+        if let SetOnce::Set(value) = self {
+            Some(value)
+        } else {
+            None
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::{

+ 164 - 60
crates/btproto/src/parsing.rs

@@ -220,9 +220,13 @@ pub(crate) struct ActorDef {
     pub(crate) states: IdentArray,
 }
 
+impl ActorDef {
+    const STAR_ERROR: &str = "An actor definition cannot contain a star.";
+}
+
 impl ActorDef {
     #[cfg(test)]
-    pub(crate) fn new<T, I>(actor: &str, state_names: T) -> Self
+    pub(crate) fn new<T, I>(actor: &str, states: T) -> Self
     where
         T: IntoIterator<IntoIter = I>,
         I: ExactSizeIterator<Item = &'static str>,
@@ -231,7 +235,7 @@ impl ActorDef {
             let_token: Token![let](Span::call_site()),
             actor: new_ident(actor).into(),
             eq_token: Token![=](Span::call_site()),
-            states: IdentArray::new(state_names).unwrap(),
+            states: IdentArray::new(states).unwrap(),
         }
     }
 }
@@ -239,12 +243,16 @@ impl ActorDef {
 impl Parse for ActorDef {
     /// actor_def : "let" Ident '=' ident_array ;
     fn parse(input: syn::parse::ParseStream) -> syn::Result<Self> {
-        Ok(Self {
+        let actor_def = Self {
             let_token: input.parse()?,
             actor: Rc::new(input.parse()?),
             eq_token: input.parse()?,
             states: input.parse()?,
-        })
+        };
+        if let Some(star) = &actor_def.states.star {
+            return Err(syn::Error::new(star.span(), Self::STAR_ERROR));
+        }
+        Ok(actor_def)
     }
 }
 
@@ -262,24 +270,67 @@ impl GetSpan for ActorDef {
 #[derive(Hash, PartialEq, Eq)]
 pub(crate) struct IdentArray {
     bracket: Bracket,
+    star: Option<Token![*]>,
     idents: Punctuated<Rc<Ident>, Token![,]>,
 }
 
 impl IdentArray {
     const EMPTY_ERR: &str = "at least one state is required";
+
+    /// Returns the owner state in this array, if this array contains an owner. Otherwise, [None] is
+    /// returned.
+    pub(crate) fn owner(&self) -> Option<&Rc<Ident>> {
+        if self.star.is_some() {
+            self.idents.first()
+        } else {
+            None
+        }
+    }
+
+    /// Returns an iterator over the owned states in this array.
+    pub(crate) fn owned(&self) -> impl Iterator<Item = &Rc<Ident>> {
+        let mut idents = self.idents.iter();
+        if self.star.is_some() {
+            // Skip the first ident because it's the owner.
+            idents.next();
+        }
+        idents
+    }
+
+    /// Returns an iterator over all of the states in this array, owner and owned alike.
+    pub(crate) fn all(&self) -> impl Iterator<Item = &Rc<Ident>> {
+        self.idents.iter()
+    }
 }
 
 #[cfg(test)]
 impl IdentArray {
-    pub(crate) fn new<T, I>(state_names: T) -> Option<Self>
+    pub(crate) fn new<T, I>(owned_names: T) -> Option<Self>
     where
         T: IntoIterator<IntoIter = I>,
         I: ExactSizeIterator<Item = &'static str>,
     {
-        let state_names = state_names.into_iter();
-        if state_names.len() > 0 {
+        Self::new_with_owner(None, owned_names)
+    }
+
+    pub(crate) fn new_with_owner<T, I>(
+        owner_name: Option<&'static str>,
+        owned_names: T,
+    ) -> Option<Self>
+    where
+        T: IntoIterator<IntoIter = I>,
+        I: ExactSizeIterator<Item = &'static str>,
+    {
+        let owned_names = owned_names.into_iter();
+        // When this was written `std::iter::Chain<T, U>` didn't implement `ExactSizeIterator` even
+        // when both `T` and `U` did.
+        let len = owned_names.len() + if owner_name.is_some() { 1 } else { 0 };
+        let star = owner_name.map(|_| Token![*](Span::call_site()));
+        let state_names = owner_name.into_iter().chain(owned_names);
+        if len > 0 {
             Some(Self {
                 bracket: Bracket::default(),
+                star,
                 idents: state_names.map(new_ident).map(Rc::new).collect(),
             })
         } else {
@@ -303,18 +354,17 @@ impl Parse for IdentArray {
     fn parse(input: syn::parse::ParseStream) -> syn::Result<Self> {
         let content;
         let bracket = bracketed!(content in input);
+        let star = content.parse::<Token![*]>().ok();
         let idents =
             content.parse_terminated(|input| Ok(Rc::new(Ident::parse(input)?)), Token![,])?;
         if idents.is_empty() {
             return Err(syn::Error::new(bracket.span.open(), Self::EMPTY_ERR));
         }
-        Ok(Self { bracket, idents })
-    }
-}
-
-impl AsRef<Punctuated<Rc<Ident>, Token![,]>> for IdentArray {
-    fn as_ref(&self) -> &Punctuated<Rc<Ident>, Token![,]> {
-        &self.idents
+        Ok(Self {
+            bracket,
+            star,
+            idents,
+        })
     }
 }
 
@@ -333,6 +383,23 @@ impl Transition {
     pub(crate) fn in_msg(&self) -> Option<&Rc<Message>> {
         self.in_msg.as_ref().map(|(_, msg)| msg)
     }
+
+    /// Returns an iterator over all of the states owned by this transition.
+    pub(crate) fn owned_states(&self) -> impl Iterator<Item = &'_ Rc<Ident>> {
+        let in_state_owned = self
+            .in_state
+            .states_array
+            .as_ref()
+            .into_iter()
+            .flat_map(|states_array| states_array.owned());
+        let in_msg_owned = self.in_msg().into_iter().flat_map(|msg| {
+            msg.states_array
+                .as_ref()
+                .into_iter()
+                .flat_map(|states_array| states_array.owned())
+        });
+        in_msg_owned.chain(in_state_owned)
+    }
 }
 
 #[cfg(test)]
@@ -393,59 +460,66 @@ impl GetSpan for Transition {
     }
 }
 
-struct IdentIter<'a> {
-    idents: Option<syn::punctuated::Iter<'a, Rc<Ident>>>,
+#[cfg_attr(test, derive(Debug))]
+#[derive(Hash, PartialEq, Eq)]
+pub(crate) struct State {
+    pub(crate) state_trait: Rc<Ident>,
+    pub(crate) states_array: Option<IdentArray>,
 }
 
-impl<'a> IdentIter<'a> {
-    fn new(idents: Option<syn::punctuated::Iter<'a, Rc<Ident>>>) -> Self {
-        Self { idents }
+impl State {
+    /// Returns an iterator over the owned states in this state's states array.
+    pub(crate) fn states_array_owned(&self) -> impl Iterator<Item = &'_ Rc<Ident>> {
+        self.states_array
+            .as_ref()
+            .into_iter()
+            .flat_map(|states_array| states_array.owned())
     }
-}
 
-impl<'a> Iterator for IdentIter<'a> {
-    type Item = &'a Rc<Ident>;
-    fn next(&mut self) -> Option<Self::Item> {
-        if let Some(idents) = self.idents.as_mut() {
-            idents.next()
+    pub(crate) fn states_array_owner(&self) -> Option<&Rc<Ident>> {
+        if let Some(states_array) = self.states_array.as_ref() {
+            states_array.owner()
         } else {
             None
         }
     }
-}
 
-#[cfg_attr(test, derive(Debug))]
-#[derive(Hash, PartialEq, Eq)]
-pub(crate) struct State {
-    pub(crate) state_trait: Rc<Ident>,
-    pub(crate) owned_states: Option<IdentArray>,
-}
-
-impl State {
-    pub(crate) fn owned_states(&self) -> impl Iterator<Item = &'_ Rc<Ident>> {
-        IdentIter::new(
-            self.owned_states
-                .as_ref()
-                .map(|idents| idents.as_ref().iter()),
-        )
+    /// Returns an iterator over all of the states in the states array associated with this state.
+    pub(crate) fn states_array_all(&self) -> impl Iterator<Item = &'_ Rc<Ident>> {
+        self.states_array
+            .as_ref()
+            .into_iter()
+            .flat_map(|states_array| states_array.all())
     }
 }
 
 #[cfg(test)]
 impl State {
-    pub(crate) fn new<T, I>(state_trait: &str, owned_states: T) -> Self
+    pub(crate) fn new<T, I>(state_trait: &str, owned_names: T) -> Self
     where
         T: IntoIterator<IntoIter = I>,
         I: ExactSizeIterator<Item = &'static str>,
     {
-        Self {
-            state_trait: new_ident(state_trait).into(),
-            owned_states: IdentArray::new(owned_states),
-        }
+        Self::new_with_owner(state_trait, None, owned_names)
     }
 
     fn new_empty_owned(state_trait: &str) -> Self {
-        Self::new(state_trait, std::iter::empty())
+        Self::new_with_owner(state_trait, None, std::iter::empty())
+    }
+
+    pub(crate) fn new_with_owner<T, I>(
+        state_trait: &str,
+        owner_name: Option<&'static str>,
+        owned_names: T,
+    ) -> Self
+    where
+        T: IntoIterator<IntoIter = I>,
+        I: ExactSizeIterator<Item = &'static str>,
+    {
+        Self {
+            state_trait: new_ident(state_trait).into(),
+            states_array: IdentArray::new_with_owner(owner_name, owned_names),
+        }
     }
 }
 
@@ -460,14 +534,14 @@ impl Parse for State {
         };
         Ok(Self {
             state_trait,
-            owned_states,
+            states_array: owned_states,
         })
     }
 }
 
 impl GetSpan for State {
     fn span(&self) -> Span {
-        self.state_trait.span().left_join(&self.owned_states)
+        self.state_trait.span().left_join(&self.states_array)
     }
 }
 
@@ -620,7 +694,7 @@ impl Parse for DestinationState {
             }
             Ok(DestinationState::Service(State {
                 state_trait: dest_state.into(),
-                owned_states: None,
+                states_array: None,
             }))
         } else {
             Ok(DestinationState::Individual(input.parse()?))
@@ -643,7 +717,7 @@ impl GetSpan for DestinationState {
 pub(crate) struct Message {
     pub(crate) msg_type: Rc<Ident>,
     reply_part: Option<MessageReplyPart>,
-    pub(crate) owned_states: Option<IdentArray>,
+    pub(crate) states_array: Option<IdentArray>,
     ident: Option<Ident>,
 }
 
@@ -663,11 +737,10 @@ impl Message {
     }
 
     pub(crate) fn owned_states(&self) -> impl Iterator<Item = &'_ Rc<Ident>> {
-        IdentIter::new(
-            self.owned_states
-                .as_ref()
-                .map(|states| states.as_ref().iter()),
-        )
+        self.states_array
+            .as_ref()
+            .into_iter()
+            .flat_map(|states_array| states_array.owned())
     }
 }
 
@@ -691,7 +764,7 @@ impl Message {
         Self {
             msg_type: Rc::new(new_ident(msg_type)),
             reply_part,
-            owned_states: IdentArray::new(owned_states),
+            states_array: IdentArray::new(owned_states),
             ident: ident_field,
         }
     }
@@ -718,7 +791,7 @@ impl Parse for Message {
         Ok(Self {
             msg_type,
             reply_part,
-            owned_states,
+            states_array: owned_states,
             ident,
         })
     }
@@ -729,7 +802,7 @@ impl GetSpan for Message {
         self.msg_type
             .span()
             .left_join(self.reply_part.as_ref())
-            .left_join(&self.owned_states)
+            .left_join(&self.states_array)
     }
 }
 
@@ -831,7 +904,7 @@ impl MaybeGetSpan for Span {
     }
 }
 
-trait LeftJoin<Rhs> {
+pub(crate) trait LeftJoin<Rhs> {
     /// Attempts to join two [GetSpan] values, but if the result of the join is `None`, then just
     /// the left span is returned.
     fn left_join(&self, other: Rhs) -> Span;
@@ -881,6 +954,8 @@ impl<T: Parse, U: Parse> ParsePunctuatedList for Punctuated<Rc<T>, U> {
 
 #[cfg(test)]
 mod tests {
+    use crate::error::assert_err;
+
     use super::*;
     use syn::parse_str;
 
@@ -1028,6 +1103,15 @@ Init?Activate -> End;"
         assert_eq!(expected, actual);
     }
 
+    #[test]
+    fn actor_def_parse_star_is_err() {
+        const INPUT: &str = "let actor = [*Owner, Owned];";
+
+        let actual = parse_str::<ActorDef>(INPUT);
+
+        assert_err(actual, ActorDef::STAR_ERROR);
+    }
+
     #[test]
     fn ident_array_new() {
         const EXPECTED: [&str; 2] = ["Red", "Green"];
@@ -1077,6 +1161,26 @@ Init?Activate -> End;"
         assert_eq!(IdentArray::EMPTY_ERR, err_str);
     }
 
+    #[test]
+    fn ident_array_stared_is_owner() {
+        const EXPECTED_OWNER: &str = "Owner";
+        const EXPECTED_OWNED: &str = "Owned";
+        let input = format!("[*{EXPECTED_OWNER}, {EXPECTED_OWNED}]");
+
+        let actual = parse_str::<IdentArray>(input.as_str()).unwrap();
+
+        assert_eq!(
+            Some(true),
+            actual
+                .owner()
+                .map(|actual| actual.as_ref() == EXPECTED_OWNER)
+        );
+        let actual_owned_vec: Vec<_> = actual.owned().collect();
+        assert_eq!(1, actual_owned_vec.len());
+        let actual_owned = actual_owned_vec.first().unwrap().as_ref();
+        assert_eq!(actual_owned, EXPECTED_OWNED);
+    }
+
     #[test]
     fn transition_parse_minimal() {
         const EXPECTED_IN_STATE: &str = "Catcher";
@@ -1316,7 +1420,7 @@ Init?Activate -> End;"
 
         assert_eq!(actual.msg_type.as_ref(), EXPECTED_MSG_TYPE);
         assert_eq!(actual.is_reply(), EXPECTED_IS_REPLY);
-        let idents = actual.owned_states.unwrap().idents;
+        let idents = actual.states_array.unwrap().idents;
         assert_eq!(idents.len(), EXPECTED_OWNED_STATES.len());
         assert_eq!(idents[0].as_ref(), EXPECTED_OWNED_STATES[0]);
         assert_eq!(idents[1].as_ref(), EXPECTED_OWNED_STATES[1]);

+ 519 - 16
crates/btproto/src/validation.rs

@@ -1,4 +1,4 @@
-use std::{collections::HashSet, hash::Hash};
+use std::{collections::HashSet, hash::Hash, rc::Rc};
 
 use proc_macro2::{Ident, Span};
 
@@ -6,8 +6,8 @@ use btrun::model::End;
 
 use crate::{
     error::{self, MaybeErr},
-    model::{MsgInfo, ProtocolModel},
-    parsing::{DestinationState, GetSpan, State},
+    model::{ActorKind, MsgInfo, ProtocolModel, ValueKind},
+    parsing::{DestinationState, GetSpan, LeftJoin, State},
 };
 
 impl ProtocolModel {
@@ -18,6 +18,10 @@ impl ProtocolModel {
             .combine(self.no_undeliverable_msgs())
             .combine(self.replies_expected())
             .combine(self.no_unobservable_states())
+            .combine(self.no_ambiguous_trans())
+            .combine(self.no_exit_races())
+            .combine(self.no_invalid_out_state_provenance())
+            .combine(self.no_invalid_out_msg_provenance())
             .into()
     }
 
@@ -27,7 +31,7 @@ impl ProtocolModel {
         let mut declared: HashSet<&Ident> = HashSet::new();
         declared.insert(&end);
         for actor_def in self.def().actor_defs.iter() {
-            for state in actor_def.states.as_ref().iter() {
+            for state in actor_def.states.all() {
                 declared.insert(state);
             }
         }
@@ -35,13 +39,13 @@ impl ProtocolModel {
         for transition in self.def().transitions.iter() {
             let in_state = &transition.in_state;
             used.insert(&in_state.state_trait);
-            used.extend(in_state.owned_states().map(|ident| ident.as_ref()));
+            used.extend(in_state.states_array_owned().map(|ident| ident.as_ref()));
             if let Some(in_msg) = transition.in_msg() {
                 used.extend(in_msg.owned_states().map(|ident| ident.as_ref()));
             }
             for out_states in transition.out_states.as_ref().iter() {
                 used.insert(&out_states.state_trait);
-                used.extend(out_states.owned_states().map(|ident| ident.as_ref()));
+                used.extend(out_states.states_array_owned().map(|ident| ident.as_ref()));
             }
             // We don't have to check the states referred to in out_msgs because the
             // receivers_and_senders_matched method ensures that each of these exists in a receiver
@@ -173,17 +177,24 @@ impl ProtocolModel {
                 match &dest.state {
                     DestinationState::Service(_) => continue,
                     DestinationState::Individual(dest_state) => {
-                        let owned_states = transition
+                        let allowed_from_msg = transition
+                            .in_msg()
+                            .into_iter()
+                            .flat_map(|msg| msg.states_array.as_ref())
+                            .flat_map(|array| array.all())
+                            .map(|ptr| ptr.as_ref());
+                        let allowed_from_state = transition
                             .in_state
-                            .owned_states()
-                            .map(|ident| ident.as_ref());
+                            .states_array_all()
+                            .map(|ident| ident.as_ref())
+                            .chain(allowed_from_msg);
                         let allowed = allowed_states.get_or_insert_with(|| {
                             transition
                                 .out_states
                                 .as_ref()
                                 .iter()
                                 .map(|state| state.state_trait.as_ref())
-                                .chain(owned_states)
+                                .chain(allowed_from_state)
                                 .collect()
                         });
                         if !allowed.contains(dest_state.state_trait.as_ref()) {
@@ -247,15 +258,197 @@ impl ProtocolModel {
             .filter(|actor| actor.kind().is_client())
             .flat_map(|actor| actor.states().values())
             .filter(|state| {
-                state.methods().values().all(|method| {
-                    if let Some(in_msg) = method.def().in_msg() {
-                        in_msg.is_reply()
+                !state.methods().is_empty()
+                    && state.methods().values().all(|method| {
+                        if let Some(in_msg) = method.def().in_msg() {
+                            in_msg.is_reply()
+                        } else {
+                            false
+                        }
+                    })
+            })
+            .map(|state| syn::Error::new(state.span(), error::msgs::UNOBSERVABLE_STATE))
+            .collect()
+    }
+
+    /// Checks that there are no two transitions defined in a non-client state which do not receive
+    /// messages. Such transition create ambiguity because we don't know which of the two
+    /// transitions to execute before entering the non-client's loop.
+    fn no_ambiguous_trans(&self) -> MaybeErr {
+        self.actors_iter()
+            .flat_map(|actor| {
+                if matches!(actor.kind(), ActorKind::Client) {
+                    return None;
+                }
+                let no_input_methods: Vec<_> = actor
+                    .states()
+                    .values()
+                    .flat_map(|state| state.methods().values())
+                    .filter(|method| method.msg_received_input().is_none())
+                    .collect();
+                if no_input_methods.len() > 1 {
+                    Some(no_input_methods)
+                } else {
+                    None
+                }
+            })
+            .map(|no_input_methods| {
+                let mut iter = no_input_methods.into_iter();
+                let mut span = iter.next().unwrap().span();
+                for method in iter {
+                    span = span.left_join(method.span());
+                }
+                syn::Error::new(span, error::msgs::AMBIGUOUS_TRANS)
+            })
+            .collect()
+    }
+
+    /// Ensures that no race conditions exist between the delivery of exit messages by the runtime
+    /// and application messages.
+    fn no_exit_races(&self) -> MaybeErr {
+        self.methods_iter()
+            .filter(|method| {
+                let next_state_name =
+                    if let ValueKind::State { def, .. } = method.next_state().kind() {
+                        def.state_trait.as_ref()
                     } else {
-                        false
+                        panic!(
+                            "Next state output of method '{}' is not actually a state.",
+                            method.name()
+                        );
+                    };
+                next_state_name == End::ident()
+            })
+            .flat_map(|method| {
+                let owned: HashSet<_> = method
+                    .def()
+                    .owned_states()
+                    .map(|ptr| ptr.as_ref())
+                    .collect();
+                method.def().out_msgs.as_ref().iter().flat_map(move |dest| {
+                    if let DestinationState::Individual(state) = &dest.state {
+                        let dest_state = state.state_trait.as_ref();
+                        if owned.contains(dest_state) {
+                            Some(dest)
+                        } else {
+                            None
+                        }
+                    } else {
+                        None
                     }
                 })
             })
-            .map(|state| syn::Error::new(state.span(), error::msgs::UNOBSERVABLE_STATE))
+            .map(|dest| syn::Error::new(dest.span(), error::msgs::EXIT_RACE))
+            .collect()
+    }
+
+    /// Checks that ownership granted to output states given when the input
+    /// state or the input message in the same transition owns the granted state.
+    fn no_invalid_out_state_provenance(&self) -> MaybeErr {
+        self.actors_iter()
+            .flat_map(|actor| actor.states().values().map(move |state| (actor, state)))
+            .flat_map(|(actor, state)| state.methods().values().map(move |method| (actor, method)))
+            .flat_map(|(actor, method)| {
+                let valid_owned_states: HashSet<&Rc<Ident>> = if actor.kind().is_client() {
+                    // A client is allowed to get ownership from the replies to the messages it's
+                    // sending.
+                    let states_from_reply = method
+                        .def()
+                        .out_msgs
+                        .as_ref()
+                        .iter()
+                        .filter(|out_msg| {
+                            let msg_info = self.msg_lookup().lookup(&out_msg.msg);
+                            msg_info.is_call()
+                        })
+                        .flat_map(|out_msg| {
+                            let receiver_state_name =
+                                out_msg.state.state_ref().state_trait.as_ref();
+                            let receiver_state = self.get_state(receiver_state_name);
+                            let msg = &out_msg.msg;
+                            let receiver_method = receiver_state
+                                .methods()
+                                .values()
+                                .find(|method| method.def().in_msg() == Some(msg))
+                                .unwrap_or_else(|| {
+                                    panic!(
+                                    "There is no receiver for message '{}' sent by method '{}'.",
+                                    msg.msg_type.as_ref(),
+                                    method.name(),
+                                )
+                                });
+                            let reply_msg = receiver_method
+                                .def()
+                                .out_msgs
+                                .as_ref()
+                                .iter()
+                                .find(|reply_msg| {
+                                    reply_msg.msg.is_reply()
+                                        && reply_msg.msg.msg_type.as_ref() == msg.msg_type.as_ref()
+                                })
+                                .unwrap_or_else(|| {
+                                    panic!(
+                                        "Method '{}' is not sending reply to message '{}'.",
+                                        receiver_method.name(),
+                                        msg.msg_type.as_ref(),
+                                    )
+                                });
+                            reply_msg.msg.owned_states()
+                        });
+                    method
+                        .def()
+                        .owned_states()
+                        .chain(states_from_reply)
+                        .collect()
+                } else {
+                    method.def().owned_states().collect()
+                };
+                let valid_owned_states = Rc::new(valid_owned_states);
+                method
+                    .def()
+                    .out_states
+                    .as_ref()
+                    .iter()
+                    .flat_map(move |out_state| {
+                        let valid_owned_states = valid_owned_states.clone();
+                        out_state
+                            .states_array_owned()
+                            .filter(move |out_state_owned| {
+                                !valid_owned_states.contains(out_state_owned)
+                            })
+                    })
+            })
+            .map(|state| syn::Error::new(state.span(), error::msgs::INVALID_OUT_STATE_PROVENANCE))
+            .collect()
+    }
+
+    /// Checks that ownership granted to output messages is only given for states owned by the
+    /// input state, input message, or which are output states in the same transition.
+    fn no_invalid_out_msg_provenance(&self) -> MaybeErr {
+        self.methods_iter()
+            .flat_map(|method| {
+                let out_msg_not_valid = |state: &&Rc<Ident>| {
+                    method
+                        .def()
+                        .owned_states()
+                        .chain(
+                            method
+                                .def()
+                                .out_states
+                                .as_ref()
+                                .iter()
+                                .map(|out_state| &out_state.state_trait),
+                        )
+                        .all(|owned_state| owned_state.as_ref() != state.as_ref())
+                };
+                method
+                    .def()
+                    .out_msgs
+                    .as_ref()
+                    .iter()
+                    .flat_map(move |out_msg| out_msg.msg.owned_states().filter(out_msg_not_valid))
+            })
+            .map(|state| syn::Error::new(state.span(), error::msgs::INVALID_OUT_MSG_PROVENANCE))
             .collect()
     }
 }
@@ -272,7 +465,7 @@ mod tests {
     fn all_states_declared_and_used_ok() {
         let input = ProtocolModel::new(Protocol::minimal()).unwrap();
 
-        let result = input.all_states_declared_and_used();
+        let result = input.validate();
 
         assert_ok(result);
     }
@@ -635,6 +828,48 @@ mod tests {
         assert_ok(result);
     }
 
+    #[test]
+    fn no_undeliverable_msgs_sending_when_handling_owned_msg_ok() {
+        let input = ProtocolModel::new(Protocol::new(
+            NameDef::new("EndAndSend"),
+            [
+                ActorDef::new("client", ["Client", "Subbed"]),
+                ActorDef::new("server", ["Server"]),
+            ],
+            [
+                Transition::new(
+                    State::new("Client", []),
+                    None,
+                    [State::new("Subbed", [])],
+                    [Dest::new(
+                        DestinationState::Service(State::new("Server", [])),
+                        Message::new("GiveSelf", false, ["Subbed"]),
+                    )],
+                ),
+                Transition::new(
+                    State::new("Server", []),
+                    Some(Message::new("GiveSelf", false, ["Subbed"])),
+                    [State::new("Server", ["Subbed"])],
+                    [Dest::new(
+                        DestinationState::Individual(State::new("Subbed", [])),
+                        Message::new("FinalOrders", false, []),
+                    )],
+                ),
+                Transition::new(
+                    State::new("Subbed", []),
+                    Some(Message::new("FinalOrders", false, [])),
+                    [State::new("End", [])],
+                    [],
+                ),
+            ],
+        ))
+        .unwrap();
+
+        let result = input.validate();
+
+        assert_ok(result);
+    }
+
     #[test]
     fn no_undeliverable_msgs_err() {
         let input = ProtocolModel::new(Protocol::new(
@@ -728,4 +963,272 @@ mod tests {
 
         assert_err(result, error::msgs::MULTIPLE_REPLIES);
     }
+
+    #[test]
+    fn no_unobservable_states_reply_to_client_ok() {
+        let input = ProtocolModel::new(Protocol::new(
+            NameDef::new("Test"),
+            [
+                ActorDef::new("server", ["Server"]),
+                ActorDef::new("client", ["Client"]),
+            ],
+            [
+                Transition::new(
+                    State::new("Client", []),
+                    None,
+                    [State::new("Client", [])],
+                    [Dest::new(
+                        DestinationState::Service(State::new("Server", [])),
+                        Message::new("Msg", false, []),
+                    )],
+                ),
+                Transition::new(
+                    State::new("Server", []),
+                    Some(Message::new("Msg", false, [])),
+                    [State::new("Server", [])],
+                    [Dest::new(
+                        DestinationState::Individual(State::new("Client", [])),
+                        Message::new("Msg", true, []),
+                    )],
+                ),
+            ],
+        ))
+        .unwrap();
+
+        let result = input.validate();
+
+        assert_ok(result);
+    }
+
+    /// Only one method in a non-client actor state is allowed to have no input message, because
+    /// otherwise there would be ambiguity as to which method to call before the actor entered
+    /// its loop.
+    #[test]
+    fn no_ambiguous_trans_err() {
+        let input = ProtocolModel::new(Protocol::new(
+            NameDef::new("MultipleNoInputTransitions"),
+            [
+                ActorDef::new("client", ["Client"]),
+                ActorDef::new("server", ["Server"]),
+                ActorDef::new("worker", ["Worker"]),
+            ],
+            [
+                Transition::new(
+                    State::new("Client", []),
+                    None,
+                    [State::new("Client", [])],
+                    [Dest::new(
+                        DestinationState::Service(State::new("Server", [])),
+                        Message::new("Msg", false, []),
+                    )],
+                ),
+                Transition::new(
+                    State::new("Server", []),
+                    Some(Message::new("Msg", false, [])),
+                    [State::new("Server", []), State::new("Worker", [])],
+                    [],
+                ),
+                Transition::new(
+                    State::new("Server", []),
+                    Some(Message::new("Halt", false, [])),
+                    [State::new("End", [])],
+                    [],
+                ),
+                // Notice that worker sends two different messages from its initial state. This
+                // crates a ambiguity because we don't know which one should be called in the
+                // generated code.
+                Transition::new(
+                    State::new("Worker", []),
+                    None,
+                    [State::new("Worker", [])],
+                    [Dest::new(
+                        DestinationState::Service(State::new("Server", [])),
+                        Message::new("Msg", false, []),
+                    )],
+                ),
+                Transition::new(
+                    State::new("Worker", []),
+                    None,
+                    [State::new("End", [])],
+                    [Dest::new(
+                        DestinationState::Service(State::new("Server", [])),
+                        Message::new("Halt", false, []),
+                    )],
+                ),
+            ],
+        ))
+        .unwrap();
+
+        let result = input.no_ambiguous_trans();
+
+        assert_err(result, error::msgs::AMBIGUOUS_TRANS);
+    }
+
+    #[test]
+    fn no_exit_races_err() {
+        let input = ProtocolModel::new(Protocol::new(
+            NameDef::new("EndAndSend"),
+            [
+                ActorDef::new("client", ["Client", "Subbed"]),
+                ActorDef::new("server", ["Server"]),
+            ],
+            [
+                Transition::new(
+                    State::new("Client", []),
+                    None,
+                    [State::new("Subbed", [])],
+                    [Dest::new(
+                        DestinationState::Service(State::new("Server", [])),
+                        Message::new("GiveSelf", false, ["Subbed"]),
+                    )],
+                ),
+                // Notice that Server sends FinalOrders to Subbed and immediately exits. This
+                // creates a race condition between the owner exited message sent by the runtime
+                // and the FinalOrders message.
+                Transition::new(
+                    State::new("Server", []),
+                    Some(Message::new("GiveSelf", false, ["Subbed"])),
+                    [State::new("End", ["Subbed"])],
+                    [Dest::new(
+                        DestinationState::Individual(State::new("Subbed", [])),
+                        Message::new("FinalOrders", false, []),
+                    )],
+                ),
+                Transition::new(
+                    State::new("Subbed", []),
+                    Some(Message::new("FinalOrders", false, [])),
+                    [State::new("End", [])],
+                    [],
+                ),
+            ],
+        ))
+        .unwrap();
+
+        let result = input.validate();
+
+        assert_err(result, error::msgs::EXIT_RACE);
+    }
+
+    #[test]
+    fn no_invalid_out_state_provenance_ok() {
+        let input = ProtocolModel::new(Protocol::minimal()).unwrap();
+
+        let result = input.no_invalid_out_state_provenance();
+
+        assert_ok(result);
+    }
+
+    #[test]
+    fn no_invalid_out_state_provenance_ownership_from_reply_ok() {
+        let input = ProtocolModel::new(Protocol::new(
+            NameDef::new("OwnershipFromReply"),
+            [
+                ActorDef::new("client", ["Client"]),
+                ActorDef::new("handle", ["Handle"]),
+                ActorDef::new("server", ["Server"]),
+                ActorDef::new("worker", ["Worker"]),
+            ],
+            [
+                Transition::new(
+                    State::new("Client", []),
+                    None,
+                    [State::new("Client", []), State::new("Handle", ["Worker"])],
+                    [Dest::new(
+                        DestinationState::Service(State::new("Server", [])),
+                        Message::new("Open", false, []),
+                    )],
+                ),
+                Transition::new(
+                    State::new("Server", []),
+                    Some(Message::new("Open", false, [])),
+                    [State::new("Server", []), State::new("Worker", [])],
+                    [Dest::new(
+                        DestinationState::Individual(State::new("Client", [])),
+                        Message::new("Open", true, ["Worker"]),
+                    )],
+                ),
+            ],
+        ))
+        .unwrap();
+
+        let result = input.validate();
+
+        assert_ok(result);
+    }
+
+    #[test]
+    fn no_invalid_out_state_provenance_err() {
+        let input = ProtocolModel::new(Protocol::new(
+            NameDef::new("Test"),
+            [
+                ActorDef::new("server", ["Server"]),
+                ActorDef::new("client", ["Client"]),
+                ActorDef::new("worker", ["Worker"]),
+            ],
+            [
+                Transition::new(
+                    State::new("Client", []),
+                    None,
+                    [State::new("Client", [])],
+                    [Dest::new(
+                        DestinationState::Service(State::new("Server", [])),
+                        Message::new("Msg", false, []),
+                    )],
+                ),
+                Transition::new(
+                    State::new("Server", []),
+                    Some(Message::new("Msg", false, [])),
+                    [
+                        State::new("Server", []),
+                        // Note that this transition doesn't have the right to give ownership of
+                        // Client.
+                        State::new("Worker", ["Client"]),
+                    ],
+                    [],
+                ),
+            ],
+        ))
+        .unwrap();
+
+        let result = input.validate();
+
+        assert_err(result, error::msgs::INVALID_OUT_STATE_PROVENANCE);
+    }
+
+    #[test]
+    fn no_invalid_out_msgs_provenance_err() {
+        let input = ProtocolModel::new(Protocol::new(
+            NameDef::new("Test"),
+            [
+                ActorDef::new("server", ["Server"]),
+                ActorDef::new("client", ["Client"]),
+            ],
+            [
+                Transition::new(
+                    State::new("Client", []),
+                    None,
+                    [State::new("Client", [])],
+                    [Dest::new(
+                        DestinationState::Service(State::new("Server", [])),
+                        Message::new("Msg", false, []),
+                    )],
+                ),
+                Transition::new(
+                    State::new("Server", []),
+                    Some(Message::new("Msg", false, [])),
+                    [State::new("Server", [])],
+                    [Dest::new(
+                        DestinationState::Individual(State::new("Client", [])),
+                        // Note that the this transition has no right to give ownership of Client.
+                        Message::new("Msg", true, ["Client"]),
+                    )],
+                ),
+            ],
+        ))
+        .unwrap();
+
+        let result = input.validate();
+
+        assert_err(result, error::msgs::INVALID_OUT_MSG_PROVENANCE);
+    }
 }

+ 3 - 3
crates/btproto/tests/protocol_tests.rs

@@ -121,9 +121,9 @@ fn client_callback() {
         let server = [Listening];
         let worker = [Working];
         let client = [Unregistered, Registered];
-        Unregistered -> Registered, >service(Listening)!Register[Registered];
-        Listening?Register[Registered] -> Listening, Working[Registered];
-        Working[Registered] -> End, >Registered!Completed;
+        Unregistered -> Registered, >service(Listening)!Register[*Registered];
+        Listening?Register[*Registered] -> Listening, Working[*Registered];
+        Working[*Registered] -> End, >Registered!Completed;
         Registered?Completed -> End;
     }
 

+ 28 - 0
crates/btrun/src/lib.rs

@@ -232,6 +232,29 @@ impl Runtime {
         }
     }
 
+    /// Sends `msg` to the owner of `from`.
+    /// 
+    /// If `from` is not the ID of an actor in this runtime, [RuntimeError::BadActorId] is returned.
+    /// 
+    /// If the actor `from` doesn't have an owner, [RuntimeError::NoOwner] is returned.
+    pub async fn send_owner<T: 'static + MsgEnum>(
+        &'static self,
+        from: ActorId,
+        msg: T,
+    ) -> Result<()> {
+        let owner = {
+            let handles = self.handles.read().await;
+            let handle = handles
+                .get(&from)
+                .ok_or_else(|| bterr!(RuntimeError::BadActorId(from)))?;
+            handle
+                .owner()
+                .ok_or_else(|| bterr!(RuntimeError::NoOwner(from)))?
+                .clone()
+        };
+        self.send(owner, self.actor_name(from), msg).await
+    }
+
     fn service_not_registered_err(id: &ServiceId) -> btlib::Error {
         bterr!("Service is not registered: '{id}'")
     }
@@ -485,14 +508,17 @@ pub async fn do_nothing_actor(
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum RuntimeError {
     BadActorName(ActorName),
+    BadActorId(ActorId),
     BadServiceId(ServiceId),
     BadOwnerName(ActorName),
+    NoOwner(ActorId),
 }
 
 impl Display for RuntimeError {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             Self::BadActorName(name) => write!(f, "bad actor name: {name}"),
+            Self::BadActorId(actor_id) => write!(f, "bad actor ID: {actor_id}"),
             Self::BadServiceId(service_id) => {
                 write!(f, "service ID is not registered: {service_id}")
             }
@@ -500,6 +526,7 @@ impl Display for RuntimeError {
                 f,
                 "Non-existent name {name} can't be used as an actor owner."
             ),
+            Self::NoOwner(actor_id) => write!(f, "no owner exists for actor: {actor_id}"),
         }
     }
 }
@@ -708,6 +735,7 @@ macro_rules! test_setup {
         pub(crate) const LOG_LEVEL: &str = "warn";
 
         #[::ctor::ctor]
+        #[allow(non_snake_case)]
         fn ctor() {
             ::std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL));
             let mut builder = ::env_logger::Builder::from_default_env();

+ 9 - 2
crates/btrun/src/model.rs

@@ -45,10 +45,12 @@ impl<From, To> TransResult<From, To> {
         match self {
             Self::Ok(to) => to,
             Self::Abort { err, .. } => {
-                panic!("Called `TransResult::unwrap()` on an `Abort` value: {err}")
+                let backtrace = err.as_ref().backtrace();
+                panic!("Called `TransResult::unwrap()` on an `Abort` value: {err}\n{backtrace}")
             }
             Self::Fatal { err, .. } => {
-                panic!("Called `TransResult::unwrap()` on a `Fatal` value: {err}")
+                let backtrace = err.as_ref().backtrace();
+                panic!("Called `TransResult::unwrap()` on a `Fatal` value: {err}\n{backtrace}")
             }
         }
     }
@@ -728,6 +730,11 @@ impl ActorHandle {
         (self.wire_deliverer)(envelope).await
     }
 
+    /// Returns a reference to the name of this actor's owner, if it has one.
+    pub(crate) fn owner(&self) -> Option<&ActorName> {
+        self.owner.as_ref()
+    }
+
     /// Takes the owner name out of this handle.
     pub(crate) fn take_owner(&mut self) -> Option<ActorName> {
         self.owner.take()

+ 23 - 571
crates/btrun/tests/runtime_tests.rs

@@ -1,16 +1,12 @@
-#![feature(impl_trait_in_assoc_type)]
-
 use btrun::model::*;
 use btrun::test_setup;
 use btrun::*;
 
-use btlib::Result;
 use btproto::protocol;
 use log;
-use once_cell::sync::Lazy;
 use serde::{Deserialize, Serialize};
 use std::{
-    future::{ready, Future, Ready},
+    future::{ready, Ready},
     sync::{
         atomic::{AtomicU8, Ordering},
         Arc,
@@ -44,219 +40,6 @@ mod ping_pong {
     // When a state is expecting a Reply message, an error occurs if the message is not received
     // in a timely manner.
 
-    enum PingClientState<T: Client> {
-        Client(T),
-        End(End),
-    }
-
-    impl<T: Client> PingClientState<T> {
-        const fn name(&self) -> &'static str {
-            match self {
-                Self::Client(_) => "Client",
-                Self::End(_) => "End",
-            }
-        }
-    }
-
-    struct ClientHandleManual<T: Client> {
-        state: Option<PingClientState<T>>,
-        client_name: ActorName,
-        runtime: &'static Runtime,
-    }
-
-    impl<T: Client> ClientHandleManual<T> {
-        async fn send_ping(
-            mut self,
-            msg: Ping,
-            service: ServiceAddr,
-        ) -> TransResult<Self, (ClientHandleManual<T>, T::OnSendPingReturn)> {
-            let state = if let Some(state) = self.state.take() {
-                state
-            } else {
-                return TransResult::Abort {
-                    from: self,
-                    err: bterr!("The shared state was not returned."),
-                };
-            };
-            match state {
-                PingClientState::Client(state) => {
-                    let result = self
-                        .runtime
-                        .call_service(
-                            service,
-                            self.client_name.clone(),
-                            PingProtocolMsgs::Ping(msg),
-                        )
-                        .await;
-                    let reply_enum = match result {
-                        Ok(reply_enum) => reply_enum,
-                        Err(err) => {
-                            self.state = Some(PingClientState::Client(state));
-                            return TransResult::Abort { from: self, err };
-                        }
-                    };
-                    if let PingProtocolMsgs::PingReply(reply) = reply_enum {
-                        match state.on_send_ping(reply).await {
-                            TransResult::Ok((new_state, return_var)) => {
-                                self.state = Some(PingClientState::End(new_state));
-                                TransResult::Ok((self, return_var))
-                            }
-                            TransResult::Abort { from, err } => {
-                                self.state = Some(PingClientState::Client(from));
-                                TransResult::Abort { from: self, err }
-                            }
-                            TransResult::Fatal { err } => return TransResult::Fatal { err },
-                        }
-                    } else {
-                        TransResult::Abort {
-                            from: self,
-                            err: bterr!("Unexpected reply type."),
-                        }
-                    }
-                }
-                state => {
-                    let err = bterr!("Can't send Ping in state {}.", state.name());
-                    self.state = Some(state);
-                    TransResult::Abort { from: self, err }
-                }
-            }
-        }
-    }
-
-    async fn spawn_client_manual<T: Client>(
-        init: T,
-        runtime: &'static Runtime,
-    ) -> ClientHandleManual<T> {
-        let state = Some(PingClientState::Client(init));
-        let client_name = runtime.spawn(None, do_nothing_actor).await.unwrap();
-        ClientHandleManual {
-            state,
-            client_name,
-            runtime,
-        }
-    }
-
-    async fn register_server_manual<Init, F>(
-        make_init: F,
-        rt: &'static Runtime,
-        id: ServiceId,
-    ) -> Result<ServiceName>
-    where
-        Init: 'static + Server,
-        F: 'static + Send + Sync + Clone + Fn() -> Init,
-    {
-        enum ServerState<S> {
-            Server(S),
-            End(End),
-        }
-
-        impl<S> Named for ServerState<S> {
-            fn name(&self) -> Arc<String> {
-                static SERVER_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("Server".into()));
-                static END_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("End".into()));
-                match self {
-                    Self::Server(_) => SERVER_NAME.clone(),
-                    Self::End(_) => END_NAME.clone(),
-                }
-            }
-        }
-
-        async fn server_loop<Init, F>(
-            _runtime: &'static Runtime,
-            make_init: F,
-            mut mailbox: Mailbox<PingProtocolMsgs>,
-            actor_id: ActorId,
-        ) -> ActorResult
-        where
-            Init: 'static + Server,
-            F: 'static + Send + Sync + FnOnce() -> Init,
-        {
-            let mut state = ServerState::Server(make_init());
-            while let Some(envelope) = mailbox.recv().await {
-                state = match envelope {
-                    Envelope::Call {
-                        msg,
-                        reply: replier,
-                        ..
-                    } => match (state, msg) {
-                        (ServerState::Server(listening_state), PingProtocolMsgs::Ping(msg)) => {
-                            match listening_state.handle_ping(msg).await {
-                                TransResult::Ok((new_state, reply)) => {
-                                    let replier = replier
-                                        .ok_or_else(|| bterr!("Reply has already been sent."))
-                                        .unwrap();
-                                    if let Err(_) = replier.send(PingProtocolMsgs::PingReply(reply))
-                                    {
-                                        return Err(ActorError::new(
-                                            bterr!("Failed to send Ping reply."),
-                                            ActorErrorPayload {
-                                                actor_id,
-                                                actor_impl: Init::actor_impl(),
-                                                state: Init::state_name(),
-                                                message: PingProtocolMsgKinds::Ping.name(),
-                                                kind: TransKind::Receive,
-                                            },
-                                        ));
-                                    }
-                                    ServerState::End(new_state)
-                                }
-                                TransResult::Abort { from, err } => {
-                                    log::warn!("Aborted transition from the {} while handling the {} message: {}", "Server", "Ping", err);
-                                    ServerState::Server(from)
-                                }
-                                TransResult::Fatal { err } => {
-                                    return Err(ActorError::new(
-                                        err,
-                                        ActorErrorPayload {
-                                            actor_id,
-                                            actor_impl: Init::actor_impl(),
-                                            state: Init::state_name(),
-                                            message: PingProtocolMsgKinds::Ping.name(),
-                                            kind: TransKind::Receive,
-                                        },
-                                    ));
-                                }
-                            }
-                        }
-                        (state, _) => state,
-                    },
-                    envelope => {
-                        return Err(ActorError::new(
-                            bterr!("Unexpected envelope type: {}", envelope.name()),
-                            ActorErrorPayload {
-                                actor_id,
-                                actor_impl: Init::actor_impl(),
-                                state: state.name(),
-                                message: envelope.msg_name(),
-                                kind: TransKind::Receive,
-                            },
-                        ))
-                    }
-                };
-
-                if let ServerState::End(_) = state {
-                    break;
-                }
-            }
-            Ok(actor_id)
-        }
-
-        rt.register::<PingProtocolMsgs, _>(id, move |runtime| {
-            let make_init = make_init.clone();
-            let fut = async move {
-                let actor_impl = runtime
-                    .spawn(None, move |mailbox, act_id, runtime| {
-                        server_loop(runtime, make_init, mailbox, act_id)
-                    })
-                    .await
-                    .unwrap();
-                Ok(actor_impl)
-            };
-            Box::pin(fut)
-        })
-        .await
-    }
-
     #[derive(Serialize, Deserialize)]
     pub struct Ping;
     impl CallMsg for Ping {
@@ -281,7 +64,7 @@ mod ping_pong {
         actor_name!("ping_client");
 
         type OnSendPingReturn = ();
-        type OnSendPingFut = impl Future<Output = TransResult<Self, (End, ())>>;
+        type OnSendPingFut = Ready<TransResult<Self, (End, ())>>;
         fn on_send_ping(self, _msg: PingReply) -> Self::OnSendPingFut {
             self.counter.fetch_sub(1, Ordering::SeqCst);
             ready(TransResult::Ok((End, ())))
@@ -294,7 +77,8 @@ mod ping_pong {
 
     impl ServerState {
         fn new(counter: Arc<AtomicU8>) -> Self {
-            counter.fetch_add(1, Ordering::SeqCst);
+            let count = counter.fetch_add(1, Ordering::SeqCst);
+            log::info!("New service provider started. Count is now '{count}'.");
             Self { counter }
         }
     }
@@ -302,7 +86,7 @@ mod ping_pong {
     impl Server for ServerState {
         actor_name!("ping_server");
 
-        type HandlePingFut = impl Future<Output = TransResult<Self, (End, PingReply)>>;
+        type HandlePingFut = Ready<TransResult<Self, (End, PingReply)>>;
         fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
             self.counter.fetch_sub(1, Ordering::SeqCst);
             ready(TransResult::Ok((End, PingReply)))
@@ -321,14 +105,13 @@ mod ping_pong {
                     let server_counter = service_counter.clone();
                     ServerState::new(server_counter)
                 };
-                register_server_manual(make_init, &RUNTIME, service_id.clone())
+                register_server(&RUNTIME, service_id.clone(), make_init)
                     .await
                     .unwrap()
             };
-            let client_handle =
-                spawn_client_manual(ClientImpl::new(counter.clone()), &RUNTIME).await;
+            let client_handle = spawn_client(ClientImpl::new(counter.clone()), &RUNTIME).await;
             let service_addr = ServiceAddr::new(service_name, true);
-            client_handle.send_ping(Ping, service_addr).await.unwrap();
+            client_handle.send_ping(service_addr, Ping).await.unwrap();
 
             assert_eq!(0, counter.load(Ordering::SeqCst));
         });
@@ -377,14 +160,11 @@ mod travel_agency {
     }
 }
 
-#[allow(dead_code)]
 mod client_callback {
-
     use super::*;
 
     use btlib::bterr;
-    use once_cell::sync::Lazy;
-    use std::{marker::PhantomData, panic::panic_any, time::Duration};
+    use std::time::Duration;
     use tokio::{sync::oneshot, time::timeout};
 
     #[derive(Serialize, Deserialize)]
@@ -402,9 +182,9 @@ mod client_callback {
         let server = [Listening];
         let worker = [Working];
         let client = [Unregistered, Registered];
-        Unregistered -> Registered, >service(Listening)!Register[Registered];
-        Listening?Register[Registered] -> Listening, Working[Registered];
-        Working[Registered] -> End, >Registered!Completed;
+        Unregistered -> Registered, >service(Listening)!Register[*Registered];
+        Listening?Register[*Registered] -> Listening, Working[*Registered];
+        Working[*Registered] -> End, >Registered!Completed;
         Registered?Completed -> End;
     }
 
@@ -441,7 +221,7 @@ mod client_callback {
     }
 
     struct ListeningState {
-        multiple: usize,
+        multiply_by: usize,
     }
 
     impl Listening for ListeningState {
@@ -451,7 +231,7 @@ mod client_callback {
         type HandleRegisterWorking = WorkingState;
         type HandleRegisterFut = Ready<TransResult<Self, (ListeningState, WorkingState)>>;
         fn handle_register(self, arg: Register) -> Self::HandleRegisterFut {
-            let multiple = self.multiple;
+            let multiple = self.multiply_by;
             ready(TransResult::Ok((
                 self,
                 WorkingState {
@@ -477,354 +257,26 @@ mod client_callback {
         }
     }
 
-    use ::tokio::sync::Mutex;
-
-    enum ClientStateManual<Init: Unregistered> {
-        Unregistered(Init),
-        Registered(Init::OnSendRegisterRegistered),
-        End(End),
-    }
-
-    impl<Init: Unregistered> Named for ClientStateManual<Init> {
-        fn name(&self) -> Arc<String> {
-            static UNREGISTERED_NAME: Lazy<Arc<String>> =
-                Lazy::new(|| Arc::new("Unregistered".into()));
-            static REGISTERED_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("Registered".into()));
-            static END_NAME: Lazy<Arc<String>> = Lazy::new(|| Arc::new("End".into()));
-            match self {
-                Self::Unregistered(_) => UNREGISTERED_NAME.clone(),
-                Self::Registered(_) => REGISTERED_NAME.clone(),
-                Self::End(_) => END_NAME.clone(),
-            }
-        }
-    }
-
-    struct ClientHandleManual<Init: Unregistered, State> {
-        runtime: &'static Runtime,
-        state: Arc<Mutex<Option<ClientStateManual<Init>>>>,
-        name: ActorName,
-        type_state: PhantomData<State>,
-    }
-
-    impl<Init: Unregistered, State> ClientHandleManual<Init, State> {
-        fn new_type<NewState>(self) -> ClientHandleManual<Init, NewState> {
-            ClientHandleManual {
-                runtime: self.runtime,
-                state: self.state,
-                name: self.name,
-                type_state: PhantomData,
-            }
-        }
-    }
-
-    impl<
-            Init: Unregistered,
-            State: Unregistered<OnSendRegisterRegistered = NewState>,
-            NewState: Registered,
-        > ClientHandleManual<Init, State>
-    {
-        async fn send_register(
-            self,
-            to: ServiceAddr,
-            msg: Register,
-        ) -> TransResult<
-            Self,
-            (
-                ClientHandleManual<Init, NewState>,
-                Init::OnSendRegisterReturn,
-            ),
-        > {
-            let mut guard = self.state.lock().await;
-            let state = guard
-                .take()
-                .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
-            match state {
-                ClientStateManual::Unregistered(state) => match state.on_send_register().await {
-                    TransResult::Ok((new_state, return_var)) => {
-                        let msg = ClientCallbackMsgs::Register(msg);
-                        let result = self.runtime.send_service(to, self.name.clone(), msg).await;
-                        if let Err(err) = result {
-                            return TransResult::Fatal { err };
-                        }
-                        *guard = Some(ClientStateManual::Registered(new_state));
-                        drop(guard);
-                        TransResult::Ok((self.new_type(), return_var))
-                    }
-                    TransResult::Abort { from, err } => {
-                        *guard = Some(ClientStateManual::Unregistered(from));
-                        drop(guard);
-                        return TransResult::Abort { from: self, err };
-                    }
-                    TransResult::Fatal { err } => {
-                        return TransResult::Fatal { err };
-                    }
-                },
-                state => {
-                    let name = state.name();
-                    *guard = Some(state);
-                    drop(guard);
-                    TransResult::Abort {
-                        from: self,
-                        err: bterr!(
-                            "Unexpected state '{}' for '{}' method.",
-                            name,
-                            "send_register"
-                        ),
-                    }
-                }
-            }
-        }
-    }
-
-    async fn spawn_client_manual<Init>(
-        init: Init,
-        runtime: &'static Runtime,
-    ) -> ClientHandleManual<Init, Init>
-    where
-        Init: 'static + Unregistered,
-    {
-        let state = Arc::new(Mutex::new(Some(ClientStateManual::Unregistered(init))));
-        let name = {
-            let state = state.clone();
-            runtime.spawn(None, move |mut mailbox, actor_id, _| async move {
-                while let Some(envelope) = mailbox.recv().await {
-                    let mut guard = state.lock().await;
-                    let state = guard.take()
-                        .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
-                    let new_state = match envelope {
-                        Envelope::Send { msg, .. } => {
-                            match (state, msg) {
-                                (ClientStateManual::Registered(curr_state), ClientCallbackMsgs::Completed(msg)) => {
-                                    match curr_state.handle_completed(msg).await {
-                                        TransResult::Ok(next) =>  ClientStateManual::<Init>::End(next),
-                                        TransResult::Abort { from, err } => {
-                                            log::warn!("Aborted transition from the {} state while handling the {} message: {}", "Registered", "Completed", err);
-                                            ClientStateManual::Registered(from)
-                                        }
-                                        TransResult::Fatal { err } => {
-                                            panic_any(ActorError::new(
-                                                err,
-                                                ActorErrorPayload {
-                                                actor_id,
-                                                actor_impl: Init::actor_impl(),
-                                                state: Init::OnSendRegisterRegistered::state_name(),
-                                                message: ClientCallbackMsgKinds::Completed.name(),
-                                                kind: TransKind::Receive,
-                                            }));
-                                        }
-                                    }
-                                }
-                                (state, msg) => {
-                                    log::error!("Unexpected message {} in state {}.", msg.name(), state.name());
-                                    state
-                                }
-                            }
-                        }
-                        envelope => return Err(ActorError::new(
-                            bterr!("Unexpected envelope type: {}", envelope.name()),
-                            ActorErrorPayload {
-                            actor_id,
-                            actor_impl: Init::actor_impl(),
-                            state: state.name(),
-                            message: envelope.msg_name(),
-                            kind: TransKind::Receive,
-                        }))
-                    };
-                    *guard = Some(new_state);
-                    if let Some(state) = &*guard {
-                        if let ClientStateManual::End(_) = state {
-                            break;
-                        }
-                    }
-                }
-                Ok(actor_id)
-            }).await.unwrap()
-        };
-        ClientHandleManual {
-            runtime,
-            state,
-            name,
-            type_state: PhantomData,
-        }
-    }
-
-    async fn register_server_manual<Init, F>(
-        make_init: F,
-        runtime: &'static Runtime,
-        service_id: ServiceId,
-    ) -> Result<ServiceName>
-    where
-        Init: 'static + Listening<HandleRegisterListening = Init>,
-        F: 'static + Send + Sync + Clone + Fn() -> Init,
-    {
-        enum ServerState<Init: Listening> {
-            Listening(Init),
-        }
-
-        impl<S: Listening> Named for ServerState<S> {
-            fn name(&self) -> Arc<String> {
-                static LISTENING_NAME: Lazy<Arc<String>> =
-                    Lazy::new(|| Arc::new("Listening".into()));
-                match self {
-                    Self::Listening(_) => LISTENING_NAME.clone(),
-                }
-            }
-        }
-
-        async fn server_loop<Init, F>(
-            runtime: &'static Runtime,
-            make_init: F,
-            mut mailbox: Mailbox<ClientCallbackMsgs>,
-            actor_id: ActorId,
-        ) -> ActorResult
-        where
-            Init: 'static + Listening<HandleRegisterListening = Init>,
-            F: 'static + Send + Sync + Fn() -> Init,
-        {
-            let mut state = ServerState::Listening(make_init());
-            while let Some(envelope) = mailbox.recv().await {
-                let new_state = match envelope {
-                    Envelope::Send { msg, from, .. } => match (state, msg) {
-                        (ServerState::Listening(curr_state), ClientCallbackMsgs::Register(msg)) => {
-                            match curr_state.handle_register(msg).await {
-                                TransResult::Ok((new_state, working_state)) => {
-                                    start_worker_manual(working_state, from, runtime).await;
-                                    ServerState::Listening(new_state)
-                                }
-                                TransResult::Abort { from, err } => {
-                                    log::warn!("Aborted transition from the {} state while handling the {} message: {}", "Listening", "Register", err);
-                                    ServerState::Listening(from)
-                                }
-                                TransResult::Fatal { err } => {
-                                    let err = ActorError::new(
-                                        err,
-                                        ActorErrorPayload {
-                                            actor_id,
-                                            actor_impl: Init::actor_impl(),
-                                            state: Init::state_name(),
-                                            message: ClientCallbackMsgKinds::Register.name(),
-                                            kind: TransKind::Receive,
-                                        },
-                                    );
-                                    panic_any(format!("{err}"));
-                                }
-                            }
-                        }
-                        (state, msg) => {
-                            log::error!(
-                                "Unexpected message {} in state {}.",
-                                msg.name(),
-                                state.name()
-                            );
-                            state
-                        }
-                    },
-                    envelope => {
-                        return Err(ActorError::new(
-                            bterr!("Unexpected envelope type: {}", envelope.name()),
-                            ActorErrorPayload {
-                                actor_id,
-                                actor_impl: Init::actor_impl(),
-                                state: state.name(),
-                                message: envelope.msg_name(),
-                                kind: TransKind::Receive,
-                            },
-                        ))
-                    }
-                };
-                state = new_state;
-            }
-            Ok(actor_id)
-        }
-
-        runtime
-            .register::<ClientCallbackMsgs, _>(service_id, move |runtime: &'static Runtime| {
-                let make_init = make_init.clone();
-                let fut = async move {
-                    let make_init = make_init.clone();
-                    let actor_impl = runtime
-                        .spawn(None, move |mailbox, act_id, runtime| {
-                            server_loop(runtime, make_init, mailbox, act_id)
-                        })
-                        .await
-                        .unwrap();
-                    Ok(actor_impl)
-                };
-                Box::pin(fut)
-            })
-            .await
-    }
-
-    async fn start_worker_manual<Init>(
-        init: Init,
-        owned: ActorName,
-        runtime: &'static Runtime,
-    ) -> ActorName
-    where
-        Init: 'static + Working,
-    {
-        enum WorkerState<S: Working> {
-            Working(S),
-        }
-
-        runtime
-            .spawn::<ClientCallbackMsgs, _, _>(
-                Some(owned.clone()),
-                move |_, actor_id, _| async move {
-                    let msg = match init.on_send_completed().await {
-                        TransResult::Ok((End, msg)) => msg,
-                        TransResult::Abort { err, .. } | TransResult::Fatal { err } => {
-                            let err = ActorError::new(
-                                err,
-                                ActorErrorPayload {
-                                    actor_id,
-                                    actor_impl: Init::actor_impl(),
-                                    state: Init::state_name(),
-                                    message: ClientCallbackMsgKinds::Completed.name(),
-                                    kind: TransKind::Send,
-                                },
-                            );
-                            panic_any(format!("{err}"))
-                        }
-                    };
-                    let from = runtime.actor_name(actor_id);
-                    let msg = ClientCallbackMsgs::Completed(msg);
-                    runtime.send(owned, from, msg).await.unwrap_or_else(|err| {
-                        let err = ActorError::new(
-                            err,
-                            ActorErrorPayload {
-                                actor_id,
-                                actor_impl: Init::actor_impl(),
-                                state: Init::state_name(),
-                                message: ClientCallbackMsgKinds::Completed.name(),
-                                kind: TransKind::Send,
-                            },
-                        );
-                        panic_any(format!("{err}"));
-                    });
-                    Ok(actor_id)
-                },
-            )
-            .await
-            .unwrap()
-    }
-
     #[test]
     fn client_callback_protocol() {
         ASYNC_RT.block_on(async {
             const SERVICE_ID: &str = "ClientCallbackProtocolListening";
+            let factor = 21usize;
+            let multiply_by = 2usize;
+            let expected = multiply_by * factor;
             let service_id = ServiceId::from(SERVICE_ID);
+
             let service_name = {
-                let make_init = move || ListeningState { multiple: 2 };
-                register_server_manual(make_init, &RUNTIME, service_id.clone())
+                let make_init = move || ListeningState { multiply_by };
+                register_server(&RUNTIME, service_id.clone(), make_init)
                     .await
                     .unwrap()
             };
             let (sender, receiver) = oneshot::channel();
-            let client_handle = spawn_client_manual(UnregisteredState { sender }, &RUNTIME).await;
+            let client_handle = spawn_client(UnregisteredState { sender }, &RUNTIME).await;
             let service_addr = ServiceAddr::new(service_name, false);
             client_handle
-                .send_register(service_addr, Register { factor: 21 })
+                .send_register(service_addr, Register { factor })
                 .await
                 .unwrap();
             let value = timeout(Duration::from_millis(500), receiver)
@@ -832,7 +284,7 @@ mod client_callback {
                 .unwrap()
                 .unwrap();
 
-            assert_eq!(42, value);
+            assert_eq!(expected, value);
         });
     }
 }