@@ -29,27 +29,27 @@ use uuid::Uuid;
macro_rules! declare_runtime {
($name:ident, $ip_addr:expr, $creds:expr) => {
::lazy_static::lazy_static! {
- static ref $name: Runtime = _new_runtime($ip_addr, $creds).unwrap();
+ static ref $name: &'static Runtime = {
+ ::lazy_static::lazy_static! {
+ static ref RUNTIME: Runtime = Runtime::_new($creds).unwrap();
+ static ref RECEIVER: Receiver = _new_receiver($ip_addr, $creds, &*RUNTIME);
+ }
+ // By dereferencing RECEIVER we ensure it is started.
+ let _ = &*RECEIVER;
+ };
-/// This function is not intended to be called directly by downstream crates. Use the macro
-/// [declare_runtime] to create a [Runtime] instead.
-pub fn _new_runtime<C: 'static + Send + Sync + Creds>(
- ip_addr: IpAddr,
- creds: Arc<C>,
-) -> Result<Runtime> {
- let path = Arc::new(creds.bind_path()?);
- let handles = Arc::new(RwLock::new(HashMap::new()));
- let callback = RuntimeCallback::new(handles.clone());
- let rx = Receiver::new(ip_addr, creds, callback)?;
- Ok(Runtime {
- _rx: rx,
- path,
- handles,
- peers: RwLock::new(HashMap::new()),
- })
+/// This function is not intended to be called by downstream crates.
+pub fn _new_receiver<C>(ip_addr: IpAddr, creds: Arc<C>, runtime: &'static Runtime) -> Receiver
+ C: 'static + Send + Sync + Creds,
+ let callback = RuntimeCallback::new(runtime);
+ Receiver::new(ip_addr, creds, callback).unwrap()
/// An actor runtime.
@@ -59,13 +59,26 @@ pub fn _new_runtime<C: 'static + Send + Sync + Creds>(
/// recipient. If a reply is needed, then `call` can be used, which returns a future that will
/// be ready when the reply has been received.
pub struct Runtime {
- _rx: Receiver,
path: Arc<BlockPath>,
- handles: Arc<RwLock<HashMap<Uuid, ActorHandle>>>,
+ handles: RwLock<HashMap<Uuid, ActorHandle>>,
peers: RwLock<HashMap<Arc<BlockPath>, Transmitter>>,
impl Runtime {
+ /// This method is not intended to be called directly by downstream crates. Use the macro
+ /// [declare_runtime] to create a [Runtime].
+ ///
+ /// If you create a non-static [Runtime], your process will panic when it is dropped.
+ #[doc(hidden)]
+ pub fn _new<C: 'static + Send + Sync + Creds>(creds: Arc<C>) -> Result<Runtime> {
+ let path = Arc::new(creds.bind_path()?);
+ Ok(Runtime {
+ path,
+ handles: RwLock::new(HashMap::new()),
+ peers: RwLock::new(HashMap::new()),
+ })
+ }
pub fn path(&self) -> &Arc<BlockPath> {
@@ -79,14 +92,14 @@ impl Runtime {
/// Sends a message to the actor identified by the given [ActorName].
pub async fn send<T: 'static + SendMsg>(
- to: &ActorName,
- from: Uuid,
+ to: ActorName,
+ from: ActorName,
msg: T,
) -> Result<()> {
if to.path == self.path {
let guard = self.handles.read().await;
if let Some(handle) = guard.get(&to.act_id) {
- handle.send(msg).await
+ handle.send(from, msg).await
} else {
Err(bterr!("invalid actor name"))
@@ -95,7 +108,7 @@ impl Runtime {
if let Some(peer) = guard.get(&to.path) {
let buf = to_vec(&msg)?;
let wire_msg = WireMsg {
- to: to.act_id,
+ to,
payload: &buf,
@@ -112,14 +125,14 @@ impl Runtime {
/// is ready when a reply has been received.
pub async fn call<T: 'static + CallMsg>(
- to: &ActorName,
- from: Uuid,
+ to: ActorName,
+ from: ActorName,
msg: T,
) -> Result<T::Reply> {
if to.path == self.path {
let guard = self.handles.read().await;
if let Some(handle) = guard.get(&to.act_id) {
- handle.call_through(msg).await
+ handle.call_through(from, msg).await
} else {
Err(bterr!("invalid actor name"))
@@ -128,7 +141,7 @@ impl Runtime {
if let Some(peer) = guard.get(&to.path) {
let buf = to_vec(&msg)?;
let wire_msg = WireMsg {
- to: to.act_id,
+ to,
payload: &buf,
@@ -161,21 +174,24 @@ impl Runtime {
+ let act_name = self.actor_name(act_id);
let (tx, rx) = mpsc::channel::<Envelope<Msg>>(MAILBOX_LIMIT);
// The deliverer closure is responsible for deserializing messages received over the wire
// and delivering them to the actor's mailbox and sending replies to call messages.
let deliverer = {
let buffer = Arc::new(Mutex::new(Vec::<u8>::new()));
let tx = tx.clone();
+ let act_name = act_name.clone();
move |envelope: WireEnvelope| {
let (wire_msg, replier) = envelope.into_parts();
let result = from_slice(wire_msg.payload);
let buffer = buffer.clone();
let tx = tx.clone();
+ let act_name = act_name.clone();
let fut: FutureResult = Box::pin(async move {
let msg = result?;
if let Some(mut replier) = replier {
- let (envelope, rx) = Envelope::call(msg);
+ let (envelope, rx) = Envelope::new_call(act_name, msg);
tx.send(envelope).await.map_err(|_| {
bterr!("failed to deliver message. Recipient may have halted.")
@@ -190,9 +206,11 @@ impl Runtime {
Err(err) => replier.reply_err(err.to_string(), None).await,
} else {
- tx.send(Envelope::Send { msg }).await.map_err(|_| {
- bterr!("failed to deliver message. Recipient may have halted.")
- })
+ tx.send(Envelope::new_send(act_name, msg))
+ .await
+ .map_err(|_| {
+ bterr!("failed to deliver message. Recipient may have halted.")
+ })
@@ -201,7 +219,7 @@ impl Runtime {
let handle = tokio::task::spawn(activator(self, rx, act_id));
let actor_handle = ActorHandle::new(handle, tx, deliverer);
guard.insert(act_id, actor_handle);
- ActorName::new(self.path.clone(), act_id)
+ act_name
/// Registers an actor as a service with the given [ServiceId].
@@ -240,6 +258,11 @@ impl Runtime {
+ /// Returns the name of the actor in this runtime with the given actor ID.
+ pub fn actor_name(&self, act_id: Uuid) -> ActorName {
+ ActorName::new(self.path.clone(), act_id)
+ }
impl Drop for Runtime {
@@ -289,15 +312,73 @@ impl<T: CallMsg> DeserCallback for ReplyCallback<T> {
+struct SendReplyCallback {
+ replier: Option<Replier>,
+impl SendReplyCallback {
+ fn new(replier: Replier) -> Self {
+ Self {
+ replier: Some(replier),
+ }
+ }
+impl DeserCallback for SendReplyCallback {
+ type Arg<'de> = WireReply<'de>;
+ type Return = Result<()>;
+ type CallFut<'de> = impl 'de + Future<Output = Self::Return>;
+ fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
+ async move {
+ if let Some(mut replier) = self.replier.take() {
+ replier.reply(arg).await
+ } else {
+ Ok(())
+ }
+ }
+ }
/// This struct implements the server callback for network messages.
struct RuntimeCallback {
- handles: Arc<RwLock<HashMap<Uuid, ActorHandle>>>,
+ rt: &'static Runtime,
impl RuntimeCallback {
- fn new(handles: Arc<RwLock<HashMap<Uuid, ActorHandle>>>) -> Self {
- Self { handles }
+ fn new(rt: &'static Runtime) -> Self {
+ Self { rt }
+ }
+ async fn deliver_local(&self, msg: WireMsg<'_>, replier: Option<Replier>) -> Result<()> {
+ let guard = self.rt.handles.read().await;
+ if let Some(handle) = guard.get(&msg.to.act_id) {
+ let envelope = if let Some(replier) = replier {
+ WireEnvelope::Call { msg, replier }
+ } else {
+ WireEnvelope::Send { msg }
+ };
+ (handle.deliverer)(envelope).await
+ } else {
+ Err(bterr!("invalid actor name: {}", msg.to))
+ }
+ }
+ async fn route_msg(&self, msg: WireMsg<'_>, replier: Option<Replier>) -> Result<()> {
+ let guard = self.rt.peers.read().await;
+ if let Some(tx) = guard.get(msg.to.path()) {
+ if let Some(replier) = replier {
+ let callback = SendReplyCallback::new(replier);
+ tx.call(msg, callback).await?
+ } else {
+ tx.send(msg).await
+ }
+ } else {
+ Err(bterr!(
+ "unable to deliver message to peer at '{}'",
+ msg.to.path()
+ ))
+ }
@@ -307,16 +388,10 @@ impl MsgCallback for RuntimeCallback {
fn call<'de>(&'de self, arg: btmsg::MsgReceived<Self::Arg<'de>>) -> Self::CallFut<'de> {
async move {
let (_, body, replier) = arg.into_parts();
- let guard = self.handles.read().await;
- if let Some(handle) = guard.get(&body.to) {
- let envelope = if let Some(replier) = replier {
- WireEnvelope::Call { msg: body, replier }
- } else {
- WireEnvelope::Send { msg: body }
- };
- (handle.deliverer)(envelope).await
+ if body.to.path() == self.rt.path() {
+ self.deliver_local(body, replier).await
} else {
- Err(bterr!("invalid actor ID: {}", body.to))
+ self.route_msg(body, replier).await
@@ -366,6 +441,14 @@ impl ActorName {
pub fn new(path: Arc<BlockPath>, act_id: Uuid) -> Self {
Self { path, act_id }
+ pub fn path(&self) -> &Arc<BlockPath> {
+ &self.path
+ }
+ pub fn act_id(&self) -> Uuid {
+ self.act_id
+ }
impl Display for ActorName {
@@ -389,8 +472,8 @@ const MAILBOX_LIMIT: usize = 32;
/// The type of messages sent over the wire between runtimes.
#[derive(Serialize, Deserialize)]
struct WireMsg<'a> {
- to: Uuid,
- from: Uuid,
+ to: ActorName,
+ from: ActorName,
payload: &'a [u8],
@@ -425,26 +508,70 @@ impl<'de> WireEnvelope<'de> {
/// If the message was sent with call, then this enum will contain a channel that can be used to
/// reply to it.
-pub enum Envelope<T: CallMsg> {
- /// The message was sent with `send` and does not expect a reply.
- Send { msg: T },
- /// The message was sent with `call` and expects a reply.
- Call {
- msg: T,
- /// A reply message must be sent using this channel.
- reply: oneshot::Sender<T::Reply>,
- },
+pub struct Envelope<T: CallMsg> {
+ from: ActorName,
+ reply: Option<oneshot::Sender<T::Reply>>,
+ msg: T,
impl<T: CallMsg> Envelope<T> {
- fn send(msg: T) -> Self {
- Self::Send { msg }
+ pub fn new(msg: T, reply: Option<oneshot::Sender<T::Reply>>, from: ActorName) -> Self {
+ Self { from, reply, msg }
- fn call(msg: T) -> (Self, oneshot::Receiver<T::Reply>) {
+ /// Creates a new envelope containing the given message which does not expect a reply.
+ fn new_send(from: ActorName, msg: T) -> Self {
+ Self {
+ from,
+ msg,
+ reply: None,
+ }
+ }
+ /// Creates a new envelope containing the given message which expects exactly one reply.
+ fn new_call(from: ActorName, msg: T) -> (Self, oneshot::Receiver<T::Reply>) {
let (tx, rx) = oneshot::channel::<T::Reply>();
- let kind = Envelope::Call { msg, reply: tx };
- (kind, rx)
+ let envelope = Self {
+ from,
+ msg,
+ reply: Some(tx),
+ };
+ (envelope, rx)
+ }
+ /// Returns the name of the actor which sent this message.
+ pub fn from(&self) -> &ActorName {
+ &self.from
+ }
+ /// Returns a reference to the message in this envelope.
+ pub fn msg(&self) -> &T {
+ &self.msg
+ }
+ /// Sends a reply to this message.
+ ///
+ /// If this message is not expecting a reply, or if this message has already been replied to,
+ /// then an error is returned.
+ pub fn reply(&mut self, reply: T::Reply) -> Result<()> {
+ if let Some(tx) = self.reply.take() {
+ if tx.send(reply).is_ok() {
+ Ok(())
+ } else {
+ Err(bterr!("failed to send reply"))
+ }
+ } else {
+ Err(bterr!("reply already sent"))
+ }
+ }
+ /// Returns true if this message expects a reply and it has not already been replied to.
+ pub fn needs_reply(&self) -> bool {
+ self.reply.is_some()
+ }
+ pub fn split(self) -> (T, Option<oneshot::Sender<T::Reply>>, ActorName) {
+ (self.msg, self.reply, self.from)
@@ -475,18 +602,23 @@ impl ActorHandle {
.ok_or_else(|| bterr!("unexpected message type"))
- pub async fn send<T: 'static + SendMsg>(&self, msg: T) -> Result<()> {
+ /// Sends a message to the actor represented by this handle.
+ pub async fn send<T: 'static + SendMsg>(&self, from: ActorName, msg: T) -> Result<()> {
let sender = self.sender()?;
- .send(Envelope::send(msg))
+ .send(Envelope::new_send(from, msg))
.map_err(|_| bterr!("failed to enqueue message"))?;
- pub async fn call_through<T: 'static + CallMsg>(&self, msg: T) -> Result<T::Reply> {
+ pub async fn call_through<T: 'static + CallMsg>(
+ &self,
+ from: ActorName,
+ msg: T,
+ ) -> Result<T::Reply> {
let sender = self.sender()?;
- let (envelope, rx) = Envelope::call(msg);
+ let (envelope, rx) = Envelope::new_call(from, msg);
@@ -528,7 +660,11 @@ mod tests {
use btserde::to_vec;
use ctor::ctor;
use lazy_static::lazy_static;
- use std::net::{IpAddr, Ipv4Addr};
+ use std::{
+ net::{IpAddr, Ipv4Addr},
+ sync::atomic::{AtomicU8, Ordering},
+ time::{Duration, Instant},
+ };
use tokio::runtime::Builder;
const RUNTIME_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
@@ -577,9 +713,10 @@ mod tests {
mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>,
_act_id: Uuid,
) {
- while let Some(msg) = mailbox.recv().await {
- if let Envelope::Call { msg, reply } = msg {
- if let Err(_) = reply.send(msg) {
+ while let Some(envelope) = mailbox.recv().await {
+ let (msg, replier, ..) = envelope.split();
+ if let Some(replier) = replier {
+ if let Err(_) = replier.send(msg) {
panic!("failed to send reply");
@@ -591,9 +728,10 @@ mod tests {
ASYNC_RT.block_on(async {
const EXPECTED: &str = "hello";
let name = RUNTIME.activate(echo).await;
+ let from = ActorName::new(name.path().clone(), Uuid::default());
let reply = RUNTIME
- .call(&name, Uuid::default(), EchoMsg(EXPECTED.into()))
+ .call(name.clone(), from, EchoMsg(EXPECTED.into()))
@@ -615,8 +753,8 @@ mod tests {
let buf = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap();
let wire_msg = WireMsg {
- to: actor_name.act_id,
- from: Uuid::default(),
+ to: actor_name.clone(),
+ from: RUNTIME.actor_name(Uuid::default()),
payload: &buf,
@@ -649,4 +787,240 @@ mod tests {
assert_eq!(0, LOCAL_RT.num_running().await);
+ struct Activate {
+ rt: &'static Runtime,
+ act_id: Uuid,
+ }
+ #[derive(Serialize, Deserialize)]
+ struct Ping;
+ impl CallMsg for Ping {
+ type Reply = ();
+ }
+ impl SendMsg for Ping {}
+ #[derive(Serialize, Deserialize)]
+ struct Pong;
+ impl CallMsg for Pong {
+ type Reply = ();
+ }
+ impl SendMsg for Pong {}
+ trait ClientInit {
+ type AfterActivate: SentPing;
+ type HandleActivateFut: Future<Output = Result<Self::AfterActivate>>;
+ fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
+ }
+ trait SentPing {
+ type AfterPong: Returned;
+ type HandlePongFut: Future<Output = Result<Self::AfterPong>>;
+ fn handle_pong(self, msg: Envelope<Pong>) -> Self::HandlePongFut;
+ }
+ trait ServerInit {
+ type AfterActivate: Listening;
+ type HandleActivateFut: Future<Output = Result<Self::AfterActivate>>;
+ fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
+ }
+ trait Listening {
+ type AfterPing: Returned;
+ type HandlePingFut: Future<Output = Result<Self::AfterPing>>;
+ fn handle_ping(self, msg: Envelope<Ping>) -> Self::HandlePingFut;
+ }
+ trait Returned {}
+ struct ReturnedState;
+ impl Returned for ReturnedState {}
+ trait PingProtocol {
+ type Client: ClientInit;
+ type Server: ServerInit;
+ }
+ #[derive(Serialize, Deserialize)]
+ enum PingProtocolMsg {
+ Ping(Ping),
+ Pong(Pong),
+ }
+ impl CallMsg for PingProtocolMsg {
+ type Reply = ();
+ }
+ impl SendMsg for PingProtocolMsg {}
+ #[allow(dead_code)]
+ enum PingClientState {
+ Init(ClientInitState),
+ SentPing(ClientState),
+ Returned(ReturnedState),
+ }
+ #[allow(dead_code)]
+ enum PingServerState {
+ ServerInit(ServerInitState),
+ Listening(ServerState),
+ Returned(ReturnedState),
+ }
+ struct ClientInitState {
+ server_name: ActorName,
+ }
+ struct ClientState;
+ impl ClientInit for ClientInitState {
+ type AfterActivate = ClientState;
+ type HandleActivateFut = impl Future<Output = Result<Self::AfterActivate>>;
+ fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut {
+ async move {
+ let from = msg.rt.actor_name(msg.act_id);
+ // TODO: This implementation should not know about PingProtocolMsg. It should be
+ // able to send Ping directly.
+ msg.rt
+ .send(self.server_name, from, PingProtocolMsg::Ping(Ping))
+ .await?;
+ Ok(ClientState)
+ }
+ }
+ }
+ impl SentPing for ClientState {
+ type AfterPong = ReturnedState;
+ type HandlePongFut = Ready<Result<Self::AfterPong>>;
+ fn handle_pong(self, _msg: Envelope<Pong>) -> Self::HandlePongFut {
+ ready(Ok(ReturnedState))
+ }
+ }
+ struct ServerInitState;
+ struct ServerState {
+ rt: &'static Runtime,
+ act_id: Uuid,
+ }
+ impl ServerInit for ServerInitState {
+ type AfterActivate = ServerState;
+ type HandleActivateFut = Ready<Result<Self::AfterActivate>>;
+ fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut {
+ ready(Ok(ServerState {
+ rt: msg.rt,
+ act_id: msg.act_id,
+ }))
+ }
+ }
+ impl Listening for ServerState {
+ type AfterPing = ReturnedState;
+ type HandlePingFut = impl Future<Output = Result<Self::AfterPing>>;
+ fn handle_ping(self, msg: Envelope<Ping>) -> Self::HandlePingFut {
+ async move {
+ let to = msg.from;
+ let from = self.rt.actor_name(self.act_id);
+ // TODO: This implementation should not know about PingProtocolMsg. It should be
+ // able to send Pong directly.
+ self.rt.send(to, from, PingProtocolMsg::Pong(Pong)).await?;
+ Ok(ReturnedState)
+ }
+ }
+ }
+ async fn ping_server(
+ counter: Arc<AtomicU8>,
+ rt: &'static Runtime,
+ mut mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
+ act_id: Uuid,
+ ) {
+ let mut state = {
+ let init = ServerInitState;
+ let state = init.handle_activate(Activate { rt, act_id }).await.unwrap();
+ PingServerState::Listening(state)
+ };
+ while let Some(envelope) = mailbox.recv().await {
+ let (msg, replier, from) = envelope.split();
+ match (state, msg) {
+ (PingServerState::Listening(listening_state), PingProtocolMsg::Ping(msg)) => {
+ let envelope = Envelope::new(msg, replier, from);
+ state = PingServerState::Returned(
+ listening_state.handle_ping(envelope).await.unwrap(),
+ );
+ }
+ _ => {
+ log::error!("Ping protocol violation.");
+ break;
+ }
+ }
+ if let PingServerState::Returned(_) = state {
+ break;
+ }
+ }
+ counter.fetch_sub(1, Ordering::SeqCst);
+ }
+ async fn ping_client(
+ counter: Arc<AtomicU8>,
+ server_name: ActorName,
+ rt: &'static Runtime,
+ mut mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
+ act_id: Uuid,
+ ) {
+ let mut state = {
+ let init = ClientInitState { server_name };
+ let state = init.handle_activate(Activate { rt, act_id }).await.unwrap();
+ PingClientState::SentPing(state)
+ };
+ while let Some(envelope) = mailbox.recv().await {
+ let (msg, replier, from) = envelope.split();
+ match (state, msg) {
+ (PingClientState::SentPing(curr_state), PingProtocolMsg::Pong(msg)) => {
+ let envelope = Envelope::new(msg, replier, from);
+ state =
+ PingClientState::Returned(curr_state.handle_pong(envelope).await.unwrap());
+ }
+ _ => {
+ log::error!("Ping protocol violation.");
+ break;
+ }
+ }
+ if let PingClientState::Returned(_) = state {
+ break;
+ }
+ }
+ counter.fetch_sub(1, Ordering::SeqCst);
+ }
+ #[test]
+ fn ping_pong_test() {
+ ASYNC_RT.block_on(async {
+ let counter = Arc::new(AtomicU8::new(2));
+ let server_name = {
+ let counter = counter.clone();
+ .activate(move |rt, mailbox, act_id| ping_server(counter, rt, mailbox, act_id))
+ .await
+ };
+ let client_name = {
+ let server_name = server_name.clone();
+ let counter = counter.clone();
+ .activate(move |rt, mailbox, act_id| {
+ ping_client(counter, server_name, rt, mailbox, act_id)
+ })
+ .await
+ };
+ let deadline = Instant::now() + Duration::from_millis(500);
+ while counter.load(Ordering::SeqCst) > 0 && Instant::now() < deadline {
+ tokio::time::sleep(Duration::from_millis(20)).await;
+ }
+ // TODO: Should actor which return be removed from the runtime automatically?
+ RUNTIME.take(&server_name).await.unwrap();
+ RUNTIME.take(&client_name).await.unwrap();
+ });
+ }