Переглянути джерело

Changed the determination of client actors.

Matthew Carr 1 рік тому
батько
коміт
07ef48cc6c

+ 3 - 6
crates/btfs/src/lib.rs

@@ -26,17 +26,14 @@ protocol! {
     named FsProtocol;
     let server = [Listening];
     let client = [Client];
-    let file = [FileInit, Opened];
+    let file = [Opened];
     let file_handle = [FileHandle];
 
     Client -> Client, >service(Listening)!Query;
     Listening?Query -> Listening, >Client!Query::Reply;
-    Client?Query::Reply -> Client;
 
-    Client -> Client, FileHandle[FileInit], >service(Listening)!Open;
-    Listening?Open -> Listening, FileInit, >Client!Open::Reply[FileInit], FileInit!Open;
-
-    FileInit?Open -> Opened;
+    Client -> Client, FileHandle[Opened], >service(Listening)!Open;
+    Listening?Open -> Listening, Opened, >Client!Open::Reply[Opened];
 
     FileHandle[Opened] -> FileHandle[Opened], >Opened!FileOp;
     Opened?FileOp -> Opened, >FileHandle!FileOp::Reply;

+ 0 - 2
crates/btproto/src/error.rs

@@ -85,8 +85,6 @@ pub(crate) mod msgs {
         "Replies can only be used in transitions which handle messages.";
     pub(crate) const MULTIPLE_REPLIES: &str =
         "Only a single reply can be sent in response to any message.";
-    pub(crate) const CLIENT_RECEIVED_NON_REPLY: &str =
-        "This client actor receives a message which is not a reply.";
     pub(crate) const UNOBSERVABLE_STATE: &str =
         "This client state is not allowed because it only receives replies. Such a state cannot be observed.";
 }

+ 35 - 5
crates/btproto/src/generation.rs

@@ -12,13 +12,43 @@ impl ToTokens for ProtocolModel {
 
 impl ProtocolModel {
     fn generate_message_enum(&self) -> TokenStream {
-        let variants = self.msg_lookup().msg_iter().map(|msg| msg.msg_name());
-        let msg_types = self.msg_lookup().msg_iter().map(|msg| msg.msg_type());
+        let msg_lookup = self.msg_lookup();
+        let get_variants = || msg_lookup.msg_iter().map(|msg| msg.msg_name());
+        let variants0 = get_variants();
+        let variants1 = get_variants();
+        let variant_names = get_variants().map(|variant| variant.to_string());
+        let msg_types = msg_lookup.msg_iter().map(|msg| msg.msg_type());
+        let all_replies = msg_lookup.msg_iter().all(|msg| msg.is_reply());
         let enum_name = format_ident!("{}Msgs", self.def().name_def.name);
+        let send_impl = if all_replies {
+            quote! {}
+        } else {
+            quote! {
+                impl ::btrun::SendMsg for #enum_name {}
+            }
+        };
+        let proto_name = &self.def().name_def.name;
+        let doc_comment = format!("Message type for the {proto_name} protocol.");
         quote! {
+            #[doc = #doc_comment]
+            #[derive(::serde::Serialize, ::serde::Deserialize)]
             pub enum #enum_name {
-                #( #variants(#msg_types) ),*
+                #( #variants0(#msg_types) ),*
+            }
+
+            impl #enum_name {
+                pub fn name(&self) -> &'static str {
+                    match self {
+                        #( Self::#variants1(_) => #variant_names),*
+                    }
+                }
+            }
+
+            impl ::btrun::CallMsg for #enum_name {
+                type Reply = Self;
             }
+
+            #send_impl
         }
     }
 
