|
@@ -13,8 +13,8 @@ use std::{
|
|
};
|
|
};
|
|
|
|
|
|
use btlib::{bterr, crypto::Creds, error::StringError, BlockPath, Result};
|
|
use btlib::{bterr, crypto::Creds, error::StringError, BlockPath, Result};
|
|
-use bttp::{DeserCallback, MsgCallback, Receiver, Replier, Transmitter};
|
|
|
|
use btserde::{field_helpers::smart_ptr, from_slice, to_vec, write_to};
|
|
use btserde::{field_helpers::smart_ptr, from_slice, to_vec, write_to};
|
|
|
|
+use bttp::{DeserCallback, MsgCallback, Receiver, Replier, Transmitter};
|
|
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
|
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
|
use tokio::{
|
|
use tokio::{
|
|
sync::{mpsc, oneshot, Mutex, RwLock},
|
|
sync::{mpsc, oneshot, Mutex, RwLock},
|
|
@@ -177,7 +177,7 @@ impl Runtime {
|
|
let act_name = self.actor_name(act_id);
|
|
let act_name = self.actor_name(act_id);
|
|
let (tx, rx) = mpsc::channel::<Envelope<Msg>>(MAILBOX_LIMIT);
|
|
let (tx, rx) = mpsc::channel::<Envelope<Msg>>(MAILBOX_LIMIT);
|
|
// The deliverer closure is responsible for deserializing messages received over the wire
|
|
// The deliverer closure is responsible for deserializing messages received over the wire
|
|
- // and delivering them to the actor's mailbox and sending replies to call messages.
|
|
|
|
|
|
+ // and delivering them to the actor's mailbox, and sending replies to call messages.
|
|
let deliverer = {
|
|
let deliverer = {
|
|
let buffer = Arc::new(Mutex::new(Vec::<u8>::new()));
|
|
let buffer = Arc::new(Mutex::new(Vec::<u8>::new()));
|
|
let tx = tx.clone();
|
|
let tx = tx.clone();
|
|
@@ -286,6 +286,19 @@ impl Display for RuntimeError {
|
|
|
|
|
|
impl std::error::Error for RuntimeError {}
|
|
impl std::error::Error for RuntimeError {}
|
|
|
|
|
|
|
|
+#[allow(dead_code)]
|
|
|
|
+/// Represents the terminal state of an actor, where it stops processing messages and halts.
|
|
|
|
+struct End;
|
|
|
|
+
|
|
|
|
+#[allow(dead_code)]
|
|
|
|
+/// Delivered to an actor implementation when it starts up.
|
|
|
|
+pub struct Activate {
|
|
|
|
+ /// A reference to the `Runtime` which is running this actor.
|
|
|
|
+ rt: &'static Runtime,
|
|
|
|
+ /// The ID assigned to this actor.
|
|
|
|
+ act_id: Uuid,
|
|
|
|
+}
|
|
|
|
+
|
|
/// Deserializes replies sent over the wire.
|
|
/// Deserializes replies sent over the wire.
|
|
struct ReplyCallback<T> {
|
|
struct ReplyCallback<T> {
|
|
_phantom: PhantomData<T>,
|
|
_phantom: PhantomData<T>,
|
|
@@ -466,6 +479,10 @@ pub trait CallMsg: Serialize + DeserializeOwned + Send + Sync {
|
|
/// Trait for messages which expect exactly zero replies.
|
|
/// Trait for messages which expect exactly zero replies.
|
|
pub trait SendMsg: CallMsg {}
|
|
pub trait SendMsg: CallMsg {}
|
|
|
|
|
|
|
|
+/// A type used to express when a reply is not expected for a message type.
|
|
|
|
+#[derive(Serialize, Deserialize)]
|
|
|
|
+enum NoReply {}
|
|
|
|
+
|
|
/// The maximum number of messages which can be kept in an actor's mailbox.
|
|
/// The maximum number of messages which can be kept in an actor's mailbox.
|
|
const MAILBOX_LIMIT: usize = 32;
|
|
const MAILBOX_LIMIT: usize = 32;
|
|
|
|
|
|
@@ -504,10 +521,8 @@ impl<'de> WireEnvelope<'de> {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-/// Wraps a message to indicate if it was sent with `call` or `send`.
|
|
|
|
-///
|
|
|
|
-/// If the message was sent with call, then this enum will contain a channel that can be used to
|
|
|
|
-/// reply to it.
|
|
|
|
|
|
+/// Wrapper around a message type `T` which indicates who the message is from and, if the message
|
|
|
|
+/// was dispatched with `call`, provides a channel to reply to it.
|
|
pub struct Envelope<T: CallMsg> {
|
|
pub struct Envelope<T: CallMsg> {
|
|
from: ActorName,
|
|
from: ActorName,
|
|
reply: Option<oneshot::Sender<T::Reply>>,
|
|
reply: Option<oneshot::Sender<T::Reply>>,
|
|
@@ -656,8 +671,8 @@ mod tests {
|
|
log::BuilderExt,
|
|
log::BuilderExt,
|
|
};
|
|
};
|
|
use btlib_tests::TEST_STORE;
|
|
use btlib_tests::TEST_STORE;
|
|
- use bttp::BlockAddr;
|
|
|
|
use btserde::to_vec;
|
|
use btserde::to_vec;
|
|
|
|
+ use bttp::BlockAddr;
|
|
use ctor::ctor;
|
|
use ctor::ctor;
|
|
use lazy_static::lazy_static;
|
|
use lazy_static::lazy_static;
|
|
use std::{
|
|
use std::{
|
|
@@ -788,37 +803,46 @@ mod tests {
|
|
assert_eq!(0, LOCAL_RT.num_running().await);
|
|
assert_eq!(0, LOCAL_RT.num_running().await);
|
|
}
|
|
}
|
|
|
|
|
|
- struct Activate {
|
|
|
|
- rt: &'static Runtime,
|
|
|
|
- act_id: Uuid,
|
|
|
|
- }
|
|
|
|
|
|
+ // The following code is a proof-of-concept for what types should be generated for a
|
|
|
|
+ // simple ping-pong protocol:
|
|
|
|
+ //
|
|
|
|
+ // protocol! {
|
|
|
|
+ // ClientInit?Activate -> SentPing, Listening!Ping
|
|
|
|
+ // ServerInit?Activate -> Listening
|
|
|
|
+ // Listening?Ping -> End, SentPing!Ping::Reply
|
|
|
|
+ // SentPing?Ping::Reply -> End
|
|
|
|
+ // }
|
|
|
|
+ //
|
|
|
|
+ // In words, the protocol is described as follows.
|
|
|
|
+ // 1. The ClientInit state receives the Activate message. It returns the SentPing state and a
|
|
|
|
+ // Ping message to be sent to the Listening state.
|
|
|
|
+ // 2. The ServerInit state receives the Activate message. It returns the Listening state.
|
|
|
|
+ // 3. When the Listening state receives the Ping message it returns the End state and a
|
|
|
|
+ // Ping::Reply message to be sent to the SentPing state.
|
|
|
|
+ // 4. When the SentPing state receives the Ping::Reply message it returns the End state.
|
|
|
|
+ //
|
|
|
|
+ // The End state represents an end to the session described by the protocol. When an actor
|
|
|
|
+ // transitions to the End state its function returns.
|
|
|
|
+ // The generated actor implementation is the sender of the Activate message.
|
|
|
|
+ // When a state is expecting a Reply message, an error occurs if the message is not received
|
|
|
|
+ // in a timely manner.
|
|
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
#[derive(Serialize, Deserialize)]
|
|
struct Ping;
|
|
struct Ping;
|
|
impl CallMsg for Ping {
|
|
impl CallMsg for Ping {
|
|
- type Reply = ();
|
|
|
|
|
|
+ type Reply = PingReply;
|
|
}
|
|
}
|
|
- impl SendMsg for Ping {}
|
|
|
|
|
|
|
|
|
|
+ // I was tempted to name this "Pong", but the proc macro wouldn't think to do that.
|
|
#[derive(Serialize, Deserialize)]
|
|
#[derive(Serialize, Deserialize)]
|
|
- struct Pong;
|
|
|
|
- impl CallMsg for Pong {
|
|
|
|
- type Reply = ();
|
|
|
|
- }
|
|
|
|
- impl SendMsg for Pong {}
|
|
|
|
|
|
+ struct PingReply;
|
|
|
|
|
|
trait ClientInit {
|
|
trait ClientInit {
|
|
type AfterActivate: SentPing;
|
|
type AfterActivate: SentPing;
|
|
- type HandleActivateFut: Future<Output = Result<Self::AfterActivate>>;
|
|
|
|
|
|
+ type HandleActivateFut: Future<Output = Result<(Self::AfterActivate, Ping)>>;
|
|
fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
|
|
fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
|
|
}
|
|
}
|
|
|
|
|
|
- trait SentPing {
|
|
|
|
- type AfterPong: Returned;
|
|
|
|
- type HandlePongFut: Future<Output = Result<Self::AfterPong>>;
|
|
|
|
- fn handle_pong(self, msg: Envelope<Pong>) -> Self::HandlePongFut;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
trait ServerInit {
|
|
trait ServerInit {
|
|
type AfterActivate: Listening;
|
|
type AfterActivate: Listening;
|
|
type HandleActivateFut: Future<Output = Result<Self::AfterActivate>>;
|
|
type HandleActivateFut: Future<Output = Result<Self::AfterActivate>>;
|
|
@@ -826,109 +850,77 @@ mod tests {
|
|
}
|
|
}
|
|
|
|
|
|
trait Listening {
|
|
trait Listening {
|
|
- type AfterPing: Returned;
|
|
|
|
- type HandlePingFut: Future<Output = Result<Self::AfterPing>>;
|
|
|
|
- fn handle_ping(self, msg: Envelope<Ping>) -> Self::HandlePingFut;
|
|
|
|
|
|
+ type HandlePingFut: Future<Output = Result<(End, PingReply)>>;
|
|
|
|
+ fn handle_ping(self, msg: Ping) -> Self::HandlePingFut;
|
|
}
|
|
}
|
|
|
|
|
|
- trait Returned {}
|
|
|
|
-
|
|
|
|
- struct ReturnedState;
|
|
|
|
-
|
|
|
|
- impl Returned for ReturnedState {}
|
|
|
|
-
|
|
|
|
- trait PingProtocol {
|
|
|
|
- type Client: ClientInit;
|
|
|
|
- type Server: ServerInit;
|
|
|
|
|
|
+ trait SentPing {
|
|
|
|
+ type HandleReplyFut: Future<Output = Result<End>>;
|
|
|
|
+ fn handle_reply(self, msg: PingReply) -> Self::HandleReplyFut;
|
|
}
|
|
}
|
|
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
#[derive(Serialize, Deserialize)]
|
|
enum PingProtocolMsg {
|
|
enum PingProtocolMsg {
|
|
Ping(Ping),
|
|
Ping(Ping),
|
|
- Pong(Pong),
|
|
|
|
|
|
+ PingReply(PingReply),
|
|
}
|
|
}
|
|
impl CallMsg for PingProtocolMsg {
|
|
impl CallMsg for PingProtocolMsg {
|
|
- type Reply = ();
|
|
|
|
|
|
+ type Reply = PingProtocolMsg;
|
|
}
|
|
}
|
|
impl SendMsg for PingProtocolMsg {}
|
|
impl SendMsg for PingProtocolMsg {}
|
|
|
|
|
|
- #[allow(dead_code)]
|
|
|
|
- enum PingClientState {
|
|
|
|
- Init(ClientInitState),
|
|
|
|
- SentPing(ClientState),
|
|
|
|
- Returned(ReturnedState),
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- #[allow(dead_code)]
|
|
|
|
- enum PingServerState {
|
|
|
|
- ServerInit(ServerInitState),
|
|
|
|
- Listening(ServerState),
|
|
|
|
- Returned(ReturnedState),
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- struct ClientInitState {
|
|
|
|
- server_name: ActorName,
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- struct ClientState;
|
|
|
|
|
|
+ struct ClientInitState;
|
|
|
|
|
|
impl ClientInit for ClientInitState {
|
|
impl ClientInit for ClientInitState {
|
|
type AfterActivate = ClientState;
|
|
type AfterActivate = ClientState;
|
|
- type HandleActivateFut = impl Future<Output = Result<Self::AfterActivate>>;
|
|
|
|
- fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut {
|
|
|
|
- async move {
|
|
|
|
- let from = msg.rt.actor_name(msg.act_id);
|
|
|
|
- // TODO: This implementation should not know about PingProtocolMsg. It should be
|
|
|
|
- // able to send Ping directly.
|
|
|
|
- msg.rt
|
|
|
|
- .send(self.server_name, from, PingProtocolMsg::Ping(Ping))
|
|
|
|
- .await?;
|
|
|
|
- Ok(ClientState)
|
|
|
|
- }
|
|
|
|
|
|
+ type HandleActivateFut = impl Future<Output = Result<(Self::AfterActivate, Ping)>>;
|
|
|
|
+ fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
|
|
|
|
+ ready(Ok((ClientState, Ping)))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ struct ClientState;
|
|
|
|
+
|
|
impl SentPing for ClientState {
|
|
impl SentPing for ClientState {
|
|
- type AfterPong = ReturnedState;
|
|
|
|
- type HandlePongFut = Ready<Result<Self::AfterPong>>;
|
|
|
|
- fn handle_pong(self, _msg: Envelope<Pong>) -> Self::HandlePongFut {
|
|
|
|
- ready(Ok(ReturnedState))
|
|
|
|
|
|
+ type HandleReplyFut = Ready<Result<End>>;
|
|
|
|
+ fn handle_reply(self, _msg: PingReply) -> Self::HandleReplyFut {
|
|
|
|
+ ready(Ok(End))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ #[allow(dead_code)]
|
|
|
|
+ enum PingClientState {
|
|
|
|
+ Init(ClientInitState),
|
|
|
|
+ SentPing(ClientState),
|
|
|
|
+ End(End),
|
|
|
|
+ }
|
|
|
|
+
|
|
struct ServerInitState;
|
|
struct ServerInitState;
|
|
|
|
|
|
- struct ServerState {
|
|
|
|
- rt: &'static Runtime,
|
|
|
|
- act_id: Uuid,
|
|
|
|
- }
|
|
|
|
|
|
+ struct ServerState;
|
|
|
|
|
|
impl ServerInit for ServerInitState {
|
|
impl ServerInit for ServerInitState {
|
|
type AfterActivate = ServerState;
|
|
type AfterActivate = ServerState;
|
|
type HandleActivateFut = Ready<Result<Self::AfterActivate>>;
|
|
type HandleActivateFut = Ready<Result<Self::AfterActivate>>;
|
|
- fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut {
|
|
|
|
- ready(Ok(ServerState {
|
|
|
|
- rt: msg.rt,
|
|
|
|
- act_id: msg.act_id,
|
|
|
|
- }))
|
|
|
|
|
|
+ fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
|
|
|
|
+ ready(Ok(ServerState))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
impl Listening for ServerState {
|
|
impl Listening for ServerState {
|
|
- type AfterPing = ReturnedState;
|
|
|
|
- type HandlePingFut = impl Future<Output = Result<Self::AfterPing>>;
|
|
|
|
- fn handle_ping(self, msg: Envelope<Ping>) -> Self::HandlePingFut {
|
|
|
|
- async move {
|
|
|
|
- let to = msg.from;
|
|
|
|
- let from = self.rt.actor_name(self.act_id);
|
|
|
|
- // TODO: This implementation should not know about PingProtocolMsg. It should be
|
|
|
|
- // able to send Pong directly.
|
|
|
|
- self.rt.send(to, from, PingProtocolMsg::Pong(Pong)).await?;
|
|
|
|
- Ok(ReturnedState)
|
|
|
|
- }
|
|
|
|
|
|
+ type HandlePingFut = impl Future<Output = Result<(End, PingReply)>>;
|
|
|
|
+ fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
|
|
|
|
+ ready(Ok((End, PingReply)))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ #[allow(dead_code)]
|
|
|
|
+ enum PingServerState {
|
|
|
|
+ ServerInit(ServerInitState),
|
|
|
|
+ Listening(ServerState),
|
|
|
|
+ End(End),
|
|
|
|
+ }
|
|
|
|
+
|
|
async fn ping_server(
|
|
async fn ping_server(
|
|
counter: Arc<AtomicU8>,
|
|
counter: Arc<AtomicU8>,
|
|
rt: &'static Runtime,
|
|
rt: &'static Runtime,
|
|
@@ -941,20 +933,22 @@ mod tests {
|
|
PingServerState::Listening(state)
|
|
PingServerState::Listening(state)
|
|
};
|
|
};
|
|
while let Some(envelope) = mailbox.recv().await {
|
|
while let Some(envelope) = mailbox.recv().await {
|
|
- let (msg, replier, from) = envelope.split();
|
|
|
|
|
|
+ let (msg, replier, _from) = envelope.split();
|
|
match (state, msg) {
|
|
match (state, msg) {
|
|
(PingServerState::Listening(listening_state), PingProtocolMsg::Ping(msg)) => {
|
|
(PingServerState::Listening(listening_state), PingProtocolMsg::Ping(msg)) => {
|
|
- let envelope = Envelope::new(msg, replier, from);
|
|
|
|
- state = PingServerState::Returned(
|
|
|
|
- listening_state.handle_ping(envelope).await.unwrap(),
|
|
|
|
- );
|
|
|
|
|
|
+ let (new_state, reply) = listening_state.handle_ping(msg).await.unwrap();
|
|
|
|
+ state = PingServerState::End(new_state);
|
|
|
|
+ if let Err(_) = replier.unwrap().send(PingProtocolMsg::PingReply(reply)) {
|
|
|
|
+ panic!("Failed to send Ping reply.");
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- _ => {
|
|
|
|
- log::error!("Ping protocol violation.");
|
|
|
|
- break;
|
|
|
|
|
|
+ (_prev_state, _) => {
|
|
|
|
+ panic!("Ping protocol violation.");
|
|
|
|
+ // A real implementation should assign the previous state and log the error.
|
|
|
|
+ // state = prev_state;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- if let PingServerState::Returned(_) = state {
|
|
|
|
|
|
+ if let PingServerState::End(_) = state {
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -965,30 +959,20 @@ mod tests {
|
|
counter: Arc<AtomicU8>,
|
|
counter: Arc<AtomicU8>,
|
|
server_name: ActorName,
|
|
server_name: ActorName,
|
|
rt: &'static Runtime,
|
|
rt: &'static Runtime,
|
|
- mut mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
|
|
|
|
|
|
+ _mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
|
|
act_id: Uuid,
|
|
act_id: Uuid,
|
|
) {
|
|
) {
|
|
- let mut state = {
|
|
|
|
- let init = ClientInitState { server_name };
|
|
|
|
- let state = init.handle_activate(Activate { rt, act_id }).await.unwrap();
|
|
|
|
- PingClientState::SentPing(state)
|
|
|
|
- };
|
|
|
|
- while let Some(envelope) = mailbox.recv().await {
|
|
|
|
- let (msg, replier, from) = envelope.split();
|
|
|
|
- match (state, msg) {
|
|
|
|
- (PingClientState::SentPing(curr_state), PingProtocolMsg::Pong(msg)) => {
|
|
|
|
- let envelope = Envelope::new(msg, replier, from);
|
|
|
|
- state =
|
|
|
|
- PingClientState::Returned(curr_state.handle_pong(envelope).await.unwrap());
|
|
|
|
- }
|
|
|
|
- _ => {
|
|
|
|
- log::error!("Ping protocol violation.");
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if let PingClientState::Returned(_) = state {
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
|
|
+ let init = ClientInitState;
|
|
|
|
+ let (state, msg) = init.handle_activate(Activate { rt, act_id }).await.unwrap();
|
|
|
|
+ let from = rt.actor_name(act_id);
|
|
|
|
+ let reply = rt
|
|
|
|
+ .call(server_name, from, PingProtocolMsg::Ping(msg))
|
|
|
|
+ .await
|
|
|
|
+ .unwrap();
|
|
|
|
+ if let PingProtocolMsg::PingReply(msg) = reply {
|
|
|
|
+ state.handle_reply(msg).await.unwrap();
|
|
|
|
+ } else {
|
|
|
|
+ panic!("Incorrect message type sent in reply to Ping.");
|
|
}
|
|
}
|
|
counter.fetch_sub(1, Ordering::SeqCst);
|
|
counter.fetch_sub(1, Ordering::SeqCst);
|
|
}
|
|
}
|
|
@@ -1017,10 +1001,29 @@ mod tests {
|
|
while counter.load(Ordering::SeqCst) > 0 && Instant::now() < deadline {
|
|
while counter.load(Ordering::SeqCst) > 0 && Instant::now() < deadline {
|
|
tokio::time::sleep(Duration::from_millis(20)).await;
|
|
tokio::time::sleep(Duration::from_millis(20)).await;
|
|
}
|
|
}
|
|
|
|
+ // Check that both tasks finished successfully and we didn't just timeout.
|
|
|
|
+ assert_eq!(0, counter.load(Ordering::SeqCst));
|
|
|
|
|
|
// TODO: Should actor which return be removed from the runtime automatically?
|
|
// TODO: Should actor which return be removed from the runtime automatically?
|
|
RUNTIME.take(&server_name).await.unwrap();
|
|
RUNTIME.take(&server_name).await.unwrap();
|
|
RUNTIME.take(&client_name).await.unwrap();
|
|
RUNTIME.take(&client_name).await.unwrap();
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ // Here's another protocol example. This is the Customer and Travel Agency protocol used as an
|
|
|
|
+ // example in the survey paper "Behavioral Types in Programming Languages."
|
|
|
|
+ //
|
|
|
|
+ // protocol! {
|
|
|
|
+ // CustomerInit?Activate -> Choosing
|
|
|
|
+ // AgencyInit?Activate -> Listening
|
|
|
|
+ // Choosing?Choice -> Choosing, Listening!Query|Accept|Reject
|
|
|
|
+ // Listening?Query -> Listening, Choosing!Query::Reply
|
|
|
|
+ // Choosing?Query::Reply -> Choosing
|
|
|
|
+ // Listening?Accept -> End, Choosing!Accept::Reply
|
|
|
|
+ // Choosing?Accept::Reply -> End
|
|
|
|
+ // Listening?Reject -> End, Choosing!Reject:Reply
|
|
|
|
+ // Choosing?Reject::Reply -> End
|
|
|
|
+ // }
|
|
|
|
+ //
|
|
|
|
+ // The Choice message is from the runtime itself. It represents receiving input from a user.
|
|
}
|
|
}
|