Sfoglia il codice sorgente

* Updated the ping_pong module in btrun.
* Added a new test in the client_callback module.

Matthew Carr 1 anno fa
parent
commit
abb47d47c0

+ 16 - 27
crates/btproto/src/parsing.rs

@@ -280,10 +280,7 @@ impl IdentArray {
         if state_names.len() > 0 {
             Some(Self {
                 bracket: Bracket::default(),
-                idents: state_names
-                    .map(new_ident)
-                    .map(Rc::new)
-                    .collect(),
+                idents: state_names.map(new_ident).map(Rc::new).collect(),
             })
         } else {
             None
@@ -430,16 +427,17 @@ pub(crate) struct State {
 
 impl State {
     pub(crate) fn owned_states(&self) -> impl Iterator<Item = &'_ Rc<Ident>> {
-        IdentIter::new(self.owned_states.as_ref().map(|idents| idents.as_ref().iter()))
+        IdentIter::new(
+            self.owned_states
+                .as_ref()
+                .map(|idents| idents.as_ref().iter()),
+        )
     }
 }
 
 #[cfg(test)]
 impl State {
-    pub(crate) fn new<T, I>(
-        state_trait: &str,
-        owned_states: T,
-    ) -> Self
+    pub(crate) fn new<T, I>(state_trait: &str, owned_states: T) -> Self
     where
         T: IntoIterator<IntoIter = I>,
         I: ExactSizeIterator<Item = &'static str>,
@@ -662,17 +660,17 @@ impl Message {
     }
 
     pub(crate) fn owned_states(&self) -> impl Iterator<Item = &'_ Rc<Ident>> {
-        IdentIter::new(self.owned_states.as_ref().map(|states| states.as_ref().iter()))
+        IdentIter::new(
+            self.owned_states
+                .as_ref()
+                .map(|states| states.as_ref().iter()),
+        )
     }
 }
 
 #[cfg(test)]
 impl Message {
-    pub(crate) fn new<T, I>(
-        msg_type: &str,
-        is_reply: bool,
-        owned_states: T,
-    ) -> Self
+    pub(crate) fn new<T, I>(msg_type: &str, is_reply: bool, owned_states: T) -> Self
     where
         T: IntoIterator<IntoIter = I>,
         I: ExactSizeIterator<Item = &'static str>,
@@ -1316,18 +1314,9 @@ Init?Activate -> End;"
         assert_eq!(actual.msg_type.as_ref(), EXPECTED_MSG_TYPE);
         assert_eq!(actual.is_reply(), EXPECTED_IS_REPLY);
         let idents = actual.owned_states.unwrap().idents;
-        assert_eq!(
-            idents.len(),
-            EXPECTED_OWNED_STATES.len()
-        );
-        assert_eq!(
-            idents[0].as_ref(),
-            EXPECTED_OWNED_STATES[0]
-        );
-        assert_eq!(
-            idents[1].as_ref(),
-            EXPECTED_OWNED_STATES[1]
-        );
+        assert_eq!(idents.len(), EXPECTED_OWNED_STATES.len());
+        assert_eq!(idents[0].as_ref(), EXPECTED_OWNED_STATES[0]);
+        assert_eq!(idents[1].as_ref(), EXPECTED_OWNED_STATES[1]);
     }
 
     #[test]

+ 7 - 16
crates/btproto/src/validation.rs

@@ -35,25 +35,13 @@ impl ProtocolModel {
         for transition in self.def().transitions.iter() {
             let in_state = &transition.in_state;
             used.insert(&in_state.state_trait);
-            used.extend(
-                in_state
-                    .owned_states()
-                    .map(|ident| ident.as_ref()),
-            );
+            used.extend(in_state.owned_states().map(|ident| ident.as_ref()));
             if let Some(in_msg) = transition.in_msg() {
-                used.extend(
-                    in_msg
-                        .owned_states()
-                        .map(|ident| ident.as_ref()),
-                );
+                used.extend(in_msg.owned_states().map(|ident| ident.as_ref()));
             }
             for out_states in transition.out_states.as_ref().iter() {
                 used.insert(&out_states.state_trait);
-                used.extend(
-                    out_states
-                        .owned_states()
-                        .map(|ident| ident.as_ref()),
-                );
+                used.extend(out_states.owned_states().map(|ident| ident.as_ref()));
             }
             // We don't have to check the states referred to in out_msgs because the
             // receivers_and_senders_matched method ensures that each of these exists in a receiver
@@ -185,7 +173,10 @@ impl ProtocolModel {
                 match &dest.state {
                     DestinationState::Service(_) => continue,
                     DestinationState::Individual(dest_state) => {
-                        let owned_states = transition.in_state.owned_states().map(|ident| ident.as_ref());
+                        let owned_states = transition
+                            .in_state
+                            .owned_states()
+                            .map(|ident| ident.as_ref());
                         let allowed = allowed_states.get_or_insert_with(|| {
                             transition
                                 .out_states

+ 223 - 65
crates/btrun/src/lib.rs

@@ -2,7 +2,7 @@
 
 use std::{
     any::Any,
-    collections::HashMap,
+    collections::{hash_map, HashMap},
     fmt::Display,
     future::{ready, Future, Ready},
     marker::PhantomData,
@@ -52,6 +52,9 @@ where
     Receiver::new(ip_addr, creds, callback).unwrap()
 }
 
+/// Type used to implement an actor's mailbox.
+pub type Mailbox<T> = mpsc::Receiver<Envelope<T>>;
+
 /// An actor runtime.
 ///
 /// Actors can be activated by the runtime and execute autonomously until they return. Running
@@ -62,6 +65,7 @@ pub struct Runtime {
     path: Arc<BlockPath>,
     handles: RwLock<HashMap<Uuid, ActorHandle>>,
     peers: RwLock<HashMap<Arc<BlockPath>, Transmitter>>,
+    registry: RwLock<HashMap<ServiceId, ServiceRecord>>,
 }
 
 impl Runtime {
@@ -76,6 +80,7 @@ impl Runtime {
             path,
             handles: RwLock::new(HashMap::new()),
             peers: RwLock::new(HashMap::new()),
+            registry: RwLock::new(HashMap::new()),
         })
     }
 
@@ -123,12 +128,26 @@ impl Runtime {
 
     /// Sends a message to the service identified by [ServiceName].
     pub async fn send_service<T: 'static + SendMsg>(
-        &self,
-        _to: ServiceName,
-        _from: ActorName,
-        _msg: T,
+        &'static self,
+        to: ServiceAddr,
+        from: ActorName,
+        msg: T,
     ) -> Result<()> {
-        todo!()
+        if to.path().as_ref() == self.path.as_ref() {
+            let actor_id = self.service_provider(&to).await?;
+            let handles = self.handles.read().await;
+            if let Some(handle) = handles.get(&actor_id) {
+                handle.send(from, msg).await
+            } else {
+                panic!(
+                    "Service record '{}' had a non-existent actor with ID '{}'.",
+                    to.service_id(),
+                    actor_id
+                );
+            }
+        } else {
+            todo!("Send the message to an appropriate peer.")
+        }
     }
 
     /// Sends a message to the actor identified by the given [ActorName] and returns a future which
@@ -142,7 +161,7 @@ impl Runtime {
         if to.path == self.path {
             let guard = self.handles.read().await;
             if let Some(handle) = guard.get(&to.act_id) {
-                handle.call_through(from, msg).await
+                handle.call_through(msg).await
             } else {
                 Err(bterr!("invalid actor name"))
             }
@@ -157,29 +176,76 @@ impl Runtime {
                 };
                 peer.call(wire_msg, ReplyCallback::<T>::new()).await?
             } else {
-                // TODO: Use the filesystem to discover the address of the recipient and connect to
-                // it.
-                todo!()
+                todo!("Use the filesystem to find the address of the recipient and connect to it.")
             }
         }
     }
 
     /// Calls a service identified by [ServiceName].
-    pub async fn send_call<T: 'static + CallMsg>(
-        &self,
-        _to: ServiceName,
-        _from: ActorName,
-        _msg: T,
+    pub async fn call_service<T: 'static + CallMsg>(
+        &'static self,
+        to: ServiceAddr,
+        msg: T,
     ) -> Result<T::Reply> {
-        todo!()
+        if to.path().as_ref() == self.path.as_ref() {
+            let actor_id = self.service_provider(&to).await?;
+            let handles = self.handles.read().await;
+            if let Some(handle) = handles.get(&actor_id) {
+                handle.call_through(msg).await
+            } else {
+                panic!(
+                    "Service record '{}' had a non-existent actor with ID '{}'.",
+                    to.service_id(),
+                    actor_id
+                );
+            }
+        } else {
+            todo!("Send the message to an appropriate peer.")
+        }
+    }
+
+    fn service_not_registered_err(id: &ServiceId) -> btlib::Error {
+        bterr!("Service is not registered: '{id}'")
     }
 
-    /// Activates a new actor using the given activator function and returns a handle to it.
-    pub async fn activate<Msg, F, Fut>(&'static self, activator: F) -> ActorName
+    async fn service_provider(&'static self, to: &ServiceAddr) -> Result<Uuid> {
+        let actor_id = {
+            let registry = self.registry.read().await;
+            if let Some(record) = registry.get(to.service_id()) {
+                record.actor_ids.first().copied()
+            } else {
+                return Err(Self::service_not_registered_err(to.service_id()));
+            }
+        };
+        let actor_id = if let Some(actor_id) = actor_id {
+            actor_id
+        } else {
+            let mut registry = self.registry.write().await;
+            if let Some(record) = registry.get_mut(to.service_id()) {
+                // It's possible that another thread got the write lock before us and they
+                // already spawned an actor.
+                if record.actor_ids.is_empty() {
+                    let spawner = record.spawner.as_ref();
+                    let actor_name = spawner(self).await?;
+                    let actor_id = actor_name.act_id;
+                    record.actor_ids.push(actor_id);
+                    actor_id
+                } else {
+                    record.actor_ids[0]
+                }
+            } else {
+                return Err(Self::service_not_registered_err(to.service_id()));
+            }
+        };
+        Ok(actor_id)
+    }
+
+    /// Spawns a new actor using the given activator function and returns a handle to it.
+    pub async fn spawn<Msg, F, Fut>(&'static self, activator: F) -> ActorName
     where
         Msg: 'static + CallMsg,
         Fut: 'static + Send + Future<Output = ()>,
-        F: FnOnce(&'static Runtime, mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
+        F: FnOnce(&'static Runtime, Mailbox<Msg>, Uuid) -> Fut,
     {
         let mut guard = self.handles.write().await;
         let act_id = {
@@ -206,7 +272,7 @@ impl Runtime {
                 let fut: FutureResult = Box::pin(async move {
                     let msg = result?;
                     if let Some(mut replier) = replier {
-                        let (envelope, rx) = Envelope::new_call(act_name, msg);
+                        let (envelope, rx) = Envelope::new_call(msg);
                         tx.send(envelope).await.map_err(|_| {
                             bterr!("failed to deliver message. Recipient may have halted.")
                         })?;
@@ -237,20 +303,32 @@ impl Runtime {
         act_name
     }
 
-    /// Registers an actor as a service with the given [ServiceId].
-    pub async fn register<Msg, Fut, F, G>(
-        &self,
-        _id: ServiceId,
-        _activator: F,
-        _deserializer: G,
-    ) -> Result<()>
+    /// Registers a service activation closure for [ServiceId]. An error is returned if the
+    /// [ServiceId] has already been registered.
+    pub async fn register<Msg, F>(&self, id: ServiceId, spawner: F) -> Result<ServiceName>
     where
         Msg: 'static + CallMsg,
-        Fut: 'static + Send + Future<Output = ()>,
-        F: Fn(mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
-        G: 'static + Send + Sync + Fn(&[u8]) -> Result<Msg>,
+        F: 'static
+            + Send
+            + Sync
+            + Fn(&'static Runtime) -> Pin<Box<dyn Future<Output = Result<ActorName>>>>,
     {
-        todo!()
+        let mut guard = self.registry.write().await;
+        match guard.entry(id.clone()) {
+            hash_map::Entry::Vacant(entry) => {
+                entry.insert(ServiceRecord::new(spawner));
+                Ok(ServiceName::new(self.path().clone(), id.clone()))
+            }
+            hash_map::Entry::Occupied(_) => {
+                log::info!("Updated registration for service '{id}'.");
+                Ok(ServiceName::new(self.path().clone(), id))
+            }
+        }
+    }
+
+    pub async fn take_service(&self, id: &ServiceId) -> Option<ServiceRecord> {
+        let mut registry = self.registry.write().await;
+        registry.remove(id)
     }
 
     /// Returns the [ActorHandle] for the actor with the given name.
@@ -301,6 +379,30 @@ impl Display for RuntimeError {
 
 impl std::error::Error for RuntimeError {}
 
+/// Closure type used to spawn new service providers.
+pub type Spawner =
+    Box<dyn Send + Sync + Fn(&'static Runtime) -> Pin<Box<dyn Future<Output = Result<ActorName>>>>>;
+
+pub struct ServiceRecord {
+    spawner: Spawner,
+    actor_ids: Vec<Uuid>,
+}
+
+impl ServiceRecord {
+    fn new<F>(spawner: F) -> Self
+    where
+        F: 'static
+            + Send
+            + Sync
+            + Fn(&'static Runtime) -> Pin<Box<dyn Future<Output = Result<ActorName>>>>,
+    {
+        Self {
+            spawner: Box::new(spawner),
+            actor_ids: Vec::new(),
+        }
+    }
+}
+
 /// Represents the terminal state of an actor, where it stops processing messages and halts.
 pub struct End;
 
@@ -447,15 +549,27 @@ impl MsgCallback for RuntimeCallback {
 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
 pub struct ServiceId(#[serde(with = "smart_ptr")] Arc<String>);
 
-impl From<String> for ServiceId {
-    fn from(value: String) -> Self {
-        Self(Arc::new(value))
+impl ServiceId {
+    pub fn new(value: Arc<String>) -> Self {
+        Self(value)
     }
 }
 
-impl<'a> From<&'a str> for ServiceId {
-    fn from(value: &'a str) -> Self {
-        Self(Arc::new(value.to_owned()))
+impl AsRef<str> for ServiceId {
+    fn as_ref(&self) -> &str {
+        self.0.as_str()
+    }
+}
+
+impl<T: Into<String>> From<T> for ServiceId {
+    fn from(value: T) -> Self {
+        Self(Arc::new(value.into()))
+    }
+}
+
+impl Display for ServiceId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str(self.as_ref())
     }
 }
 
@@ -469,10 +583,39 @@ pub struct ServiceName {
     path: Arc<BlockPath>,
     /// The id of the service.
     service_id: ServiceId,
+}
+
+impl ServiceName {
+    pub fn new(path: Arc<BlockPath>, service_id: ServiceId) -> Self {
+        Self { path, service_id }
+    }
+}
+
+/// Indicates the set of service providers a message is addressed to.
+///
+/// The message could be delivered to any of the service providers in this set, at random.
+pub struct ServiceAddr {
+    /// The [ServiceName] to address the message to.
+    name: ServiceName,
+    #[allow(dead_code)]
     /// Indicates if the message should be routed towards the root of the tree or away from it.
     rootward: bool,
 }
 
+impl ServiceAddr {
+    pub fn new(name: ServiceName, rootward: bool) -> Self {
+        Self { name, rootward }
+    }
+
+    pub fn path(&self) -> &Arc<BlockPath> {
+        &self.name.path
+    }
+
+    pub fn service_id(&self) -> &ServiceId {
+        &self.name.service_id
+    }
+}
+
 /// A unique identifier for a specific actor activation.
 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
 pub struct ActorName {
@@ -562,42 +705,60 @@ impl<'de> WireEnvelope<'de> {
     }
 }
 
+pub enum EnvelopeKind<T: CallMsg> {
+    Call {
+        reply: Option<oneshot::Sender<T::Reply>>,
+    },
+    Send {
+        from: ActorName,
+    },
+}
+
+impl<T: CallMsg> EnvelopeKind<T> {
+    pub fn name(&self) -> &'static str {
+        match self {
+            Self::Call { .. } => "Call",
+            Self::Send { .. } => "Send",
+        }
+    }
+}
+
 /// Wrapper around a message type `T` which indicates who the message is from and, if the message
 /// was dispatched with `call`, provides a channel to reply to it.
 pub struct Envelope<T: CallMsg> {
-    from: ActorName,
-    reply: Option<oneshot::Sender<T::Reply>>,
     msg: T,
+    kind: EnvelopeKind<T>,
 }
 
 impl<T: CallMsg> Envelope<T> {
-    pub fn new(msg: T, reply: Option<oneshot::Sender<T::Reply>>, from: ActorName) -> Self {
-        Self { from, reply, msg }
+    pub fn new(msg: T, kind: EnvelopeKind<T>) -> Self {
+        Self { msg, kind }
     }
 
     /// Creates a new envelope containing the given message which does not expect a reply.
     fn new_send(from: ActorName, msg: T) -> Self {
         Self {
-            from,
+            kind: EnvelopeKind::Send { from },
             msg,
-            reply: None,
         }
     }
 
     /// Creates a new envelope containing the given message which expects exactly one reply.
-    fn new_call(from: ActorName, msg: T) -> (Self, oneshot::Receiver<T::Reply>) {
+    fn new_call(msg: T) -> (Self, oneshot::Receiver<T::Reply>) {
         let (tx, rx) = oneshot::channel::<T::Reply>();
         let envelope = Self {
-            from,
+            kind: EnvelopeKind::Call { reply: Some(tx) },
             msg,
-            reply: Some(tx),
         };
         (envelope, rx)
     }
 
     /// Returns the name of the actor which sent this message.
-    pub fn from(&self) -> &ActorName {
-        &self.from
+    pub fn from(&self) -> Option<&ActorName> {
+        match &self.kind {
+            EnvelopeKind::Send { from } => Some(from),
+            _ => None,
+        }
     }
 
     /// Returns a reference to the message in this envelope.
@@ -610,24 +771,25 @@ impl<T: CallMsg> Envelope<T> {
     /// If this message is not expecting a reply, or if this message has already been replied to,
     /// then an error is returned.
     pub fn reply(&mut self, reply: T::Reply) -> Result<()> {
-        if let Some(tx) = self.reply.take() {
-            if tx.send(reply).is_ok() {
-                Ok(())
-            } else {
-                Err(bterr!("failed to send reply"))
+        match &mut self.kind {
+            EnvelopeKind::Call { reply: tx } => {
+                if let Some(tx) = tx.take() {
+                    tx.send(reply).map_err(|_| bterr!("Failed to send reply."))
+                } else {
+                    Err(bterr!("Reply has already been sent."))
+                }
             }
-        } else {
-            Err(bterr!("reply already sent"))
+            _ => Err(bterr!("Can't reply to '{}' messages.", self.kind.name())),
         }
     }
 
     /// Returns true if this message expects a reply and it has not already been replied to.
     pub fn needs_reply(&self) -> bool {
-        self.reply.is_some()
+        matches!(&self.kind, EnvelopeKind::Call { .. })
     }
 
-    pub fn split(self) -> (T, Option<oneshot::Sender<T::Reply>>, ActorName) {
-        (self.msg, self.reply, self.from)
+    pub fn split(self) -> (T, EnvelopeKind<T>) {
+        (self.msg, self.kind)
     }
 }
 
@@ -655,7 +817,7 @@ impl ActorHandle {
     fn sender<T: 'static + CallMsg>(&self) -> Result<&mpsc::Sender<Envelope<T>>> {
         self.sender
             .downcast_ref::<mpsc::Sender<Envelope<T>>>()
-            .ok_or_else(|| bterr!("unexpected message type"))
+            .ok_or_else(|| bterr!("Attempt to send message as the wrong type."))
     }
 
     /// Sends a message to the actor represented by this handle.
@@ -668,13 +830,9 @@ impl ActorHandle {
         Ok(())
     }
 
-    pub async fn call_through<T: 'static + CallMsg>(
-        &self,
-        from: ActorName,
-        msg: T,
-    ) -> Result<T::Reply> {
+    pub async fn call_through<T: 'static + CallMsg>(&self, msg: T) -> Result<T::Reply> {
         let sender = self.sender()?;
-        let (envelope, rx) = Envelope::new_call(from, msg);
+        let (envelope, rx) = Envelope::new_call(msg);
         sender
             .send(envelope)
             .await

+ 282 - 196
crates/btrun/tests/runtime_tests.rs

@@ -22,7 +22,6 @@ use std::{
         atomic::{AtomicU8, Ordering},
         Arc,
     },
-    time::{Duration, Instant},
 };
 use tokio::{runtime::Builder, sync::mpsc};
 use uuid::Uuid;
@@ -74,11 +73,15 @@ async fn echo(
     _act_id: Uuid,
 ) {
     while let Some(envelope) = mailbox.recv().await {
-        let (msg, replier, ..) = envelope.split();
-        if let Some(replier) = replier {
-            if let Err(_) = replier.send(msg) {
-                panic!("failed to send reply");
+        let (msg, kind) = envelope.split();
+        match kind {
+            EnvelopeKind::Call { reply } => {
+                let replier = reply.unwrap_or_else(|| panic!("The reply has already been sent."));
+                if let Err(_) = replier.send(msg) {
+                    panic!("failed to send reply");
+                }
             }
+            _ => panic!("Expected EchoMsg to be a Call Message."),
         }
     }
 }
@@ -87,7 +90,7 @@ async fn echo(
 fn local_call() {
     ASYNC_RT.block_on(async {
         const EXPECTED: &str = "hello";
-        let name = RUNTIME.activate(echo).await;
+        let name = RUNTIME.spawn(echo).await;
         let from = ActorName::new(name.path().clone(), Uuid::default());
 
         let reply = RUNTIME
@@ -105,7 +108,7 @@ fn local_call() {
 fn remote_call() {
     ASYNC_RT.block_on(async {
         const EXPECTED: &str = "hello";
-        let actor_name = RUNTIME.activate(echo).await;
+        let actor_name = RUNTIME.spawn(echo).await;
         let bind_path = Arc::new(RUNTIME_CREDS.bind_path().unwrap());
         let block_addr = Arc::new(BlockAddr::new(RUNTIME_ADDR, bind_path));
         let transmitter = Transmitter::new(block_addr, RUNTIME_CREDS.clone())
@@ -142,7 +145,7 @@ async fn num_running() {
         TEST_STORE.node_creds().unwrap()
     );
     assert_eq!(0, LOCAL_RT.num_running().await);
-    let name = LOCAL_RT.activate(echo).await;
+    let name = LOCAL_RT.spawn(echo).await;
     assert_eq!(1, LOCAL_RT.num_running().await);
     LOCAL_RT.take(&name).await.unwrap();
     assert_eq!(0, LOCAL_RT.num_running().await);
@@ -151,10 +154,12 @@ async fn num_running() {
 mod ping_pong {
     use super::*;
 
+    use btlib::bterr;
+
     // The following code is a proof-of-concept for what types should be generated for a
     // simple ping-pong protocol:
     protocol! {
-        named PingPongProtocol;
+        named PingProtocol;
         let server = [Server];
         let client = [Client];
         Client -> End, >service(Server)!Ping;
@@ -175,192 +180,197 @@ mod ping_pong {
     // When a state is expecting a Reply message, an error occurs if the message is not received
     // in a timely manner.
 
-    #[derive(Serialize, Deserialize)]
-    pub struct Ping;
-    impl CallMsg for Ping {
-        type Reply = PingReply;
-    }
-
-    // I was tempted to name this "Pong", but the proc macro wouldn't think to do that.
-    #[derive(Serialize, Deserialize)]
-    pub struct PingReply;
-
-    trait ClientInit2 {
-        type AfterActivate: SentPing2;
-        type HandleActivateFut: Future<Output = Result<(Self::AfterActivate, Ping)>>;
-        fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
+    enum PingClientState<T: Client> {
+        Client(T),
+        End(End),
     }
 
-    trait ServerInit2 {
-        type AfterActivate: Listening2;
-        type HandleActivateFut: Future<Output = Result<Self::AfterActivate>>;
-        fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
+    impl<T: Client> PingClientState<T> {
+        const fn name(&self) -> &'static str {
+            match self {
+                Self::Client(_) => "Client",
+                Self::End(_) => "End",
+            }
+        }
     }
 
-    trait Listening2 {
-        type HandlePingFut: Future<Output = Result<(End, PingReply)>>;
-        fn handle_ping(self, msg: Ping) -> Self::HandlePingFut;
+    struct ClientHandle<T: Client> {
+        state: Option<PingClientState<T>>,
+        runtime: &'static Runtime,
     }
 
-    trait SentPing2 {
-        type HandleReplyFut: Future<Output = Result<End>>;
-        fn handle_reply(self, msg: PingReply) -> Self::HandleReplyFut;
+    impl<T: Client> ClientHandle<T> {
+        async fn send_ping(&mut self, mut msg: Ping, service: ServiceAddr) -> Result<PingReply> {
+            let state = self
+                .state
+                .take()
+                .ok_or_else(|| bterr!("State was not returned."))?;
+            let (new_state, result) = match state {
+                PingClientState::Client(state) => {
+                    let (new_state, _) = state.on_send_ping(&mut msg).await?;
+                    let new_state = PingClientState::End(new_state);
+                    let result = self
+                        .runtime
+                        .call_service(service, PingProtocolMsgs::Ping(msg))
+                        .await;
+                    (new_state, result)
+                }
+                state => {
+                    let result = Err(bterr!("Can't send Ping in state {}.", state.name()));
+                    (state, result)
+                }
+            };
+            self.state = Some(new_state);
+            let reply = result?;
+            match reply {
+                PingProtocolMsgs::PingReply(reply) => Ok(reply),
+                msg => Err(bterr!(
+                    "Unexpected message type sent in reply: {}",
+                    msg.name()
+                )),
+            }
+        }
     }
 
-    #[derive(Serialize, Deserialize)]
-    enum PingProtocolMsg {
-        Ping(Ping),
-        PingReply(PingReply),
-    }
-    impl CallMsg for PingProtocolMsg {
-        type Reply = PingProtocolMsg;
+    async fn spawn_client<T: Client>(init: T, runtime: &'static Runtime) -> ClientHandle<T> {
+        let state = Some(PingClientState::Client(init));
+        ClientHandle { state, runtime }
     }
-    impl SendMsg for PingProtocolMsg {}
 
-    struct ClientInitState;
-
-    impl ClientInit2 for ClientInitState {
-        type AfterActivate = ClientState;
-        type HandleActivateFut = impl Future<Output = Result<(Self::AfterActivate, Ping)>>;
-        fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
-            ready(Ok((ClientState, Ping)))
+    async fn register_server<Init, F>(
+        make_init: F,
+        rt: &'static Runtime,
+        id: ServiceId,
+    ) -> Result<ServiceName>
+    where
+        Init: 'static + Server,
+        F: 'static + Send + Sync + Clone + Fn() -> Init,
+    {
+        enum ServerState<S> {
+            Server(S),
+            End(End),
         }
-    }
 
-    struct ClientState;
+        async fn server_loop<Init, F>(
+            _runtime: &'static Runtime,
+            make_init: F,
+            mut mailbox: Mailbox<PingProtocolMsgs>,
+            _act_id: Uuid,
+        ) where
+            Init: 'static + Server,
+            F: 'static + Send + Sync + FnOnce() -> Init,
+        {
+            let mut state = ServerState::Server(make_init());
+            while let Some(envelope) = mailbox.recv().await {
+                let (msg, msg_kind) = envelope.split();
+                state = match (state, msg) {
+                    (ServerState::Server(listening_state), PingProtocolMsgs::Ping(msg)) => {
+                        let (new_state, reply) = listening_state.handle_ping(msg).await.unwrap();
+                        match msg_kind {
+                            EnvelopeKind::Call { reply: replier } => {
+                                let replier = replier.expect("The reply has already been sent.");
+                                if let Err(_) = replier.send(PingProtocolMsgs::PingReply(reply)) {
+                                    panic!("Failed to send Ping reply.");
+                                }
+                                ServerState::End(new_state)
+                            }
+                            _ => panic!("'Ping' was expected to be a Call message."),
+                        }
+                    }
+                    (state, _) => state,
+                };
 
-    impl SentPing2 for ClientState {
-        type HandleReplyFut = Ready<Result<End>>;
-        fn handle_reply(self, _msg: PingReply) -> Self::HandleReplyFut {
-            ready(Ok(End))
+                if let ServerState::End(_) = state {
+                    break;
+                }
+            }
         }
+
+        rt.register::<PingProtocolMsgs, _>(id, move |runtime| {
+            let make_init = make_init.clone();
+            let fut = async move {
+                let actor_name = runtime
+                    .spawn(move |_, mailbox, act_id| {
+                        server_loop(runtime, make_init, mailbox, act_id)
+                    })
+                    .await;
+                Ok(actor_name)
+            };
+            Box::pin(fut)
+        })
+        .await
     }
 
-    #[allow(dead_code)]
-    enum PingClientState {
-        Init(ClientInitState),
-        SentPing(ClientState),
-        End(End),
+    #[derive(Serialize, Deserialize)]
+    pub struct Ping;
+    impl CallMsg for Ping {
+        type Reply = PingReply;
     }
 
-    struct ServerInitState;
+    #[derive(Serialize, Deserialize)]
+    pub struct PingReply;
 
-    struct ServerState;
+    struct ClientState {
+        counter: Arc<AtomicU8>,
+    }
 
-    impl ServerInit2 for ServerInitState {
-        type AfterActivate = ServerState;
-        type HandleActivateFut = Ready<Result<Self::AfterActivate>>;
-        fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
-            ready(Ok(ServerState))
+    impl ClientState {
+        fn new(counter: Arc<AtomicU8>) -> Self {
+            counter.fetch_add(1, Ordering::SeqCst);
+            Self { counter }
         }
     }
 
-    impl Listening2 for ServerState {
-        type HandlePingFut = impl Future<Output = Result<(End, PingReply)>>;
-        fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
+    impl Client for ClientState {
+        type OnSendPingFut = impl Future<Output = Result<(End, PingReply)>>;
+        fn on_send_ping(self, _msg: &mut Ping) -> Self::OnSendPingFut {
+            self.counter.fetch_sub(1, Ordering::SeqCst);
             ready(Ok((End, PingReply)))
         }
     }
 
-    #[allow(dead_code)]
-    enum PingServerState {
-        ServerInit(ServerInitState),
-        Listening(ServerState),
-        End(End),
+    struct ServerState {
+        counter: Arc<AtomicU8>,
     }
 
-    async fn ping_server(
-        counter: Arc<AtomicU8>,
-        rt: &'static Runtime,
-        mut mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
-        act_id: Uuid,
-    ) {
-        let mut state = {
-            let init = ServerInitState;
-            let state = init
-                .handle_activate(Activate::new(rt, act_id))
-                .await
-                .unwrap();
-            PingServerState::Listening(state)
-        };
-        while let Some(envelope) = mailbox.recv().await {
-            let (msg, replier, _from) = envelope.split();
-            match (state, msg) {
-                (PingServerState::Listening(listening_state), PingProtocolMsg::Ping(msg)) => {
-                    let (new_state, reply) = listening_state.handle_ping(msg).await.unwrap();
-                    state = PingServerState::End(new_state);
-                    if let Err(_) = replier.unwrap().send(PingProtocolMsg::PingReply(reply)) {
-                        panic!("Failed to send Ping reply.");
-                    }
-                }
-                (_prev_state, _) => {
-                    panic!("Ping protocol violation.");
-                    // A real implementation should assign the previous state and log the error.
-                    // state = prev_state;
-                }
-            }
-            if let PingServerState::End(_) = state {
-                break;
-            }
+    impl ServerState {
+        fn new(counter: Arc<AtomicU8>) -> Self {
+            counter.fetch_add(1, Ordering::SeqCst);
+            Self { counter }
         }
-        counter.fetch_sub(1, Ordering::SeqCst);
     }
 
-    async fn ping_client(
-        counter: Arc<AtomicU8>,
-        server_name: ActorName,
-        rt: &'static Runtime,
-        _mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
-        act_id: Uuid,
-    ) {
-        let init = ClientInitState;
-        let (state, msg) = init
-            .handle_activate(Activate::new(rt, act_id))
-            .await
-            .unwrap();
-        let from = rt.actor_name(act_id);
-        let reply = rt
-            .call(server_name, from, PingProtocolMsg::Ping(msg))
-            .await
-            .unwrap();
-        if let PingProtocolMsg::PingReply(msg) = reply {
-            state.handle_reply(msg).await.unwrap();
-        } else {
-            panic!("Incorrect message type sent in reply to Ping.");
+    impl Server for ServerState {
+        type HandlePingFut = impl Future<Output = Result<(End, PingReply)>>;
+        fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
+            self.counter.fetch_sub(1, Ordering::SeqCst);
+            ready(Ok((End, PingReply)))
         }
-        counter.fetch_sub(1, Ordering::SeqCst);
     }
 
     #[test]
     fn ping_pong_test() {
         ASYNC_RT.block_on(async {
-            let counter = Arc::new(AtomicU8::new(2));
-            let server_name = {
-                let counter = counter.clone();
-                RUNTIME
-                    .activate(move |rt, mailbox, act_id| ping_server(counter, rt, mailbox, act_id))
-                    .await
-            };
-            let client_name = {
-                let server_name = server_name.clone();
-                let counter = counter.clone();
-                RUNTIME
-                    .activate(move |rt, mailbox, act_id| {
-                        ping_client(counter, server_name, rt, mailbox, act_id)
-                    })
+            const SERVICE_ID: &str = "PingPongProtocolServer";
+            let service_id = ServiceId::from(SERVICE_ID);
+            let counter = Arc::new(AtomicU8::new(0));
+            let service_name = {
+                let service_counter = counter.clone();
+                let make_init = move || {
+                    let server_counter = service_counter.clone();
+                    ServerState::new(server_counter)
+                };
+                register_server(make_init, &RUNTIME, service_id.clone())
                     .await
+                    .unwrap()
             };
+            let mut client_handle = spawn_client(ClientState::new(counter.clone()), &RUNTIME).await;
+            let service_addr = ServiceAddr::new(service_name, true);
+            client_handle.send_ping(Ping, service_addr).await.unwrap();
 
-            let deadline = Instant::now() + Duration::from_millis(500);
-            while counter.load(Ordering::SeqCst) > 0 && Instant::now() < deadline {
-                tokio::time::sleep(Duration::from_millis(20)).await;
-            }
-            // Check that both tasks finished successfully and we didn't just timeout.
             assert_eq!(0, counter.load(Ordering::SeqCst));
 
-            // TODO: Should actor which return be removed from the runtime automatically?
-            RUNTIME.take(&server_name).await.unwrap();
-            RUNTIME.take(&client_name).await.unwrap();
+            RUNTIME.take_service(&service_id).await.unwrap();
         });
     }
 }
@@ -412,10 +422,17 @@ mod travel_agency {
 
 #[allow(dead_code)]
 mod client_callback {
+
     use super::*;
 
+    use std::time::Duration;
+    use tokio::{sync::oneshot, time::timeout};
+
     #[derive(Serialize, Deserialize)]
-    pub struct Register;
+    pub struct Register {
+        factor: usize,
+    }
+
     #[derive(Serialize, Deserialize)]
     pub struct Completed {
         value: usize,
@@ -433,7 +450,7 @@ mod client_callback {
     }
 
     struct UnregisteredState {
-        factor: usize,
+        sender: oneshot::Sender<usize>,
     }
 
     impl Unregistered for UnregisteredState {
@@ -441,42 +458,53 @@ mod client_callback {
         type OnSendRegisterFut = Ready<Result<Self::OnSendRegisterRegistered>>;
         fn on_send_register(self, _arg: &mut Register) -> Self::OnSendRegisterFut {
             ready(Ok(RegisteredState {
-                factor: self.factor,
-                result: None,
+                sender: self.sender,
             }))
         }
     }
 
     struct RegisteredState {
-        factor: usize,
-        result: Option<usize>,
+        sender: oneshot::Sender<usize>,
     }
 
     impl Registered for RegisteredState {
         type HandleCompletedFut = Ready<Result<End>>;
-        fn handle_completed(mut self, arg: Completed) -> Self::HandleCompletedFut {
-            self.result = Some(self.factor * arg.value);
+        fn handle_completed(self, arg: Completed) -> Self::HandleCompletedFut {
+            self.sender.send(arg.value).unwrap();
             ready(Ok(End))
         }
     }
 
-    struct ListeningState;
+    struct ListeningState {
+        multiple: usize,
+    }
 
     impl Listening for ListeningState {
         type HandleRegisterListening = ListeningState;
         type HandleRegisterWorking = WorkingState;
         type HandleRegisterFut = Ready<Result<(ListeningState, WorkingState)>>;
-        fn handle_register(self, _arg: Register) -> Self::HandleRegisterFut {
-            ready(Ok((self, WorkingState)))
+        fn handle_register(self, arg: Register) -> Self::HandleRegisterFut {
+            let multiple = self.multiple;
+            ready(Ok((
+                self,
+                WorkingState {
+                    factor: arg.factor,
+                    multiple,
+                },
+            )))
         }
     }
 
-    struct WorkingState;
+    struct WorkingState {
+        factor: usize,
+        multiple: usize,
+    }
 
     impl Working for WorkingState {
         type OnSendCompletedFut = Ready<Result<(End, Completed)>>;
         fn on_send_completed(self) -> Self::OnSendCompletedFut {
-            ready(Ok((End, Completed { value: 42 })))
+            let value = self.multiple * self.factor;
+            ready(Ok((End, Completed { value })))
         }
     }
 
@@ -505,7 +533,7 @@ mod client_callback {
     }
 
     impl<Init: Unregistered> ClientHandle<Init> {
-        async fn send_register(&self, to: ServiceName, mut msg: Register) -> Result<()> {
+        async fn send_register(&self, to: ServiceAddr, mut msg: Register) -> Result<()> {
             let mut guard = self.state.lock().await;
             let state = guard
                 .take()
@@ -528,19 +556,19 @@ mod client_callback {
         }
     }
 
-    async fn start_client<Init>(init: Init, runtime: &'static Runtime) -> ClientHandle<Init>
+    async fn spawn_client<Init>(init: Init, runtime: &'static Runtime) -> ClientHandle<Init>
     where
         Init: 'static + Unregistered,
     {
         let state = Arc::new(Mutex::new(Some(ClientState::Unregistered(init))));
         let name = {
             let state = state.clone();
-            runtime.activate(move |_, mut mailbox, _act_id| async move {
+            runtime.spawn(move |_, mut mailbox, _act_id| async move {
                 while let Some(envelope) = mailbox.recv().await {
                     let mut guard = state.lock().await;
                     let state = guard.take()
                         .unwrap_or_else(|| panic!("Logic error. The state was not returned."));
-                    let (msg, _replier, _from) = envelope.split();
+                    let (msg, _kind) = envelope.split();
                     let new_state = match (state, msg) {
                         (ClientState::Registered(curr_state), ClientCallbackMsgs::Completed(msg)) => {
                             match curr_state.handle_completed(msg).await {
@@ -567,9 +595,14 @@ mod client_callback {
         }
     }
 
-    async fn start_server<Init>(init: Init, runtime: &'static Runtime) -> ActorName
+    async fn register_server<Init, F>(
+        make_init: F,
+        runtime: &'static Runtime,
+        service_id: ServiceId,
+    ) -> Result<ServiceName>
     where
         Init: 'static + Listening<HandleRegisterListening = Init>,
+        F: 'static + Send + Sync + Clone + Fn() -> Init,
     {
         enum ServerState<S: Listening> {
             Listening(S),
@@ -583,35 +616,61 @@ mod client_callback {
             }
         }
 
-        runtime
-            .activate(move |_, mut mailbox, _act_id| async move {
-                let mut state = ServerState::Listening(init);
-                while let Some(envelope) = mailbox.recv().await {
-                    let (msg, _replier, from) = envelope.split();
-                    let new_state = match (state, msg) {
-                        (ServerState::Listening(curr_state), ClientCallbackMsgs::Register(msg)) => {
-                            match curr_state.handle_register(msg).await {
-                                Ok((new_state, working_state)) => {
+        async fn server_loop<Init, F>(
+            runtime: &'static Runtime,
+            make_init: F,
+            mut mailbox: Mailbox<ClientCallbackMsgs>,
+            _act_id: Uuid,
+        ) where
+            Init: 'static + Listening<HandleRegisterListening = Init>,
+            F: 'static + Send + Sync + Fn() -> Init,
+        {
+            let mut state = ServerState::Listening(make_init());
+            while let Some(envelope) = mailbox.recv().await {
+                let (msg, msg_kind) = envelope.split();
+                let new_state = match (state, msg) {
+                    (ServerState::Listening(curr_state), ClientCallbackMsgs::Register(msg)) => {
+                        match curr_state.handle_register(msg).await {
+                            Ok((new_state, working_state)) => {
+                                if let EnvelopeKind::Send { from, .. } = msg_kind {
                                     start_worker(working_state, from, runtime).await;
-                                    ServerState::Listening(new_state)
-                                }
-                                Err(err) => {
-                                    log::error!("Failed to handle the Register message: {err}");
-                                    todo!("Need to recover the previous state from err.")
+                                } else {
+                                    log::error!("Expected Register to be a Send message.");
                                 }
+                                ServerState::Listening(new_state)
+                            }
+                            Err(err) => {
+                                log::error!("Failed to handle the Register message: {err}");
+                                todo!("Need to recover the previous state from err.")
                             }
                         }
-                        (state, msg) => {
-                            log::error!(
-                                "Unexpected message '{}' in state '{}'.",
-                                msg.name(),
-                                state.name()
-                            );
-                            state
-                        }
-                    };
-                    state = new_state;
-                }
+                    }
+                    (state, msg) => {
+                        log::error!(
+                            "Unexpected message '{}' in state '{}'.",
+                            msg.name(),
+                            state.name()
+                        );
+                        state
+                    }
+                };
+                state = new_state;
+            }
+        }
+
+        runtime
+            .register::<ClientCallbackMsgs, _>(service_id, move |runtime: &'static Runtime| {
+                let make_init = make_init.clone();
+                let fut = async move {
+                    let make_init = make_init.clone();
+                    let actor_name = runtime
+                        .spawn(move |_, mailbox, act_id| {
+                            server_loop(runtime, make_init, mailbox, act_id)
+                        })
+                        .await;
+                    Ok(actor_name)
+                };
+                Box::pin(fut)
             })
             .await
     }
@@ -629,7 +688,7 @@ mod client_callback {
         }
 
         runtime
-            .activate::<ClientCallbackMsgs, _, _>(move |_, _, act_id| async move {
+            .spawn::<ClientCallbackMsgs, _, _>(move |_, _, act_id| async move {
                 let msg = match init.on_send_completed().await {
                     Ok((End, msg)) => msg,
                     Err(err) => {
@@ -645,4 +704,31 @@ mod client_callback {
             })
             .await
     }
+
+    #[test]
+    fn client_callback_protocol() {
+        ASYNC_RT.block_on(async {
+            const SERVICE_ID: &str = "ClientCallbackProtocolListening";
+            let service_id = ServiceId::from(SERVICE_ID);
+            let service_name = {
+                let make_init = move || ListeningState { multiple: 2 };
+                register_server(make_init, &RUNTIME, service_id.clone())
+                    .await
+                    .unwrap()
+            };
+            let (sender, receiver) = oneshot::channel();
+            let client_handle = spawn_client(UnregisteredState { sender }, &RUNTIME).await;
+            let service_addr = ServiceAddr::new(service_name, false);
+            client_handle
+                .send_register(service_addr, Register { factor: 21 })
+                .await
+                .unwrap();
+            let value = timeout(Duration::from_millis(500), receiver)
+                .await
+                .unwrap()
+                .unwrap();
+
+            assert_eq!(42, value);
+        });
+    }
 }