@@ -30,7 +60,7 @@ impl ProtocolModel {
         for (trait_ident, methods) in traits {
             let method_tokens = methods.map(|x| x.generate_tokens());
             quote! {
-                pub trait #trait_ident {
+                pub trait #trait_ident : Send + Sync {
                     #( #method_tokens )*
                 }
             }
@@ -50,7 +80,7 @@ impl MethodModel {
         let future_name = self.future();
         quote! {
             #( #output_decls )*
-            type #future_name: ::std::future::Future<Output = Result<( #( #output_types ),* )>>;
+            type #future_name: Send + ::std::future::Future<Output = Result<( #( #output_types ),* )>>;
             fn #method_ident(self #( , #msg_args )*) -> Self::#future_name;
         }
     }

+ 257 - 19
crates/btproto/src/model.rs

@@ -23,8 +23,47 @@ pub(crate) struct ProtocolModel {
 
 impl ProtocolModel {
     pub(crate) fn new(def: Protocol) -> syn::Result<Self> {
-        let actor_lookup = ActorLookup::new(def.actor_defs.iter().map(|x| x.as_ref()));
-        let msg_lookup = MsgLookup::new(def.transitions.iter().map(|x| x.as_ref()));
+        let get_transitions = || def.transitions.iter().map(|x| x.as_ref());
+        let actor_lookup =
+            ActorLookup::new(def.actor_defs.iter().map(|x| x.as_ref()), get_transitions())?;
+        let mut is_client = HashMap::<Rc<Ident>, bool>::new();
+        // First mark all actors as not clients.
+        for actor_def in def.actor_defs.iter() {
+            is_client.insert(actor_def.actor.clone(), false);
+        }
+        // For every actor which is not spawned by another actor, mark it as a client if its
+        // initial state receives no messages, then mark all of the actors it spawns as clients.
+        for actor_def in def.actor_defs.iter() {
+            if !actor_lookup.parents(actor_def.actor.as_ref()).is_empty() {
+                continue;
+            }
+            let init_state = actor_def.states.as_ref().first().unwrap();
+            let init_state_receives_no_msgs = def
+                .transitions
+                .iter()
+                .filter(|transition| {
+                    transition.in_state.state_trait.as_ref() == init_state.as_ref()
+                })
+                .all(|transition| transition.in_msg().is_none());
+            if init_state_receives_no_msgs {
+                mark_all_progeny(&actor_lookup, &mut is_client, actor_def.actor.as_ref());
+            }
+        }
+
+        fn mark_all_progeny(
+            actor_lookup: &ActorLookup,
+            is_client: &mut HashMap<Rc<Ident>, bool>,
+            actor_name: &Ident,
+        ) {
+            *is_client.get_mut(actor_name).unwrap() = true;
+            let children = actor_lookup.children(actor_name);
+            for child in children {
+                *is_client.get_mut(child).unwrap() = true;
+                mark_all_progeny(actor_lookup, is_client, child.as_ref());
+            }
+        }
+
+        let msg_lookup = MsgLookup::new(get_transitions());
         let mut actors = HashMap::new();
         for actor_def in def.actor_defs.iter() {
             let actor_name = &actor_def.actor;
@@ -39,7 +78,12 @@ impl ProtocolModel {
                     .cloned();
                 (state_name.clone(), transitions)
             });
-            let actor = ActorModel::new(actor_def.clone(), &msg_lookup, transitions_by_state)?;
+            let actor = ActorModel::new(
+                actor_def.clone(),
+                &msg_lookup,
+                *is_client.get(&actor_def.actor).unwrap(),
+                transitions_by_state,
+            )?;
             actors.insert(actor_name.clone(), actor);
         }
         Ok(Self {
@@ -81,12 +125,22 @@ impl ProtocolModel {
 pub(crate) struct ActorModel {
     #[allow(dead_code)]
     def: Rc<ActorDef>,
+    /// Indicates if this actor is a client.
+    ///
+    /// A client is an actor which is not spawned by another actor (i.e. has no parents) and
+    /// whose initial state has no transition which receives a message, or which is spawned by a
+    /// client.
     is_client: bool,
     states: HashMap<Rc<Ident>, StateModel>,
 }
 
 impl ActorModel {
-    fn new<S, T>(def: Rc<ActorDef>, messages: &MsgLookup, state_iter: S) -> syn::Result<Self>
+    fn new<S, T>(
+        def: Rc<ActorDef>,
+        messages: &MsgLookup,
+        is_client: bool,
+        state_iter: S,
+    ) -> syn::Result<Self>
     where
         S: IntoIterator<Item = (Rc<Ident>, T)>,
         T: IntoIterator<Item = Rc<Transition>>,
@@ -95,10 +149,6 @@ impl ActorModel {
             .into_iter()
             .map(|(name, transitions)| (name, transitions.into_iter().collect()))
             .collect();
-        let is_client = transitions
-            .values()
-            .flatten()
-            .any(|transition| transition.is_client());
         let mut states = HashMap::new();
         for (name, transitions) in transitions.into_iter() {
             let state = StateModel::new(name.clone(), messages, transitions, is_client)?;
@@ -209,18 +259,24 @@ impl MethodModel {
                 msg_names.push('_');
                 msg_names.push_str(dest.msg.variant().pascal_to_snake().as_str());
             }
-            format_ident!("send_{msg_names}")
+            format_ident!("on_send_{msg_names}")
         };
         Ok(name)
     }
 
     fn new_inputs(def: &Transition, messages: &MsgLookup, part_of_client: bool) -> Vec<InputModel> {
         let mut inputs = Vec::new();
+        let arg_kind = if def.is_client() {
+            InputKind::ByMutRef
+        } else {
+            InputKind::ByValue
+        };
         if let Some(in_msg) = def.in_msg() {
             let msg_info = messages.lookup(in_msg);
             inputs.push(InputModel::new(
                 msg_info.msg_name().clone(),
                 msg_info.msg_type.clone(),
+                arg_kind,
             ))
         }
         if part_of_client {
@@ -229,6 +285,7 @@ impl MethodModel {
                 inputs.push(InputModel::new(
                     msg_info.msg_name().clone(),
                     msg_info.msg_type.clone(),
+                    arg_kind,
                 ))
             }
         }
@@ -290,16 +347,30 @@ impl GetSpan for MethodModel {
     }
 }
 
+#[cfg_attr(test, derive(Debug))]
+#[derive(Clone)]
+enum InputKind {
+    ByValue,
+    ByMutRef,
+}
+
+impl Copy for InputKind {}
+
 #[cfg_attr(test, derive(Debug))]
 pub(crate) struct InputModel {
     name: Ident,
     arg_type: Rc<TokenStream>,
+    arg_kind: InputKind,
 }
 
 impl InputModel {
-    fn new(type_name: Rc<Ident>, arg_type: Rc<TokenStream>) -> Self {
+    fn new(type_name: Rc<Ident>, arg_type: Rc<TokenStream>, arg_kind: InputKind) -> Self {
         let name = format_ident!("{}_arg", type_name.to_string().pascal_to_snake());
-        Self { name, arg_type }
+        Self {
+            name,
+            arg_type,
+            arg_kind,
+        }
     }
 }
 
@@ -307,7 +378,11 @@ impl ToTokens for InputModel {
     fn to_tokens(&self, tokens: &mut TokenStream) {
         let name = &self.name;
         let arg_type = self.arg_type.as_ref();
-        tokens.extend(quote! { #name : #arg_type })
+        let modifier = match self.arg_kind {
+            InputKind::ByValue => quote! {},
+            InputKind::ByMutRef => quote! { &mut },
+        };
+        tokens.extend(quote! { #name : #modifier #arg_type })
     }
 }
 
@@ -385,30 +460,94 @@ pub(crate) enum OutputKind {
     },
 }
 
+/// A type used to query information about actors, states, and their relationships.
 pub(crate) struct ActorLookup {
+    /// A map from an actor name to the set of states which are part of that actor.
     actor_states: HashMap<Rc<Ident>, HashSet<Rc<Ident>>>,
+    #[allow(dead_code)]
+    /// A map from a state name to the actor name which that state is a part of.
+    actors_by_state: HashMap<Rc<Ident>, Rc<Ident>>,
+    /// A map from an actor name to the set of actor names which spawn it.
+    parents: HashMap<Rc<Ident>, HashSet<Rc<Ident>>>,
+    /// A map from an actor name to the set of actors names which it spawns.
+    children: HashMap<Rc<Ident>, HashSet<Rc<Ident>>>,
 }
 
 impl ActorLookup {
-    fn new<'a>(actor_defs: impl IntoIterator<Item = &'a ActorDef>) -> Self {
+    fn new<'a, A, T>(actor_defs: A, transitions: T) -> syn::Result<Self>
+    where
+        A: IntoIterator<Item = &'a ActorDef>,
+        T: IntoIterator<Item = &'a Transition>,
+    {
         let mut actor_states = HashMap::new();
-        for actor_def in actor_defs.into_iter() {
+        let mut actors_by_state = HashMap::new();
+        let mut parents = HashMap::new();
+        let mut children = HashMap::new();
+        for actor_def in actor_defs {
             let mut states = HashSet::new();
             for state in actor_def.states.as_ref().iter() {
                 states.insert(state.clone());
+                actors_by_state.insert(state.clone(), actor_def.actor.clone());
+            }
+            let actor_name = &actor_def.actor;
+            actor_states.insert(actor_name.clone(), states);
+            parents.insert(actor_name.clone(), HashSet::new());
+            children.insert(actor_name.clone(), HashSet::new());
+        }
+
+        for transition in transitions {
+            let in_state = transition.in_state.state_trait.as_ref();
+            let parent = actors_by_state
+                .get(in_state)
+                .ok_or_else(|| syn::Error::new(in_state.span(), error::msgs::UNDECLARED_STATE))?;
+            // The first output state is skipped because the current actor is transitioning to it,
+            // its not creating a new actor.
+            for out_state in transition.out_states.as_ref().iter().skip(1) {
+                let out_state = out_state.state_trait.as_ref();
+                let child = actors_by_state.get(out_state).ok_or_else(|| {
+                    syn::Error::new(out_state.span(), error::msgs::UNDECLARED_STATE)
+                })?;
+                parents
+                    .entry(child.clone())
+                    .or_insert_with(HashSet::new)
+                    .insert(parent.clone());
+                children
+                    .entry(parent.clone())
+                    .or_insert_with(HashSet::new)
+                    .insert(child.clone());
             }
-            actor_states.insert(actor_def.actor.clone(), states);
         }
-        Self { actor_states }
+
+        Ok(Self {
+            actor_states,
+            actors_by_state,
+            parents,
+            children,
+        })
     }
 
+    const UNKNOWN_ACTOR_ERR: &str =
+        "Unknown actor. This indicates there is a bug in the btproto crate.";
+
     /// Returns the set of states associated with the given actor.
     ///
     /// This method **panics** if you call it with a non-existent actor name.
     pub(crate) fn actor_states(&self, actor_name: &Ident) -> &HashSet<Rc<Ident>> {
-        self.actor_states.get(actor_name).unwrap_or_else(|| {
-            panic!("Unknown actor. This indicates there is a bug in the btproto crate.")
-        })
+        self.actor_states
+            .get(actor_name)
+            .unwrap_or_else(|| panic!("actor_states: {}", Self::UNKNOWN_ACTOR_ERR))
+    }
+
+    pub(crate) fn parents(&self, actor_name: &Ident) -> &HashSet<Rc<Ident>> {
+        self.parents
+            .get(actor_name)
+            .unwrap_or_else(|| panic!("parents: {}", Self::UNKNOWN_ACTOR_ERR))
+    }
+
+    pub(crate) fn children(&self, actor_name: &Ident) -> &HashSet<Rc<Ident>> {
+        self.children
+            .get(actor_name)
+            .unwrap_or_else(|| panic!("children: {}", Self::UNKNOWN_ACTOR_ERR))
     }
 }
 
@@ -767,4 +906,103 @@ mod tests {
         assert_eq!(1, outputs.iter().filter(|is_reply| **is_reply).count());
         assert_eq!(1, outputs.iter().filter(|is_reply| !*is_reply).count());
     }
+
+    fn simple_client_server_proto() -> Protocol {
+        Protocol::new(
+            NameDef::new("IsClientTest"),
+            [
+                ActorDef::new("server", ["Server"]),
+                ActorDef::new("client", ["Client"]),
+            ],
+            [
+                Transition::new(
+                    State::new("Client", []),
+                    None,
+                    [State::new("End", [])],
+                    [Dest::new(
+                        DestinationState::Service(State::new("Server", [])),
+                        Message::new("Msg", false, []),
+                    )],
+                ),
+                Transition::new(
+                    State::new("Server", []),
+                    Some(Message::new("Msg", false, [])),
+                    [State::new("End", [])],
+                    [],
+                ),
+            ],
+        )
+    }
+
+    #[test]
+    fn is_client_false_for_server() {
+        let input = simple_client_server_proto();
+
+        let actual = ProtocolModel::new(input).unwrap();
+
+        let server = actual.actors.get(&format_ident!("server")).unwrap();
+        assert!(!server.is_client());
+    }
+
+    #[test]
+    fn is_client_true_for_client() {
+        let input = simple_client_server_proto();
+
+        let actual = ProtocolModel::new(input).unwrap();
+
+        let client = actual.actors.get(&format_ident!("client")).unwrap();
+        assert!(client.is_client());
+    }
+
+    #[test]
+    fn is_client_false_for_worker() {
+        let input = Protocol::new(
+            NameDef::new("IsClientTest"),
+            [
+                ActorDef::new("server", ["Listening"]),
+                ActorDef::new("worker", ["Working"]),
+                ActorDef::new("client", ["Unregistered", "Registered"]),
+            ],
+            [
+                Transition::new(
+                    State::new("Unregistered", []),
+                    None,
+                    [State::new("Registered", [])],
+                    [Dest::new(
+                        DestinationState::Service(State::new("Listening", [])),
+                        Message::new("Register", false, ["Registered"]),
+                    )],
+                ),
+                Transition::new(
+                    State::new("Listening", []),
+                    Some(Message::new("Register", false, ["Registered"])),
+                    [
+                        State::new("Listening", []),
+                        State::new("Working", ["Registered"]),
+                    ],
+                    [],
+                ),
+                Transition::new(
+                    State::new("Working", ["Registered"]),
+                    None,
+                    [State::new("End", [])],
+                    [Dest::new(
+                        DestinationState::Individual(State::new("Registered", [])),
+                        Message::new("Completed", false, []),
+                    )],
+                ),
+                Transition::new(
+                    State::new("Registered", []),
+                    Some(Message::new("Completed", false, [])),
+                    [State::new("End", [])],
+                    [],
+                ),
+            ],
+        );
+
+        let actual = ProtocolModel::new(input).unwrap();
+
+        let worker = actual.actors.get(&format_ident!("worker")).unwrap();
+        assert!(!worker.is_client());
+    }
 }

+ 117 - 62
crates/btproto/src/parsing.rs

@@ -222,12 +222,16 @@ pub(crate) struct ActorDef {
 
 #[cfg(test)]
 impl ActorDef {
-    pub(crate) fn new(actor: &str, state_names: impl IntoIterator<Item = &'static str>) -> Self {
+    pub(crate) fn new<T, I>(actor: &str, state_names: T) -> Self
+    where
+        T: IntoIterator<IntoIter = I>,
+        I: ExactSizeIterator<Item = &'static str>,
+    {
         Self {
             let_token: Token![let](Span::call_site()),
             actor: new_ident(actor).into(),
             eq_token: Token![=](Span::call_site()),
-            states: IdentArray::new(state_names),
+            states: IdentArray::new(state_names).unwrap(),
         }
     }
 }
@@ -250,7 +254,7 @@ impl GetSpan for ActorDef {
             .span()
             .left_join(self.actor.span())
             .left_join(self.eq_token.span())
-            .left_join(&self.states)
+            .left_join(self.states.span())
     }
 }
 
@@ -263,25 +267,26 @@ pub(crate) struct IdentArray {
 
 impl IdentArray {
     const EMPTY_ERR: &str = "at least one state is required";
-
-    fn empty() -> Self {
-        Self {
-            bracket: Bracket::default(),
-            idents: Punctuated::new(),
-        }
-    }
 }
 
 #[cfg(test)]
 impl IdentArray {
-    pub(crate) fn new(state_names: impl IntoIterator<Item = &'static str>) -> Self {
-        Self {
-            bracket: Bracket::default(),
-            idents: state_names
-                .into_iter()
-                .map(new_ident)
-                .map(Rc::new)
-                .collect(),
+    pub(crate) fn new<T, I>(state_names: T) -> Option<Self>
+    where
+        T: IntoIterator<IntoIter = I>,
+        I: ExactSizeIterator<Item = &'static str>,
+    {
+        let state_names = state_names.into_iter();
+        if state_names.len() > 0 {
+            Some(Self {
+                bracket: Bracket::default(),
+                idents: state_names
+                    .map(new_ident)
+                    .map(Rc::new)
+                    .collect(),
+            })
+        } else {
+            None
         }
     }
 }
@@ -322,6 +327,7 @@ pub(crate) struct Transition {
     in_msg: Option<(Token![?], Rc<Message>)>,
     arrow: Token![->],
     pub(crate) out_states: StatesList,
+    #[allow(dead_code)]
     redirect: Option<Token![>]>,
     pub(crate) out_msgs: DestList,
 }
@@ -390,13 +396,28 @@ impl Parse for Transition {
 
 impl GetSpan for Transition {
     fn span(&self) -> Span {
-        self.in_state
-            .span()
-            .left_join(self.in_msg.as_ref().map(|(x, y)| x.span().left_join(y)))
-            .left_join(self.arrow.span())
-            .left_join(&self.out_states)
-            .left_join(self.redirect.as_ref().map(|x| x.span()))
-            .left_join(&self.out_msgs)
+        self.arrow.span()
+    }
+}
+
+struct IdentIter<'a> {
+    idents: Option<syn::punctuated::Iter<'a, Rc<Ident>>>,
+}
+
+impl<'a> IdentIter<'a> {
+    fn new(idents: Option<syn::punctuated::Iter<'a, Rc<Ident>>>) -> Self {
+        Self { idents }
+    }
+}
+
+impl<'a> Iterator for IdentIter<'a> {
+    type Item = &'a Rc<Ident>;
+    fn next(&mut self) -> Option<Self::Item> {
+        if let Some(idents) = self.idents.as_mut() {
+            idents.next()
+        } else {
+            None
+        }
     }
 }
 
@@ -404,15 +425,25 @@ impl GetSpan for Transition {
 #[derive(Hash, PartialEq, Eq)]
 pub(crate) struct State {
     pub(crate) state_trait: Rc<Ident>,
-    pub(crate) owned_states: IdentArray,
+    pub(crate) owned_states: Option<IdentArray>,
+}
+
+impl State {
+    pub(crate) fn owned_states(&self) -> impl Iterator<Item = &'_ Rc<Ident>> {
+        IdentIter::new(self.owned_states.as_ref().map(|idents| idents.as_ref().iter()))
+    }
 }
 
 #[cfg(test)]
 impl State {
-    pub(crate) fn new(
+    pub(crate) fn new<T, I>(
         state_trait: &str,
-        owned_states: impl IntoIterator<Item = &'static str>,
-    ) -> Self {
+        owned_states: T,
+    ) -> Self
+    where
+        T: IntoIterator<IntoIter = I>,
+        I: ExactSizeIterator<Item = &'static str>,
+    {
         Self {
             state_trait: new_ident(state_trait).into(),
             owned_states: IdentArray::new(owned_states),
@@ -429,9 +460,9 @@ impl Parse for State {
     fn parse(input: syn::parse::ParseStream) -> syn::Result<Self> {
         let state_trait = Ident::parse(input)?.into();
         let owned_states = if input.peek(Bracket) {
-            IdentArray::parse(input)?
+            Some(IdentArray::parse(input)?)
         } else {
-            IdentArray::empty()
+            None
         };
         Ok(Self {
             state_trait,
@@ -503,9 +534,13 @@ impl Parse for DestList {
     }
 }
 
-impl GetSpan for DestList {
-    fn span(&self) -> Span {
-        punctuated_span(&self.0)
+impl MaybeGetSpan for DestList {
+    fn maybe_span(&self) -> Option<Span> {
+        if self.0.is_empty() {
+            None
+        } else {
+            Some(punctuated_span(&self.0))
+        }
     }
 }
 
@@ -549,7 +584,7 @@ impl GetSpan for Dest {
         self.state
             .span()
             .left_join(self.exclamation.span())
-            .left_join(&self.msg)
+            .left_join(self.msg.span())
     }
 }
 
@@ -584,7 +619,7 @@ impl Parse for DestinationState {
             }
             Ok(DestinationState::Service(State {
                 state_trait: dest_state.into(),
-                owned_states: IdentArray::empty(),
+                owned_states: None,
             }))
         } else {
             Ok(DestinationState::Individual(input.parse()?))
@@ -607,7 +642,7 @@ impl GetSpan for DestinationState {
 pub(crate) struct Message {
     pub(crate) msg_type: Rc<Ident>,
     reply_part: Option<MessageReplyPart>,
-    pub(crate) owned_states: IdentArray,
+    pub(crate) owned_states: Option<IdentArray>,
     ident: Option<Ident>,
 }
 
@@ -625,15 +660,23 @@ impl Message {
     pub(crate) fn is_reply(&self) -> bool {
         self.reply_part.is_some()
     }
+
+    pub(crate) fn owned_states(&self) -> impl Iterator<Item = &'_ Rc<Ident>> {
+        IdentIter::new(self.owned_states.as_ref().map(|states| states.as_ref().iter()))
+    }
 }
 
 #[cfg(test)]
 impl Message {
-    pub(crate) fn new(
+    pub(crate) fn new<T, I>(
         msg_type: &str,
         is_reply: bool,
-        owned_states: impl IntoIterator<Item = &'static str>,
-    ) -> Self {
+        owned_states: T,
+    ) -> Self
+    where
+        T: IntoIterator<IntoIter = I>,
+        I: ExactSizeIterator<Item = &'static str>,
+    {
         let (reply_part, ident_field) = if is_reply {
             let reply_part = MessageReplyPart {
                 colons: Token![::](Span::call_site()),
@@ -667,9 +710,9 @@ impl Parse for Message {
             (None, None)
         };
         let owned_states = if input.peek(Bracket) {
-            IdentArray::parse(input)?
+            Some(IdentArray::parse(input)?)
         } else {
-            IdentArray::empty()
+            None
         };
         Ok(Self {
             msg_type,
@@ -682,15 +725,10 @@ impl Parse for Message {
 
 impl GetSpan for Message {
     fn span(&self) -> Span {
-        let owned_states = if self.owned_states.as_ref().is_empty() {
-            None
-        } else {
-            Some(&self.owned_states)
-        };
         self.msg_type
             .span()
             .left_join(self.reply_part.as_ref())
-            .left_join(owned_states)
+            .left_join(&self.owned_states)
     }
 }
 
@@ -770,22 +808,38 @@ fn punctuated_span<T: GetSpan, P>(arg: &Punctuated<T, P>) -> Span {
     span
 }
 
+pub(crate) trait MaybeGetSpan {
+    fn maybe_span(&self) -> Option<Span>;
+}
+
+impl<'a, T: MaybeGetSpan> MaybeGetSpan for &'a T {
+    fn maybe_span(&self) -> Option<Span> {
+        (*self).maybe_span()
+    }
+}
+
+impl<T: GetSpan> MaybeGetSpan for Option<T> {
+    fn maybe_span(&self) -> Option<Span> {
+        self.as_ref().map(|x| x.span())
+    }
+}
+
+impl MaybeGetSpan for Span {
+    fn maybe_span(&self) -> Option<Span> {
+        Some(*self)
+    }
+}
+
 trait LeftJoin<Rhs> {
     /// Attempts to join two [GetSpan] values, but if the result of the join is `None`, then just
     /// the left span is returned.
     fn left_join(&self, other: Rhs) -> Span;
 }
 
-impl<R: GetSpan> LeftJoin<R> for Span {
+impl<R: MaybeGetSpan> LeftJoin<R> for Span {
     fn left_join(&self, other: R) -> Span {
-        self.join(other.span()).unwrap_or(*self)
-    }
-}
-
-impl<R: GetSpan> LeftJoin<Option<R>> for Span {
-    fn left_join(&self, other: Option<R>) -> Span {
-        if let Some(other) = other {
-            self.left_join(other)
+        if let Some(span) = other.maybe_span() {
+            self.join(span).unwrap_or(*self)
         } else {
             *self
         }
@@ -977,7 +1031,7 @@ Init?Activate -> End;"
     fn ident_array_new() {
         const EXPECTED: [&str; 2] = ["Red", "Green"];
 
-        let actual = IdentArray::new(EXPECTED);
+        let actual = IdentArray::new(EXPECTED).unwrap();
 
         assert_eq!(EXPECTED.len(), actual.idents.len());
         assert_eq!(actual.idents[0].as_ref(), EXPECTED[0]);
@@ -1006,7 +1060,7 @@ Init?Activate -> End;"
     fn ident_array_parse() {
         const EXPECTED_STATES: [&str; 2] = ["Sad", "Glad"];
         let input = format!("[{}, {}]", EXPECTED_STATES[0], EXPECTED_STATES[1]);
-        let expected = IdentArray::new(EXPECTED_STATES);
+        let expected = IdentArray::new(EXPECTED_STATES).unwrap();
 
         let actual = parse_str::<IdentArray>(&input).unwrap();
 
@@ -1261,16 +1315,17 @@ Init?Activate -> End;"
 
         assert_eq!(actual.msg_type.as_ref(), EXPECTED_MSG_TYPE);
         assert_eq!(actual.is_reply(), EXPECTED_IS_REPLY);
+        let idents = actual.owned_states.unwrap().idents;
         assert_eq!(
-            actual.owned_states.idents.len(),
+            idents.len(),
             EXPECTED_OWNED_STATES.len()
         );
         assert_eq!(
-            actual.owned_states.idents[0].as_ref(),
+            idents[0].as_ref(),
             EXPECTED_OWNED_STATES[0]
         );
         assert_eq!(
-            actual.owned_states.idents[1].as_ref(),
+            idents[1].as_ref(),
             EXPECTED_OWNED_STATES[1]
         );
     }

+ 12 - 147
crates/btproto/src/validation.rs

@@ -17,7 +17,6 @@ impl ProtocolModel {
             .combine(self.receivers_and_senders_matched())
             .combine(self.no_undeliverable_msgs())
             .combine(self.replies_expected())
-            .combine(self.clients_only_receive_replies())
             .combine(self.no_unobservable_states())
             .into()
     }
@@ -38,17 +37,13 @@ impl ProtocolModel {
             used.insert(&in_state.state_trait);
             used.extend(
                 in_state
-                    .owned_states
-                    .as_ref()
-                    .iter()
+                    .owned_states()
                     .map(|ident| ident.as_ref()),
             );
             if let Some(in_msg) = transition.in_msg() {
                 used.extend(
                     in_msg
-                        .owned_states
-                        .as_ref()
-                        .iter()
+                        .owned_states()
                         .map(|ident| ident.as_ref()),
                 );
             }
@@ -56,9 +51,7 @@ impl ProtocolModel {
                 used.insert(&out_states.state_trait);
                 used.extend(
                     out_states
-                        .owned_states
-                        .as_ref()
-                        .iter()
+                        .owned_states()
                         .map(|ident| ident.as_ref()),
                 );
             }
@@ -82,6 +75,10 @@ impl ProtocolModel {
     /// defined, and every receiver has a sender. Note that each message isn't required to have a
     /// unique sender or a unique receiver, just that at least one of each much be defined.
     fn receivers_and_senders_matched<'s>(&'s self) -> MaybeErr {
+        /// Represents a message sender or receiver.
+        ///
+        /// This type is essentially just a tuple of references, but was created so a [Hash]
+        /// implementation could be defined.
         #[cfg_attr(test, derive(Debug))]
         struct MsgEndpoint<'a> {
             state: &'a State,
@@ -128,11 +125,11 @@ impl ProtocolModel {
         let mut outgoing: HashSet<MsgEndpoint<'s>> = HashSet::new();
         let mut incoming: HashSet<MsgEndpoint<'s>> = HashSet::new();
         for actor in self.actors_iter() {
-            for method in actor
+            let methods = actor
                 .states()
                 .values()
-                .flat_map(|state| state.methods().values())
-            {
+                .flat_map(|state| state.methods().values());
+            for method in methods {
                 let transition = method.def();
                 if let Some(msg) = transition.in_msg() {
                     let msg_info = msgs.lookup(msg);
@@ -188,20 +185,14 @@ impl ProtocolModel {
                 match &dest.state {
                     DestinationState::Service(_) => continue,
                     DestinationState::Individual(dest_state) => {
+                        let owned_states = transition.in_state.owned_states().map(|ident| ident.as_ref());
                         let allowed = allowed_states.get_or_insert_with(|| {
                             transition
                                 .out_states
                                 .as_ref()
                                 .iter()
                                 .map(|state| state.state_trait.as_ref())
-                                .chain(
-                                    transition
-                                        .in_state
-                                        .owned_states
-                                        .as_ref()
-                                        .iter()
-                                        .map(|ident| ident.as_ref()),
-                                )
+                                .chain(owned_states)
                                 .collect()
                         });
                         if !allowed.contains(dest_state.state_trait.as_ref()) {
@@ -258,30 +249,6 @@ impl ProtocolModel {
         err
     }
 
-    /// A client is any actor with a state that sends at least one message when not handling an
-    /// incoming message. Such actors are not allowed to receive any messages which are not replies.
-    fn clients_only_receive_replies(&self) -> MaybeErr {
-        self.actors_iter()
-            .filter(|actor| actor.is_client())
-            .flat_map(|actor| {
-                actor
-                    .states()
-                    .values()
-                    .flat_map(|state| state.methods().values())
-            })
-            .filter(|method| {
-                if let Some(msg) = method.def().in_msg() {
-                    !msg.is_reply()
-                } else {
-                    false
-                }
-            })
-            .map(|transition| {
-                syn::Error::new(transition.span(), error::msgs::CLIENT_RECEIVED_NON_REPLY)
-            })
-            .collect()
-    }
-
     /// Checks that there are no client states which are only receiving replies. Such states can't
     /// be observed, because the methods which sent the original messages will return their replies.
     fn no_unobservable_states(&self) -> MaybeErr {
@@ -739,106 +706,4 @@ mod tests {
 
         assert_err(result, error::msgs::MULTIPLE_REPLIES);
     }
-
-    #[test]
-    fn clients_only_receive_replies_ok() {
-        let input = ProtocolModel::new(Protocol::new(
-            NameDef::new("ClientReplies"),
-            [
-                ActorDef::new("client", ["Client", "Waiting"]),
-                ActorDef::new("server", ["Init", "Listening"]),
-            ],
-            [
-                Transition::new(
-                    State::new("Init", []),
-                    Some(Message::new("Activate", false, [])),
-                    [State::new("Listening", [])],
-                    [],
-                ),
-                Transition::new(
-                    State::new("Client", []),
-                    None,
-                    [State::new("Waiting", [])],
-                    [Dest::new(
-                        DestinationState::Service(State::new("Listening", [])),
-                        Message::new("Ping", false, []),
-                    )],
-                ),
-                Transition::new(
-                    State::new("Listening", []),
-                    Some(Message::new("Ping", false, [])),
-                    [State::new("Listening", [])],
-                    [Dest::new(
-                        DestinationState::Individual(State::new("Waiting", [])),
-                        Message::new("Ping", true, []),
-                    )],
-                ),
-                Transition::new(
-                    State::new("Waiting", []),
-                    Some(Message::new("Ping", true, [])),
-                    [State::new("Client", [])],
-                    [],
-                ),
-            ],
-        ))
-        .unwrap();
-
-        let result = input.clients_only_receive_replies();
-
-        assert_ok(result);
-    }
-
-    #[test]
-    fn clients_only_receive_replies_err() {
-        let input = ProtocolModel::new(Protocol::new(
-            NameDef::new("ClientReplies"),
-            [
-                ActorDef::new("client", ["Client", "Waiting"]),
-                ActorDef::new("server", ["Init", "Listening"]),
-            ],
-            [
-                Transition::new(
-                    State::new("Init", []),
-                    Some(Message::new("Activate", false, [])),
-                    [State::new("Listening", [])],
-                    [],
-                ),
-                Transition::new(
-                    State::new("Client", []),
-                    None,
-                    [State::new("Waiting", [])],
-                    [Dest::new(
-                        DestinationState::Service(State::new("Listening", [])),
-                        Message::new("Ping", false, []),
-                    )],
-                ),
-                Transition::new(
-                    State::new("Listening", []),
-                    Some(Message::new("Ping", false, [])),
-                    [State::new("Listening", [])],
-                    [Dest::new(
-                        DestinationState::Individual(State::new("Waiting", [])),
-                        Message::new("Ping", true, []),
-                    )],
-                ),
-                Transition::new(
-                    State::new("Waiting", []),
-                    Some(Message::new("Ping", true, [])),
-                    [State::new("Client", [])],
-                    [],
-                ),
-                Transition::new(
-                    State::new("Client", []),
-                    Some(Message::new("Activate", false, [])),
-                    [State::new("Client", [])],
-                    [],
-                ),
-            ],
-        ))
-        .unwrap();
-
-        let result = input.clients_only_receive_replies();
-
-        assert_err(result, error::msgs::CLIENT_RECEIVED_NON_REPLY);
-    }
 }

+ 74 - 6
crates/btproto/tests/protocol_tests.rs

@@ -1,3 +1,5 @@
+#![feature(impl_trait_in_assoc_type)]
+
 use std::future::{ready, Ready};
 
 use btlib::Result;
@@ -12,7 +14,7 @@ impl CallMsg for Ping {
     type Reply = ();
 }
 
-/// Tests that the given variable is of the given type.
+/// Tests that the given variable is of the given type _at compile time_.
 macro_rules! assert_type {
     ($var:expr, $ty:ty) => {{
         let _: $ty = $var;
@@ -21,6 +23,7 @@ macro_rules! assert_type {
 
 #[test]
 fn minimal_syntax() {
+    #[derive(Serialize, Deserialize)]
     pub struct Msg;
 
     protocol! {
@@ -49,8 +52,8 @@ fn minimal_syntax() {
     struct ClientState;
 
     impl Client for ClientState {
-        type SendMsgFut = Ready<Result<End>>;
-        fn send_msg(self, _msg: Msg) -> Self::SendMsgFut {
+        type OnSendMsgFut = Ready<Result<End>>;
+        fn on_send_msg(self, _msg: &mut Msg) -> Self::OnSendMsgFut {
             ready(Ok(End))
         }
     }
@@ -86,10 +89,75 @@ fn reply() {
     struct ClientState;
 
     impl Client for ClientState {
-        type SendPingClient = Self;
-        type SendPingFut = Ready<Result<(Self, <Ping as CallMsg>::Reply)>>;
-        fn send_ping(self, _ping: Ping) -> Self::SendPingFut {
+        type OnSendPingClient = Self;
+        type OnSendPingFut = Ready<Result<(Self, <Ping as CallMsg>::Reply)>>;
+        fn on_send_ping(self, _ping: &mut Ping) -> Self::OnSendPingFut {
             ready(Ok((self, ())))
         }
     }
 }
+
+#[test]
+fn client_callback() {
+    #[derive(Serialize, Deserialize)]
+    pub struct Register;
+    #[derive(Serialize, Deserialize)]
+    pub struct Completed;
+
+    protocol! {
+        named ClientCallback;
+        let server = [Listening];
+        let worker = [Working];
+        let client = [Unregistered, Registered];
+        Unregistered -> Registered, >service(Listening)!Register[Registered];
+        Listening?Register[Registered] -> Listening, Working[Registered];
+        Working[Registered] -> End, >Registered!Completed;
+        Registered?Completed -> End;
+    }
+
+    let msg: Option<ClientCallbackMsgs> = None;
+    match msg {
+        Some(ClientCallbackMsgs::Register(msg)) => assert_type!(msg, Register),
+        Some(ClientCallbackMsgs::Completed(msg)) => assert_type!(msg, Completed),
+        _ => (),
+    }
+
+    struct UnregisteredState;
+
+    impl Unregistered for UnregisteredState {
+        type OnSendRegisterRegistered = RegisteredState;
+        type OnSendRegisterFut = Ready<Result<Self::OnSendRegisterRegistered>>;
+        fn on_send_register(self, _arg: &mut Register) -> Self::OnSendRegisterFut {
+            ready(Ok(RegisteredState))
+        }
+    }
+
+    struct RegisteredState;
+
+    impl Registered for RegisteredState {
+        type HandleCompletedFut = Ready<Result<End>>;
+        fn handle_completed(self, _arg: Completed) -> Self::HandleCompletedFut {
+            ready(Ok(End))
+        }
+    }
+
+    struct ListeningState;
+
+    impl Listening for ListeningState {
+        type HandleRegisterListening = ListeningState;
+        type HandleRegisterWorking = WorkingState;
+        type HandleRegisterFut = Ready<Result<(ListeningState, WorkingState)>>;
+        fn handle_register(self, _arg: Register) -> Self::HandleRegisterFut {
+            ready(Ok((self, WorkingState)))
+        }
+    }
+
+    struct WorkingState;
+
+    impl Working for WorkingState {
+        type OnSendCompletedFut = Ready<Result<(End, Completed)>>;
+        fn on_send_completed(self) -> Self::OnSendCompletedFut {
+            ready(Ok((End, Completed)))
+        }
+    }
+}

+ 19 - 2
crates/btrun/src/lib.rs

@@ -121,6 +121,16 @@ impl Runtime {
         }
     }
 
+    /// Sends a message to the service identified by [ServiceName].
+    pub async fn send_service<T: 'static + SendMsg>(
+        &self,
+        _to: ServiceName,
+        _from: ActorName,
+        _msg: T,
+    ) -> Result<()> {
+        todo!()
+    }
+
     /// Sends a message to the actor identified by the given [ActorName] and returns a future which
     /// is ready when a reply has been received.
     pub async fn call<T: 'static + CallMsg>(
@@ -154,8 +164,13 @@ impl Runtime {
         }
     }
 
-    /// Resolves the given [ServiceName] to an [ActorName] which is part of it.
-    pub async fn resolve<'a>(&'a self, _service: &ServiceName) -> Result<ActorName> {
+    /// Calls a service identified by [ServiceName].
+    pub async fn send_call<T: 'static + CallMsg>(
+        &self,
+        _to: ServiceName,
+        _from: ActorName,
+        _msg: T,
+    ) -> Result<T::Reply> {
         todo!()
     }
 
@@ -454,6 +469,8 @@ pub struct ServiceName {
     path: Arc<BlockPath>,
     /// The id of the service.
     service_id: ServiceId,
+    /// Indicates if the message should be routed towards the root of the tree or away from it.
+    rootward: bool,
 }
 
 /// A unique identifier for a specific actor activation.

+ 238 - 0
crates/btrun/tests/runtime_tests.rs

@@ -13,6 +13,7 @@ use btserde::to_vec;
 use bttp::{BlockAddr, Transmitter};
 use ctor::ctor;
 use lazy_static::lazy_static;
+use log;
 use serde::{Deserialize, Serialize};
 use std::{
     future::{ready, Future, Ready},
@@ -408,3 +409,240 @@ mod travel_agency {
         type Reply = ();
     }
 }
+
+#[allow(dead_code)]
+mod client_callback {
+    use super::*;
+
+    #[derive(Serialize, Deserialize)]
+    pub struct Register;
+    #[derive(Serialize, Deserialize)]
+    pub struct Completed {
+        value: usize,
+    }
+
+    protocol! {
+        named ClientCallback;
+        let server = [Listening];
+        let worker = [Working];
+        let client = [Unregistered, Registered];
+        Unregistered -> Registered, >service(Listening)!Register[Registered];
+        Listening?Register[Registered] -> Listening, Working[Registered];
+        Working[Registered] -> End, >Registered!Completed;
+        Registered?Completed -> End;
+    }
+
+    struct UnregisteredState {
+        factor: usize,
+    }
+
+    impl Unregistered for UnregisteredState {
+        type OnSendRegisterRegistered = RegisteredState;
+        type OnSendRegisterFut = Ready<Result<Self::OnSendRegisterRegistered>>;
+        fn on_send_register(self, _arg: &mut Register) -> Self::OnSendRegisterFut {
+            ready(Ok(RegisteredState {
+                factor: self.factor,
+                result: None,
+            }))
+        }
+    }
+
+    struct RegisteredState {
+        factor: usize,
+        result: Option<usize>,
+    }
+
+    impl Registered for RegisteredState {
+        type HandleCompletedFut = Ready<Result<End>>;
+        fn handle_completed(mut self, arg: Completed) -> Self::HandleCompletedFut {
+            self.result = Some(self.factor * arg.value);
+            ready(Ok(End))
+        }
+    }
+
+    struct ListeningState;
+
+    impl Listening for ListeningState {
+        type HandleRegisterListening = ListeningState;
+        type HandleRegisterWorking = WorkingState;
+        type HandleRegisterFut = Ready<Result<(ListeningState, WorkingState)>>;
+        fn handle_register(self, _arg: Register) -> Self::HandleRegisterFut {
+            ready(Ok((self, WorkingState)))
+        }
+    }
+
+    struct WorkingState;
+
+    impl Working for WorkingState {
+        type OnSendCompletedFut = Ready<Result<(End, Completed)>>;
+        fn on_send_completed(self) -> Self::OnSendCompletedFut {
+            ready(Ok((End, Completed { value: 42 })))
+        }
+    }
+
+    use ::tokio::sync::Mutex;
+
+    enum ClientState<Init: Unregistered> {
+        Unregistered(Init),
+        Registered(Init::OnSendRegisterRegistered),
+        End(End),
+    }
+
+    impl<Init: Unregistered> ClientState<Init> {
+        pub fn name(&self) -> &'static str {
+            match self {
+                Self::Unregistered(_) => "Unregistered",
+                Self::Registered(_) => "Registered",
+                Self::End(_) => "End",
+            }
+        }
+    }
+
+    struct ClientHandle<Init: Unregistered> {
+        runtime: &'static Runtime,
+        state: Arc<Mutex<Option<ClientState<Init>>>>,
+        name: ActorName,
+    }
+
+    impl<Init: Unregistered> ClientHandle<Init> {
+        async fn send_register(&self, to: ServiceName, mut msg: Register) -> Result<()> {
+            let mut guard = self.state.lock().await;
+            let state = guard
+                .take()
+                .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
+            let new_state = match state {
+                ClientState::Unregistered(state) => {
+                    let new_state = state.on_send_register(&mut msg).await?;
+                    let msg = ClientCallbackMsgs::Register(msg);
+                    self.runtime
+                        .send_service(to, self.name.clone(), msg)
+                        .await?;
+                    // QUESTION: Should `on_send_register` be required to return the previous state
+                    // if it encounters an error?
+                    ClientState::Registered(new_state)
+                }
+                state => state,
+            };
+            *guard = Some(new_state);
+            Ok(())
+        }
+    }
+
+    async fn start_client<Init>(init: Init, runtime: &'static Runtime) -> ClientHandle<Init>
+    where
+        Init: 'static + Unregistered,
+    {
+        let state = Arc::new(Mutex::new(Some(ClientState::Unregistered(init))));
+        let name = {
+            let state = state.clone();
+            runtime.activate(move |_, mut mailbox, _act_id| async move {
+                while let Some(envelope) = mailbox.recv().await {
+                    let mut guard = state.lock().await;
+                    let state = guard.take()
+                        .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
+                    let (msg, _replier, _from) = envelope.split();
+                    let new_state = match (state, msg) {
+                        (ClientState::Registered(curr_state), ClientCallbackMsgs::Completed(msg)) => {
+                            match curr_state.handle_completed(msg).await {
+                                Ok(next) =>  ClientState::<Init>::End(next),
+                                Err(err) => {
+                                    log::error!("Failed to handle 'Completed' message in 'Registered' state: {err}");
+                                    panic!("We can't transition to a new state because we gave ownership away and that method failed!")
+                                }
+                            }
+                        }
+                        (state, msg) => {
+                            log::error!("Unexpected message '{}' in state '{}'.", msg.name(), state.name());
+                            state
+                        }
+                    };
+                    *guard = Some(new_state);
+                }
+            }).await
+        };
+        ClientHandle {
+            runtime,
+            state,
+            name,
+        }
+    }
+
+    async fn start_server<Init>(init: Init, runtime: &'static Runtime) -> ActorName
+    where
+        Init: 'static + Listening<HandleRegisterListening = Init>,
+    {
+        enum ServerState<S: Listening> {
+            Listening(S),
+        }
+
+        impl<S: Listening> ServerState<S> {
+            fn name(&self) -> &'static str {
+                match self {
+                    Self::Listening(_) => "Listening",
+                }
+            }
+        }
+
+        runtime
+            .activate(move |_, mut mailbox, _act_id| async move {
+                let mut state = ServerState::Listening(init);
+                while let Some(envelope) = mailbox.recv().await {
+                    let (msg, _replier, from) = envelope.split();
+                    let new_state = match (state, msg) {
+                        (ServerState::Listening(curr_state), ClientCallbackMsgs::Register(msg)) => {
+                            match curr_state.handle_register(msg).await {
+                                Ok((new_state, working_state)) => {
+                                    start_worker(working_state, from, runtime).await;
+                                    ServerState::Listening(new_state)
+                                }
+                                Err(err) => {
+                                    log::error!("Failed to handle the Register message: {err}");
+                                    todo!("Need to recover the previous state from err.")
+                                }
+                            }
+                        }
+                        (state, msg) => {
+                            log::error!(
+                                "Unexpected message '{}' in state '{}'.",
+                                msg.name(),
+                                state.name()
+                            );
+                            state
+                        }
+                    };
+                    state = new_state;
+                }
+            })
+            .await
+    }
+
+    async fn start_worker<Init>(
+        init: Init,
+        owned: ActorName,
+        runtime: &'static Runtime,
+    ) -> ActorName
+    where
+        Init: 'static + Working,
+    {
+        enum WorkerState<S: Working> {
+            Working(S),
+        }
+
+        runtime
+            .activate::<ClientCallbackMsgs, _, _>(move |_, _, act_id| async move {
+                let msg = match init.on_send_completed().await {
+                    Ok((End, msg)) => msg,
+                    Err(err) => {
+                        log::error!("Failed to send Completed message: {err}");
+                        return;
+                    }
+                };
+                let from = runtime.actor_name(act_id);
+                let msg = ClientCallbackMsgs::Completed(msg);
+                if let Err(err) = runtime.send(owned, from, msg).await {
+                    log::error!("Failed to send Completed message: {err}");
+                }
+            })
+            .await
+    }
+}