|
@@ -1,8 +1,5 @@
|
|
|
#![feature(impl_trait_in_assoc_type)]
|
|
|
|
|
|
-pub mod fs_proto;
|
|
|
-pub mod sector_proto;
|
|
|
-
|
|
|
use std::{
|
|
|
any::Any,
|
|
|
collections::HashMap,
|
|
@@ -32,10 +29,10 @@ use uuid::Uuid;
|
|
|
macro_rules! declare_runtime {
|
|
|
($name:ident, $ip_addr:expr, $creds:expr) => {
|
|
|
::lazy_static::lazy_static! {
|
|
|
- static ref $name: &'static Runtime = {
|
|
|
+ static ref $name: &'static ::btrun::Runtime = {
|
|
|
::lazy_static::lazy_static! {
|
|
|
- static ref RUNTIME: Runtime = Runtime::_new($creds).unwrap();
|
|
|
- static ref RECEIVER: Receiver = _new_receiver($ip_addr, $creds, &*RUNTIME);
|
|
|
+ static ref RUNTIME: ::btrun::Runtime = ::btrun::Runtime::_new($creds).unwrap();
|
|
|
+ static ref RECEIVER: ::bttp::Receiver = _new_receiver($ip_addr, $creds, &*RUNTIME);
|
|
|
}
|
|
|
// By dereferencing RECEIVER we ensure it is started.
|
|
|
let _ = &*RECEIVER;
|
|
@@ -289,9 +286,15 @@ impl Display for RuntimeError {
|
|
|
|
|
|
impl std::error::Error for RuntimeError {}
|
|
|
|
|
|
-#[allow(dead_code)]
|
|
|
/// Represents the terminal state of an actor, where it stops processing messages and halts.
|
|
|
-struct End;
|
|
|
+pub struct End;
|
|
|
+
|
|
|
+impl End {
|
|
|
+ /// Returns the identifier for this type which is expected in protocol definitions.
|
|
|
+ pub fn ident() -> &'static str {
|
|
|
+ stringify!(End)
|
|
|
+ }
|
|
|
+}
|
|
|
|
|
|
#[allow(dead_code)]
|
|
|
/// Delivered to an actor implementation when it starts up.
|
|
@@ -302,19 +305,36 @@ pub struct Activate {
|
|
|
act_id: Uuid,
|
|
|
}
|
|
|
|
|
|
+impl Activate {
|
|
|
+ pub fn new(rt: &'static Runtime, act_id: Uuid) -> Self {
|
|
|
+ Self { rt, act_id }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns the identifier expected for this message in protocol definitions.
|
|
|
+ pub fn ident() -> &'static str {
|
|
|
+ stringify!(Activate)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
/// Deserializes replies sent over the wire.
|
|
|
-struct ReplyCallback<T> {
|
|
|
+pub struct ReplyCallback<T> {
|
|
|
_phantom: PhantomData<T>,
|
|
|
}
|
|
|
|
|
|
impl<T: CallMsg> ReplyCallback<T> {
|
|
|
- fn new() -> Self {
|
|
|
+ pub fn new() -> Self {
|
|
|
Self {
|
|
|
_phantom: PhantomData,
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+impl<T: CallMsg> Default for ReplyCallback<T> {
|
|
|
+ fn default() -> Self {
|
|
|
+ Self::new()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
impl<T: CallMsg> DeserCallback for ReplyCallback<T> {
|
|
|
type Arg<'de> = WireReply<'de> where T: 'de;
|
|
|
type Return = Result<T::Reply>;
|
|
@@ -491,12 +511,18 @@ const MAILBOX_LIMIT: usize = 32;
|
|
|
|
|
|
/// The type of messages sent over the wire between runtimes.
|
|
|
#[derive(Serialize, Deserialize)]
|
|
|
-struct WireMsg<'a> {
|
|
|
+pub struct WireMsg<'a> {
|
|
|
to: ActorName,
|
|
|
from: ActorName,
|
|
|
payload: &'a [u8],
|
|
|
}
|
|
|
|
|
|
+impl<'a> WireMsg<'a> {
|
|
|
+ pub fn new(to: ActorName, from: ActorName, payload: &'a [u8]) -> Self {
|
|
|
+ Self { to, from, payload }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
impl<'a> bttp::CallMsg<'a> for WireMsg<'a> {
|
|
|
type Reply<'r> = WireReply<'r>;
|
|
|
}
|
|
@@ -504,7 +530,7 @@ impl<'a> bttp::CallMsg<'a> for WireMsg<'a> {
|
|
|
impl<'a> bttp::SendMsg<'a> for WireMsg<'a> {}
|
|
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
|
-enum WireReply<'a> {
|
|
|
+pub enum WireReply<'a> {
|
|
|
Ok(&'a [u8]),
|
|
|
Err(&'a str),
|
|
|
}
|
|
@@ -664,379 +690,3 @@ impl Drop for ActorHandle {
|
|
|
self.abort();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
-#[cfg(test)]
|
|
|
-mod tests {
|
|
|
- use super::*;
|
|
|
-
|
|
|
- use btlib::{
|
|
|
- crypto::{ConcreteCreds, CredStore, CredsPriv},
|
|
|
- log::BuilderExt,
|
|
|
- };
|
|
|
- use btlib_tests::TEST_STORE;
|
|
|
- use btproto::protocol;
|
|
|
- use btserde::to_vec;
|
|
|
- use bttp::BlockAddr;
|
|
|
- use ctor::ctor;
|
|
|
- use lazy_static::lazy_static;
|
|
|
- use std::{
|
|
|
- net::{IpAddr, Ipv4Addr},
|
|
|
- sync::atomic::{AtomicU8, Ordering},
|
|
|
- time::{Duration, Instant},
|
|
|
- };
|
|
|
- use tokio::runtime::Builder;
|
|
|
-
|
|
|
- const RUNTIME_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
|
|
|
- lazy_static! {
|
|
|
- static ref RUNTIME_CREDS: Arc<ConcreteCreds> = TEST_STORE.node_creds().unwrap();
|
|
|
- }
|
|
|
- declare_runtime!(RUNTIME, RUNTIME_ADDR, RUNTIME_CREDS.clone());
|
|
|
-
|
|
|
- lazy_static! {
|
|
|
- /// A tokio async runtime.
|
|
|
- ///
|
|
|
- /// When the `#[tokio::test]` attribute is used on a test, a new current thread runtime
|
|
|
- /// is created for each test
|
|
|
- /// (source: https://docs.rs/tokio/latest/tokio/attr.test.html#current-thread-runtime).
|
|
|
- /// This creates a problem, because the first test thread to access the `RUNTIME` static
|
|
|
- /// will initialize its `Receiver` in its runtime, which will stop running at the end of
|
|
|
- /// the test. Hence subsequent tests will not be able to send remote messages to this
|
|
|
- /// `Runtime`.
|
|
|
- ///
|
|
|
- /// By creating a single async runtime which is used by all of the tests, we can avoid this
|
|
|
- /// problem.
|
|
|
- static ref ASYNC_RT: tokio::runtime::Runtime = Builder::new_current_thread()
|
|
|
- .enable_all()
|
|
|
- .build()
|
|
|
- .unwrap();
|
|
|
- }
|
|
|
-
|
|
|
- /// The log level to use when running tests.
|
|
|
- const LOG_LEVEL: &str = "warn";
|
|
|
-
|
|
|
- #[ctor]
|
|
|
- fn ctor() {
|
|
|
- std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL));
|
|
|
- env_logger::Builder::from_default_env().btformat().init();
|
|
|
- }
|
|
|
-
|
|
|
- #[derive(Serialize, Deserialize)]
|
|
|
- struct EchoMsg(String);
|
|
|
-
|
|
|
- impl CallMsg for EchoMsg {
|
|
|
- type Reply = EchoMsg;
|
|
|
- }
|
|
|
-
|
|
|
- async fn echo(
|
|
|
- _rt: &'static Runtime,
|
|
|
- mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>,
|
|
|
- _act_id: Uuid,
|
|
|
- ) {
|
|
|
- while let Some(envelope) = mailbox.recv().await {
|
|
|
- let (msg, replier, ..) = envelope.split();
|
|
|
- if let Some(replier) = replier {
|
|
|
- if let Err(_) = replier.send(msg) {
|
|
|
- panic!("failed to send reply");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- #[test]
|
|
|
- fn local_call() {
|
|
|
- ASYNC_RT.block_on(async {
|
|
|
- const EXPECTED: &str = "hello";
|
|
|
- let name = RUNTIME.activate(echo).await;
|
|
|
- let from = ActorName::new(name.path().clone(), Uuid::default());
|
|
|
-
|
|
|
- let reply = RUNTIME
|
|
|
- .call(name.clone(), from, EchoMsg(EXPECTED.into()))
|
|
|
- .await
|
|
|
- .unwrap();
|
|
|
-
|
|
|
- assert_eq!(EXPECTED, reply.0);
|
|
|
-
|
|
|
- RUNTIME.take(&name).await.unwrap();
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- #[test]
|
|
|
- fn remote_call() {
|
|
|
- ASYNC_RT.block_on(async {
|
|
|
- const EXPECTED: &str = "hello";
|
|
|
- let actor_name = RUNTIME.activate(echo).await;
|
|
|
- let bind_path = Arc::new(RUNTIME_CREDS.bind_path().unwrap());
|
|
|
- let block_addr = Arc::new(BlockAddr::new(RUNTIME_ADDR, bind_path));
|
|
|
- let transmitter = Transmitter::new(block_addr, RUNTIME_CREDS.clone())
|
|
|
- .await
|
|
|
- .unwrap();
|
|
|
- let buf = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap();
|
|
|
- let wire_msg = WireMsg {
|
|
|
- to: actor_name.clone(),
|
|
|
- from: RUNTIME.actor_name(Uuid::default()),
|
|
|
- payload: &buf,
|
|
|
- };
|
|
|
-
|
|
|
- let reply = transmitter
|
|
|
- .call(wire_msg, ReplyCallback::<EchoMsg>::new())
|
|
|
- .await
|
|
|
- .unwrap()
|
|
|
- .unwrap();
|
|
|
-
|
|
|
- assert_eq!(EXPECTED, reply.0);
|
|
|
-
|
|
|
- RUNTIME.take(&actor_name).await.unwrap();
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- /// Tests the `num_running` method.
|
|
|
- ///
|
|
|
- /// This test uses its own runtime and so can use the `#[tokio::test]` attribute.
|
|
|
- #[tokio::test]
|
|
|
- async fn num_running() {
|
|
|
- declare_runtime!(
|
|
|
- LOCAL_RT,
|
|
|
- // This needs to be different from the address where `RUNTIME` is listening.
|
|
|
- IpAddr::from([127, 0, 0, 2]),
|
|
|
- TEST_STORE.node_creds().unwrap()
|
|
|
- );
|
|
|
- assert_eq!(0, LOCAL_RT.num_running().await);
|
|
|
- let name = LOCAL_RT.activate(echo).await;
|
|
|
- assert_eq!(1, LOCAL_RT.num_running().await);
|
|
|
- LOCAL_RT.take(&name).await.unwrap();
|
|
|
- assert_eq!(0, LOCAL_RT.num_running().await);
|
|
|
- }
|
|
|
-
|
|
|
- // The following code is a proof-of-concept for what types should be generated for a
|
|
|
- // simple ping-pong protocol:
|
|
|
- //
|
|
|
- protocol! {
|
|
|
- let name = PingPongProtocol;
|
|
|
- let states = [
|
|
|
- ClientInit, SentPing,
|
|
|
- ServerInit, Listening,
|
|
|
- ];
|
|
|
- ClientInit?Activate -> SentPing, >service(Listening)!Ping;
|
|
|
- ServerInit?Activate -> Listening;
|
|
|
- Listening?Ping -> End, >SentPing!Ping::Reply;
|
|
|
- SentPing?Ping::Reply -> End;
|
|
|
- }
|
|
|
- //
|
|
|
- // In words, the protocol is described as follows.
|
|
|
- // 1. The ClientInit state receives the Activate message. It returns the SentPing state and a
|
|
|
- // Ping message to be sent to the Listening state.
|
|
|
- // 2. The ServerInit state receives the Activate message. It returns the Listening state.
|
|
|
- // 3. When the Listening state receives the Ping message it returns the End state and a
|
|
|
- // Ping::Reply message to be sent to the SentPing state.
|
|
|
- // 4. When the SentPing state receives the Ping::Reply message it returns the End state.
|
|
|
- //
|
|
|
- // The End state represents an end to the session described by the protocol. When an actor
|
|
|
- // transitions to the End state its function returns.
|
|
|
- // The generated actor implementation is the sender of the Activate message.
|
|
|
- // When a state is expecting a Reply message, an error occurs if the message is not received
|
|
|
- // in a timely manner.
|
|
|
-
|
|
|
- #[derive(Serialize, Deserialize)]
|
|
|
- struct Ping;
|
|
|
- impl CallMsg for Ping {
|
|
|
- type Reply = PingReply;
|
|
|
- }
|
|
|
-
|
|
|
- // I was tempted to name this "Pong", but the proc macro wouldn't think to do that.
|
|
|
- #[derive(Serialize, Deserialize)]
|
|
|
- struct PingReply;
|
|
|
-
|
|
|
- trait ClientInit {
|
|
|
- type AfterActivate: SentPing;
|
|
|
- type HandleActivateFut: Future<Output = Result<(Self::AfterActivate, Ping)>>;
|
|
|
- fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
|
|
|
- }
|
|
|
-
|
|
|
- trait ServerInit {
|
|
|
- type AfterActivate: Listening;
|
|
|
- type HandleActivateFut: Future<Output = Result<Self::AfterActivate>>;
|
|
|
- fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
|
|
|
- }
|
|
|
-
|
|
|
- trait Listening {
|
|
|
- type HandlePingFut: Future<Output = Result<(End, PingReply)>>;
|
|
|
- fn handle_ping(self, msg: Ping) -> Self::HandlePingFut;
|
|
|
- }
|
|
|
-
|
|
|
- trait SentPing {
|
|
|
- type HandleReplyFut: Future<Output = Result<End>>;
|
|
|
- fn handle_reply(self, msg: PingReply) -> Self::HandleReplyFut;
|
|
|
- }
|
|
|
-
|
|
|
- #[derive(Serialize, Deserialize)]
|
|
|
- enum PingProtocolMsg {
|
|
|
- Ping(Ping),
|
|
|
- PingReply(PingReply),
|
|
|
- }
|
|
|
- impl CallMsg for PingProtocolMsg {
|
|
|
- type Reply = PingProtocolMsg;
|
|
|
- }
|
|
|
- impl SendMsg for PingProtocolMsg {}
|
|
|
-
|
|
|
- struct ClientInitState;
|
|
|
-
|
|
|
- impl ClientInit for ClientInitState {
|
|
|
- type AfterActivate = ClientState;
|
|
|
- type HandleActivateFut = impl Future<Output = Result<(Self::AfterActivate, Ping)>>;
|
|
|
- fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
|
|
|
- ready(Ok((ClientState, Ping)))
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- struct ClientState;
|
|
|
-
|
|
|
- impl SentPing for ClientState {
|
|
|
- type HandleReplyFut = Ready<Result<End>>;
|
|
|
- fn handle_reply(self, _msg: PingReply) -> Self::HandleReplyFut {
|
|
|
- ready(Ok(End))
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- #[allow(dead_code)]
|
|
|
- enum PingClientState {
|
|
|
- Init(ClientInitState),
|
|
|
- SentPing(ClientState),
|
|
|
- End(End),
|
|
|
- }
|
|
|
-
|
|
|
- struct ServerInitState;
|
|
|
-
|
|
|
- struct ServerState;
|
|
|
-
|
|
|
- impl ServerInit for ServerInitState {
|
|
|
- type AfterActivate = ServerState;
|
|
|
- type HandleActivateFut = Ready<Result<Self::AfterActivate>>;
|
|
|
- fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
|
|
|
- ready(Ok(ServerState))
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- impl Listening for ServerState {
|
|
|
- type HandlePingFut = impl Future<Output = Result<(End, PingReply)>>;
|
|
|
- fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
|
|
|
- ready(Ok((End, PingReply)))
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- #[allow(dead_code)]
|
|
|
- enum PingServerState {
|
|
|
- ServerInit(ServerInitState),
|
|
|
- Listening(ServerState),
|
|
|
- End(End),
|
|
|
- }
|
|
|
-
|
|
|
- async fn ping_server(
|
|
|
- counter: Arc<AtomicU8>,
|
|
|
- rt: &'static Runtime,
|
|
|
- mut mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
|
|
|
- act_id: Uuid,
|
|
|
- ) {
|
|
|
- let mut state = {
|
|
|
- let init = ServerInitState;
|
|
|
- let state = init.handle_activate(Activate { rt, act_id }).await.unwrap();
|
|
|
- PingServerState::Listening(state)
|
|
|
- };
|
|
|
- while let Some(envelope) = mailbox.recv().await {
|
|
|
- let (msg, replier, _from) = envelope.split();
|
|
|
- match (state, msg) {
|
|
|
- (PingServerState::Listening(listening_state), PingProtocolMsg::Ping(msg)) => {
|
|
|
- let (new_state, reply) = listening_state.handle_ping(msg).await.unwrap();
|
|
|
- state = PingServerState::End(new_state);
|
|
|
- if let Err(_) = replier.unwrap().send(PingProtocolMsg::PingReply(reply)) {
|
|
|
- panic!("Failed to send Ping reply.");
|
|
|
- }
|
|
|
- }
|
|
|
- (_prev_state, _) => {
|
|
|
- panic!("Ping protocol violation.");
|
|
|
- // A real implementation should assign the previous state and log the error.
|
|
|
- // state = prev_state;
|
|
|
- }
|
|
|
- }
|
|
|
- if let PingServerState::End(_) = state {
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- counter.fetch_sub(1, Ordering::SeqCst);
|
|
|
- }
|
|
|
-
|
|
|
- async fn ping_client(
|
|
|
- counter: Arc<AtomicU8>,
|
|
|
- server_name: ActorName,
|
|
|
- rt: &'static Runtime,
|
|
|
- _mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
|
|
|
- act_id: Uuid,
|
|
|
- ) {
|
|
|
- let init = ClientInitState;
|
|
|
- let (state, msg) = init.handle_activate(Activate { rt, act_id }).await.unwrap();
|
|
|
- let from = rt.actor_name(act_id);
|
|
|
- let reply = rt
|
|
|
- .call(server_name, from, PingProtocolMsg::Ping(msg))
|
|
|
- .await
|
|
|
- .unwrap();
|
|
|
- if let PingProtocolMsg::PingReply(msg) = reply {
|
|
|
- state.handle_reply(msg).await.unwrap();
|
|
|
- } else {
|
|
|
- panic!("Incorrect message type sent in reply to Ping.");
|
|
|
- }
|
|
|
- counter.fetch_sub(1, Ordering::SeqCst);
|
|
|
- }
|
|
|
-
|
|
|
- #[test]
|
|
|
- fn ping_pong_test() {
|
|
|
- ASYNC_RT.block_on(async {
|
|
|
- let counter = Arc::new(AtomicU8::new(2));
|
|
|
- let server_name = {
|
|
|
- let counter = counter.clone();
|
|
|
- RUNTIME
|
|
|
- .activate(move |rt, mailbox, act_id| ping_server(counter, rt, mailbox, act_id))
|
|
|
- .await
|
|
|
- };
|
|
|
- let client_name = {
|
|
|
- let server_name = server_name.clone();
|
|
|
- let counter = counter.clone();
|
|
|
- RUNTIME
|
|
|
- .activate(move |rt, mailbox, act_id| {
|
|
|
- ping_client(counter, server_name, rt, mailbox, act_id)
|
|
|
- })
|
|
|
- .await
|
|
|
- };
|
|
|
-
|
|
|
- let deadline = Instant::now() + Duration::from_millis(500);
|
|
|
- while counter.load(Ordering::SeqCst) > 0 && Instant::now() < deadline {
|
|
|
- tokio::time::sleep(Duration::from_millis(20)).await;
|
|
|
- }
|
|
|
- // Check that both tasks finished successfully and we didn't just timeout.
|
|
|
- assert_eq!(0, counter.load(Ordering::SeqCst));
|
|
|
-
|
|
|
- // TODO: Should actor which return be removed from the runtime automatically?
|
|
|
- RUNTIME.take(&server_name).await.unwrap();
|
|
|
- RUNTIME.take(&client_name).await.unwrap();
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
- // Here's another protocol example. This is the Customer and Travel Agency protocol used as an
|
|
|
- // example in the survey paper "Behavioral Types in Programming Languages."
|
|
|
- // Note that the Choosing state can send messages at any time, not just in response to another
|
|
|
- // message because there is a transition from Choosing that doesn't use the receive operator
|
|
|
- // (`?`).
|
|
|
- protocol! {
|
|
|
- let name = TravelAgency;
|
|
|
- let states = [
|
|
|
- AgencyInit, Listening,
|
|
|
- Choosing,
|
|
|
- ];
|
|
|
- AgencyInit?Activate -> Listening;
|
|
|
- Choosing -> Choosing, >service(Listening)!Query, service(Listening)!Accept, service(Listening)!Reject;
|
|
|
- Listening?Query -> Listening, >Choosing!Query::Reply;
|
|
|
- Choosing?Query::Reply -> Choosing;
|
|
|
- Listening?Accept -> End, >Choosing!Accept::Reply;
|
|
|
- Choosing?Accept::Reply -> End;
|
|
|
- Listening?Reject -> End, >Choosing!Reject::Reply;
|
|
|
- Choosing?Reject::Reply -> End;
|
|
|
- }
|
|
|
-}
|