12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042 |
- #![feature(impl_trait_in_assoc_type)]
- pub mod sector_proto;
- pub mod fs_proto;
- use std::{
- any::Any,
- collections::HashMap,
- fmt::Display,
- future::{ready, Future, Ready},
- marker::PhantomData,
- net::IpAddr,
- ops::DerefMut,
- pin::Pin,
- sync::Arc,
- };
- use btlib::{bterr, crypto::Creds, error::StringError, BlockPath, Result};
- use btserde::{field_helpers::smart_ptr, from_slice, to_vec, write_to};
- use bttp::{DeserCallback, MsgCallback, Receiver, Replier, Transmitter};
- use serde::{de::DeserializeOwned, Deserialize, Serialize};
- use tokio::{
- sync::{mpsc, oneshot, Mutex, RwLock},
- task::JoinHandle,
- };
- use uuid::Uuid;
/// Declares a new [Runtime] which listens for messages at the given IP address and uses the given
/// [Creds]. Runtimes are intended to be created once in a process's lifetime and continue running
/// until the process exits.
///
/// The macro expands to a lazily initialized static named `$name` whose type is
/// `&'static Runtime`. The first dereference of `$name` creates the [Runtime] and then the
/// [Receiver] which delivers network messages to it.
///
/// The `$creds` expression appears twice in the expansion (once for the [Runtime] and once for
/// the [Receiver]), so it is evaluated twice. It should therefore be cheap and free of side
/// effects, for example the clone of an `Arc`.
///
/// # Panics
///
/// Initialization of the static panics if the [Runtime] or the [Receiver] can't be created.
#[macro_export]
macro_rules! declare_runtime {
    ($name:ident, $ip_addr:expr, $creds:expr) => {
        ::lazy_static::lazy_static! {
            // Items defined by this crate are named through `$crate` so that the expansion does
            // not depend on what the call site has imported.
            static ref $name: &'static $crate::Runtime = {
                ::lazy_static::lazy_static! {
                    static ref RUNTIME: $crate::Runtime = $crate::Runtime::_new($creds).unwrap();
                    // `Receiver` is not re-exported by this crate, so it still has to be in scope
                    // at the call site.
                    static ref RECEIVER: Receiver =
                        $crate::_new_receiver($ip_addr, $creds, &*RUNTIME);
                }
                // By dereferencing RECEIVER we ensure it is started.
                let _ = &*RECEIVER;
                &*RUNTIME
            };
        }
    };
}
- /// This function is not intended to be called by downstream crates.
- #[doc(hidden)]
- pub fn _new_receiver<C>(ip_addr: IpAddr, creds: Arc<C>, runtime: &'static Runtime) -> Receiver
- where
- C: 'static + Send + Sync + Creds,
- {
- let callback = RuntimeCallback::new(runtime);
- Receiver::new(ip_addr, creds, callback).unwrap()
- }
- /// An actor runtime.
- ///
- /// Actors can be activated by the runtime and execute autonomously until they return. Running
- /// actors can be sent messages using the `send` method, which does not wait for a response from the
- /// recipient. If a reply is needed, then `call` can be used, which returns a future that will
- /// be ready when the reply has been received.
- pub struct Runtime {
- path: Arc<BlockPath>,
- handles: RwLock<HashMap<Uuid, ActorHandle>>,
- peers: RwLock<HashMap<Arc<BlockPath>, Transmitter>>,
- }
- impl Runtime {
- /// This method is not intended to be called directly by downstream crates. Use the macro
- /// [declare_runtime] to create a [Runtime].
- ///
- /// If you create a non-static [Runtime], your process will panic when it is dropped.
- #[doc(hidden)]
- pub fn _new<C: 'static + Send + Sync + Creds>(creds: Arc<C>) -> Result<Runtime> {
- let path = Arc::new(creds.bind_path()?);
- Ok(Runtime {
- path,
- handles: RwLock::new(HashMap::new()),
- peers: RwLock::new(HashMap::new()),
- })
- }
- pub fn path(&self) -> &Arc<BlockPath> {
- &self.path
- }
- /// Returns the number of actors that are currently executing in this [Runtime].
- pub async fn num_running(&self) -> usize {
- let guard = self.handles.read().await;
- guard.len()
- }
- /// Sends a message to the actor identified by the given [ActorName].
- pub async fn send<T: 'static + SendMsg>(
- &self,
- to: ActorName,
- from: ActorName,
- msg: T,
- ) -> Result<()> {
- if to.path == self.path {
- let guard = self.handles.read().await;
- if let Some(handle) = guard.get(&to.act_id) {
- handle.send(from, msg).await
- } else {
- Err(bterr!("invalid actor name"))
- }
- } else {
- let guard = self.peers.read().await;
- if let Some(peer) = guard.get(&to.path) {
- let buf = to_vec(&msg)?;
- let wire_msg = WireMsg {
- to,
- from,
- payload: &buf,
- };
- peer.send(wire_msg).await
- } else {
- // TODO: Use the filesystem to discover the address of the recipient and connect to
- // it.
- todo!()
- }
- }
- }
- /// Sends a message to the actor identified by the given [ActorName] and returns a future which
- /// is ready when a reply has been received.
- pub async fn call<T: 'static + CallMsg>(
- &self,
- to: ActorName,
- from: ActorName,
- msg: T,
- ) -> Result<T::Reply> {
- if to.path == self.path {
- let guard = self.handles.read().await;
- if let Some(handle) = guard.get(&to.act_id) {
- handle.call_through(from, msg).await
- } else {
- Err(bterr!("invalid actor name"))
- }
- } else {
- let guard = self.peers.read().await;
- if let Some(peer) = guard.get(&to.path) {
- let buf = to_vec(&msg)?;
- let wire_msg = WireMsg {
- to,
- from,
- payload: &buf,
- };
- peer.call(wire_msg, ReplyCallback::<T>::new()).await?
- } else {
- // TODO: Use the filesystem to discover the address of the recipient and connect to
- // it.
- todo!()
- }
- }
- }
- /// Resolves the given [ServiceName] to an [ActorName] which is part of it.
- pub async fn resolve<'a>(&'a self, _service: &ServiceName) -> Result<ActorName> {
- todo!()
- }
- /// Activates a new actor using the given activator function and returns a handle to it.
- pub async fn activate<Msg, F, Fut>(&'static self, activator: F) -> ActorName
- where
- Msg: 'static + CallMsg,
- Fut: 'static + Send + Future<Output = ()>,
- F: FnOnce(&'static Runtime, mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
- {
- let mut guard = self.handles.write().await;
- let act_id = {
- let mut act_id = Uuid::new_v4();
- while guard.contains_key(&act_id) {
- act_id = Uuid::new_v4();
- }
- act_id
- };
- let act_name = self.actor_name(act_id);
- let (tx, rx) = mpsc::channel::<Envelope<Msg>>(MAILBOX_LIMIT);
- // The deliverer closure is responsible for deserializing messages received over the wire
- // and delivering them to the actor's mailbox, and sending replies to call messages.
- let deliverer = {
- let buffer = Arc::new(Mutex::new(Vec::<u8>::new()));
- let tx = tx.clone();
- let act_name = act_name.clone();
- move |envelope: WireEnvelope| {
- let (wire_msg, replier) = envelope.into_parts();
- let result = from_slice(wire_msg.payload);
- let buffer = buffer.clone();
- let tx = tx.clone();
- let act_name = act_name.clone();
- let fut: FutureResult = Box::pin(async move {
- let msg = result?;
- if let Some(mut replier) = replier {
- let (envelope, rx) = Envelope::new_call(act_name, msg);
- tx.send(envelope).await.map_err(|_| {
- bterr!("failed to deliver message. Recipient may have halted.")
- })?;
- match rx.await {
- Ok(reply) => {
- let mut guard = buffer.lock().await;
- guard.clear();
- write_to(&reply, guard.deref_mut())?;
- let wire_reply = WireReply::Ok(&guard);
- replier.reply(wire_reply).await
- }
- Err(err) => replier.reply_err(err.to_string(), None).await,
- }
- } else {
- tx.send(Envelope::new_send(act_name, msg))
- .await
- .map_err(|_| {
- bterr!("failed to deliver message. Recipient may have halted.")
- })
- }
- });
- fut
- }
- };
- let handle = tokio::task::spawn(activator(self, rx, act_id));
- let actor_handle = ActorHandle::new(handle, tx, deliverer);
- guard.insert(act_id, actor_handle);
- act_name
- }
- /// Registers an actor as a service with the given [ServiceId].
- pub async fn register<Msg, Fut, F, G>(
- &self,
- _id: ServiceId,
- _activator: F,
- _deserializer: G,
- ) -> Result<()>
- where
- Msg: 'static + CallMsg,
- Fut: 'static + Send + Future<Output = ()>,
- F: Fn(mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
- G: 'static + Send + Sync + Fn(&[u8]) -> Result<Msg>,
- {
- todo!()
- }
- /// Returns the [ActorHandle] for the actor with the given name.
- ///
- /// If there is no such actor in this runtime then a [RuntimeError::BadActorName] error is
- /// returned.
- ///
- /// Note that the actor will be aborted when the given handle is dropped (unless it has already
- /// returned when the handle is dropped), and no further messages will be delivered to it by
- /// this runtime.
- pub async fn take(&self, name: &ActorName) -> Result<ActorHandle> {
- if name.path == self.path {
- let mut guard = self.handles.write().await;
- if let Some(handle) = guard.remove(&name.act_id) {
- Ok(handle)
- } else {
- Err(RuntimeError::BadActorName(name.clone()).into())
- }
- } else {
- Err(RuntimeError::BadActorName(name.clone()).into())
- }
- }
- /// Returns the name of the actor in this runtime with the given actor ID.
- pub fn actor_name(&self, act_id: Uuid) -> ActorName {
- ActorName::new(self.path.clone(), act_id)
- }
- }
- impl Drop for Runtime {
- fn drop(&mut self) {
- panic!("A Runtime was dropped. Panicking to avoid undefined behavior.");
- }
- }
- #[derive(Debug, Clone, PartialEq, Eq)]
- pub enum RuntimeError {
- BadActorName(ActorName),
- }
- impl Display for RuntimeError {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- Self::BadActorName(name) => write!(f, "bad actor name: {name}"),
- }
- }
- }
- impl std::error::Error for RuntimeError {}
/// Represents the terminal state of an actor, where it stops processing messages and halts.
// Only the protocol proof-of-concept in the tests refers to this type so far.
#[allow(dead_code)]
struct End;
- #[allow(dead_code)]
- /// Delivered to an actor implementation when it starts up.
- pub struct Activate {
- /// A reference to the `Runtime` which is running this actor.
- rt: &'static Runtime,
- /// The ID assigned to this actor.
- act_id: Uuid,
- }
- /// Deserializes replies sent over the wire.
- struct ReplyCallback<T> {
- _phantom: PhantomData<T>,
- }
- impl<T: CallMsg> ReplyCallback<T> {
- fn new() -> Self {
- Self {
- _phantom: PhantomData,
- }
- }
- }
- impl<T: CallMsg> DeserCallback for ReplyCallback<T> {
- type Arg<'de> = WireReply<'de> where T: 'de;
- type Return = Result<T::Reply>;
- type CallFut<'de> = Ready<Self::Return> where T: 'de, T::Reply: 'de;
- fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
- let result = match arg {
- WireReply::Ok(slice) => from_slice(slice).map_err(|err| err.into()),
- WireReply::Err(msg) => Err(StringError::new(msg.to_string()).into()),
- };
- ready(result)
- }
- }
- struct SendReplyCallback {
- replier: Option<Replier>,
- }
- impl SendReplyCallback {
- fn new(replier: Replier) -> Self {
- Self {
- replier: Some(replier),
- }
- }
- }
- impl DeserCallback for SendReplyCallback {
- type Arg<'de> = WireReply<'de>;
- type Return = Result<()>;
- type CallFut<'de> = impl 'de + Future<Output = Self::Return>;
- fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
- async move {
- if let Some(mut replier) = self.replier.take() {
- replier.reply(arg).await
- } else {
- Ok(())
- }
- }
- }
- }
- /// This struct implements the server callback for network messages.
- #[derive(Clone)]
- struct RuntimeCallback {
- rt: &'static Runtime,
- }
- impl RuntimeCallback {
- fn new(rt: &'static Runtime) -> Self {
- Self { rt }
- }
- async fn deliver_local(&self, msg: WireMsg<'_>, replier: Option<Replier>) -> Result<()> {
- let guard = self.rt.handles.read().await;
- if let Some(handle) = guard.get(&msg.to.act_id) {
- let envelope = if let Some(replier) = replier {
- WireEnvelope::Call { msg, replier }
- } else {
- WireEnvelope::Send { msg }
- };
- (handle.deliverer)(envelope).await
- } else {
- Err(bterr!("invalid actor name: {}", msg.to))
- }
- }
- async fn route_msg(&self, msg: WireMsg<'_>, replier: Option<Replier>) -> Result<()> {
- let guard = self.rt.peers.read().await;
- if let Some(tx) = guard.get(msg.to.path()) {
- if let Some(replier) = replier {
- let callback = SendReplyCallback::new(replier);
- tx.call(msg, callback).await?
- } else {
- tx.send(msg).await
- }
- } else {
- Err(bterr!(
- "unable to deliver message to peer at '{}'",
- msg.to.path()
- ))
- }
- }
- }
- impl MsgCallback for RuntimeCallback {
- type Arg<'de> = WireMsg<'de>;
- type CallFut<'de> = impl 'de + Future<Output = Result<()>>;
- fn call<'de>(&'de self, arg: bttp::MsgReceived<Self::Arg<'de>>) -> Self::CallFut<'de> {
- async move {
- let (_, body, replier) = arg.into_parts();
- if body.to.path() == self.rt.path() {
- self.deliver_local(body, replier).await
- } else {
- self.route_msg(body, replier).await
- }
- }
- }
- }
- /// A unique identifier for a particular service.
- #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
- pub struct ServiceId(#[serde(with = "smart_ptr")] Arc<String>);
- impl From<String> for ServiceId {
- fn from(value: String) -> Self {
- Self(Arc::new(value))
- }
- }
- impl<'a> From<&'a str> for ServiceId {
- fn from(value: &'a str) -> Self {
- Self(Arc::new(value.to_owned()))
- }
- }
- /// A unique identifier for a service.
- ///
- /// A service is a collection of actors in the same directory which provide some functionality.
- #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
- pub struct ServiceName {
- /// The path to the directory containing the service.
- #[serde(with = "smart_ptr")]
- path: Arc<BlockPath>,
- /// The id of the service.
- service_id: ServiceId,
- }
- /// A unique identifier for a specific actor activation.
- #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
- pub struct ActorName {
- /// The path to the directory containing this actor.
- #[serde(with = "smart_ptr")]
- path: Arc<BlockPath>,
- /// A unique identifier for an actor activation. Even as an actor transitions to different types
- /// as it handles messages, this value does not change. Thus this value can be used to trace an
- /// actor through a series of state transitions.
- act_id: Uuid,
- }
- impl ActorName {
- pub fn new(path: Arc<BlockPath>, act_id: Uuid) -> Self {
- Self { path, act_id }
- }
- pub fn path(&self) -> &Arc<BlockPath> {
- &self.path
- }
- pub fn act_id(&self) -> Uuid {
- self.act_id
- }
- }
- impl Display for ActorName {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "{}@{}", self.act_id, self.path)
- }
- }
- /// Trait for messages which expect exactly one reply.
- pub trait CallMsg: Serialize + DeserializeOwned + Send + Sync {
- /// The reply type expected for this message.
- type Reply: Serialize + DeserializeOwned + Send + Sync;
- }
- /// Trait for messages which expect exactly zero replies.
- pub trait SendMsg: CallMsg {}
- /// A type used to express when a reply is not expected for a message type.
- #[derive(Serialize, Deserialize)]
- enum NoReply {}
/// The capacity of the channel backing an actor's mailbox, i.e. the maximum number of messages
/// which can be waiting in it.
const MAILBOX_LIMIT: usize = 32;
- /// The type of messages sent over the wire between runtimes.
- #[derive(Serialize, Deserialize)]
- struct WireMsg<'a> {
- to: ActorName,
- from: ActorName,
- payload: &'a [u8],
- }
- impl<'a> bttp::CallMsg<'a> for WireMsg<'a> {
- type Reply<'r> = WireReply<'r>;
- }
- impl<'a> bttp::SendMsg<'a> for WireMsg<'a> {}
- #[derive(Serialize, Deserialize)]
- enum WireReply<'a> {
- Ok(&'a [u8]),
- Err(&'a str),
- }
- /// A wrapper around [WireMsg] which indicates whether a call or send was executed.
- enum WireEnvelope<'de> {
- Send { msg: WireMsg<'de> },
- Call { msg: WireMsg<'de>, replier: Replier },
- }
- impl<'de> WireEnvelope<'de> {
- fn into_parts(self) -> (WireMsg<'de>, Option<Replier>) {
- match self {
- Self::Send { msg } => (msg, None),
- Self::Call { msg, replier } => (msg, Some(replier)),
- }
- }
- }
- /// Wrapper around a message type `T` which indicates who the message is from and, if the message
- /// was dispatched with `call`, provides a channel to reply to it.
- pub struct Envelope<T: CallMsg> {
- from: ActorName,
- reply: Option<oneshot::Sender<T::Reply>>,
- msg: T,
- }
- impl<T: CallMsg> Envelope<T> {
- pub fn new(msg: T, reply: Option<oneshot::Sender<T::Reply>>, from: ActorName) -> Self {
- Self { from, reply, msg }
- }
- /// Creates a new envelope containing the given message which does not expect a reply.
- fn new_send(from: ActorName, msg: T) -> Self {
- Self {
- from,
- msg,
- reply: None,
- }
- }
- /// Creates a new envelope containing the given message which expects exactly one reply.
- fn new_call(from: ActorName, msg: T) -> (Self, oneshot::Receiver<T::Reply>) {
- let (tx, rx) = oneshot::channel::<T::Reply>();
- let envelope = Self {
- from,
- msg,
- reply: Some(tx),
- };
- (envelope, rx)
- }
- /// Returns the name of the actor which sent this message.
- pub fn from(&self) -> &ActorName {
- &self.from
- }
- /// Returns a reference to the message in this envelope.
- pub fn msg(&self) -> &T {
- &self.msg
- }
- /// Sends a reply to this message.
- ///
- /// If this message is not expecting a reply, or if this message has already been replied to,
- /// then an error is returned.
- pub fn reply(&mut self, reply: T::Reply) -> Result<()> {
- if let Some(tx) = self.reply.take() {
- if tx.send(reply).is_ok() {
- Ok(())
- } else {
- Err(bterr!("failed to send reply"))
- }
- } else {
- Err(bterr!("reply already sent"))
- }
- }
- /// Returns true if this message expects a reply and it has not already been replied to.
- pub fn needs_reply(&self) -> bool {
- self.reply.is_some()
- }
- pub fn split(self) -> (T, Option<oneshot::Sender<T::Reply>>, ActorName) {
- (self.msg, self.reply, self.from)
- }
- }
- type FutureResult = Pin<Box<dyn Send + Future<Output = Result<()>>>>;
- pub struct ActorHandle {
- handle: Option<JoinHandle<()>>,
- sender: Box<dyn Send + Sync + Any>,
- deliverer: Box<dyn Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult>,
- }
- impl ActorHandle {
- fn new<T, F>(handle: JoinHandle<()>, sender: mpsc::Sender<Envelope<T>>, deliverer: F) -> Self
- where
- T: 'static + CallMsg,
- F: 'static + Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult,
- {
- Self {
- handle: Some(handle),
- sender: Box::new(sender),
- deliverer: Box::new(deliverer),
- }
- }
- fn sender<T: 'static + CallMsg>(&self) -> Result<&mpsc::Sender<Envelope<T>>> {
- self.sender
- .downcast_ref::<mpsc::Sender<Envelope<T>>>()
- .ok_or_else(|| bterr!("unexpected message type"))
- }
- /// Sends a message to the actor represented by this handle.
- pub async fn send<T: 'static + SendMsg>(&self, from: ActorName, msg: T) -> Result<()> {
- let sender = self.sender()?;
- sender
- .send(Envelope::new_send(from, msg))
- .await
- .map_err(|_| bterr!("failed to enqueue message"))?;
- Ok(())
- }
- pub async fn call_through<T: 'static + CallMsg>(
- &self,
- from: ActorName,
- msg: T,
- ) -> Result<T::Reply> {
- let sender = self.sender()?;
- let (envelope, rx) = Envelope::new_call(from, msg);
- sender
- .send(envelope)
- .await
- .map_err(|_| bterr!("failed to enqueue call"))?;
- let reply = rx.await?;
- Ok(reply)
- }
- pub async fn returned(&mut self) -> Result<()> {
- if let Some(handle) = self.handle.take() {
- handle.await?;
- }
- Ok(())
- }
- pub fn abort(&mut self) {
- if let Some(handle) = self.handle.take() {
- handle.abort();
- }
- }
- }
- impl Drop for ActorHandle {
- fn drop(&mut self) {
- self.abort();
- }
- }
- #[cfg(test)]
- mod tests {
- use super::*;
- use btlib::{
- crypto::{ConcreteCreds, CredStore, CredsPriv},
- log::BuilderExt,
- };
- use btlib_tests::TEST_STORE;
- use btproto::protocol;
- use btserde::to_vec;
- use bttp::BlockAddr;
- use ctor::ctor;
- use lazy_static::lazy_static;
- use std::{
- net::{IpAddr, Ipv4Addr},
- sync::atomic::{AtomicU8, Ordering},
- time::{Duration, Instant},
- };
- use tokio::runtime::Builder;
- const RUNTIME_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
- lazy_static! {
- static ref RUNTIME_CREDS: Arc<ConcreteCreds> = TEST_STORE.node_creds().unwrap();
- }
- declare_runtime!(RUNTIME, RUNTIME_ADDR, RUNTIME_CREDS.clone());
- lazy_static! {
- /// A tokio async runtime.
- ///
- /// When the `#[tokio::test]` attribute is used on a test, a new current thread runtime
- /// is created for each test
- /// (source: https://docs.rs/tokio/latest/tokio/attr.test.html#current-thread-runtime).
- /// This creates a problem, because the first test thread to access the `RUNTIME` static
- /// will initialize its `Receiver` in its runtime, which will stop running at the end of
- /// the test. Hence subsequent tests will not be able to send remote messages to this
- /// `Runtime`.
- ///
- /// By creating a single async runtime which is used by all of the tests, we can avoid this
- /// problem.
- static ref ASYNC_RT: tokio::runtime::Runtime = Builder::new_current_thread()
- .enable_all()
- .build()
- .unwrap();
- }
- /// The log level to use when running tests.
- const LOG_LEVEL: &str = "warn";
- #[ctor]
- fn ctor() {
- std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL));
- env_logger::Builder::from_default_env().btformat().init();
- }
- #[derive(Serialize, Deserialize)]
- struct EchoMsg(String);
- impl CallMsg for EchoMsg {
- type Reply = EchoMsg;
- }
- async fn echo(
- _rt: &'static Runtime,
- mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>,
- _act_id: Uuid,
- ) {
- while let Some(envelope) = mailbox.recv().await {
- let (msg, replier, ..) = envelope.split();
- if let Some(replier) = replier {
- if let Err(_) = replier.send(msg) {
- panic!("failed to send reply");
- }
- }
- }
- }
/// Calls an echo actor through the runtime it is running in and checks the reply.
#[test]
fn local_call() {
    ASYNC_RT.block_on(async {
        const EXPECTED: &str = "hello";
        let echo_name = RUNTIME.activate(echo).await;
        // The caller is not a real actor; only its path has to match the runtime.
        let caller = ActorName::new(echo_name.path().clone(), Uuid::default());
        let request = EchoMsg(EXPECTED.into());

        let EchoMsg(actual) = RUNTIME
            .call(echo_name.clone(), caller, request)
            .await
            .unwrap();

        assert_eq!(EXPECTED, actual);

        // Taking the handle and dropping it stops the actor.
        RUNTIME.take(&echo_name).await.unwrap();
    })
}
/// Calls an echo actor over the network, using a transmitter connected to the runtime's
/// receiver, and checks the reply.
#[test]
fn remote_call() {
    ASYNC_RT.block_on(async {
        const EXPECTED: &str = "hello";
        let echo_name = RUNTIME.activate(echo).await;
        let transmitter = {
            let bind_path = Arc::new(RUNTIME_CREDS.bind_path().unwrap());
            let block_addr = Arc::new(BlockAddr::new(RUNTIME_ADDR, bind_path));
            Transmitter::new(block_addr, RUNTIME_CREDS.clone())
                .await
                .unwrap()
        };
        let payload = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap();
        let wire_msg = WireMsg {
            to: echo_name.clone(),
            from: RUNTIME.actor_name(Uuid::default()),
            payload: &payload,
        };

        // The first unwrap covers the transport, the second the deserialization of the reply.
        let EchoMsg(actual) = transmitter
            .call(wire_msg, ReplyCallback::<EchoMsg>::new())
            .await
            .unwrap()
            .unwrap();

        assert_eq!(EXPECTED, actual);

        RUNTIME.take(&echo_name).await.unwrap();
    });
}
- /// Tests the `num_running` method.
- ///
- /// This test uses its own runtime and so can use the `#[tokio::test]` attribute.
- #[tokio::test]
- async fn num_running() {
- declare_runtime!(
- LOCAL_RT,
- // This needs to be different from the address where `RUNTIME` is listening.
- IpAddr::from([127, 0, 0, 2]),
- TEST_STORE.node_creds().unwrap()
- );
- assert_eq!(0, LOCAL_RT.num_running().await);
- let name = LOCAL_RT.activate(echo).await;
- assert_eq!(1, LOCAL_RT.num_running().await);
- LOCAL_RT.take(&name).await.unwrap();
- assert_eq!(0, LOCAL_RT.num_running().await);
- }
- // The following code is a proof-of-concept for what types should be generated for a
- // simple ping-pong protocol:
- //
- protocol! {
- let name = PingPongProtocol;
- let states = [
- ClientInit, SentPing,
- ServerInit, Listening,
- ];
- ClientInit?Activate -> SentPing, Listening!Ping;
- ServerInit?Activate -> Listening;
- Listening?Ping -> End, SentPing!Ping::Reply;
- SentPing?Ping::Reply -> End;
- }
- //
- // In words, the protocol is described as follows.
- // 1. The ClientInit state receives the Activate message. It returns the SentPing state and a
- // Ping message to be sent to the Listening state.
- // 2. The ServerInit state receives the Activate message. It returns the Listening state.
- // 3. When the Listening state receives the Ping message it returns the End state and a
- // Ping::Reply message to be sent to the SentPing state.
- // 4. When the SentPing state receives the Ping::Reply message it returns the End state.
- //
- // The End state represents an end to the session described by the protocol. When an actor
- // transitions to the End state its function returns.
- // The generated actor implementation is the sender of the Activate message.
- // When a state is expecting a Reply message, an error occurs if the message is not received
- // in a timely manner.
- #[derive(Serialize, Deserialize)]
- struct Ping;
- impl CallMsg for Ping {
- type Reply = PingReply;
- }
- // I was tempted to name this "Pong", but the proc macro wouldn't think to do that.
- #[derive(Serialize, Deserialize)]
- struct PingReply;
- trait ClientInit {
- type AfterActivate: SentPing;
- type HandleActivateFut: Future<Output = Result<(Self::AfterActivate, Ping)>>;
- fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
- }
- trait ServerInit {
- type AfterActivate: Listening;
- type HandleActivateFut: Future<Output = Result<Self::AfterActivate>>;
- fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
- }
- trait Listening {
- type HandlePingFut: Future<Output = Result<(End, PingReply)>>;
- fn handle_ping(self, msg: Ping) -> Self::HandlePingFut;
- }
- trait SentPing {
- type HandleReplyFut: Future<Output = Result<End>>;
- fn handle_reply(self, msg: PingReply) -> Self::HandleReplyFut;
- }
- #[derive(Serialize, Deserialize)]
- enum PingProtocolMsg {
- Ping(Ping),
- PingReply(PingReply),
- }
- impl CallMsg for PingProtocolMsg {
- type Reply = PingProtocolMsg;
- }
- impl SendMsg for PingProtocolMsg {}
- struct ClientInitState;
- impl ClientInit for ClientInitState {
- type AfterActivate = ClientState;
- type HandleActivateFut = impl Future<Output = Result<(Self::AfterActivate, Ping)>>;
- fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
- ready(Ok((ClientState, Ping)))
- }
- }
- struct ClientState;
- impl SentPing for ClientState {
- type HandleReplyFut = Ready<Result<End>>;
- fn handle_reply(self, _msg: PingReply) -> Self::HandleReplyFut {
- ready(Ok(End))
- }
- }
- #[allow(dead_code)]
- enum PingClientState {
- Init(ClientInitState),
- SentPing(ClientState),
- End(End),
- }
- struct ServerInitState;
- struct ServerState;
- impl ServerInit for ServerInitState {
- type AfterActivate = ServerState;
- type HandleActivateFut = Ready<Result<Self::AfterActivate>>;
- fn handle_activate(self, _msg: Activate) -> Self::HandleActivateFut {
- ready(Ok(ServerState))
- }
- }
- impl Listening for ServerState {
- type HandlePingFut = impl Future<Output = Result<(End, PingReply)>>;
- fn handle_ping(self, _msg: Ping) -> Self::HandlePingFut {
- ready(Ok((End, PingReply)))
- }
- }
- #[allow(dead_code)]
- enum PingServerState {
- ServerInit(ServerInitState),
- Listening(ServerState),
- End(End),
- }
- async fn ping_server(
- counter: Arc<AtomicU8>,
- rt: &'static Runtime,
- mut mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
- act_id: Uuid,
- ) {
- let mut state = {
- let init = ServerInitState;
- let state = init.handle_activate(Activate { rt, act_id }).await.unwrap();
- PingServerState::Listening(state)
- };
- while let Some(envelope) = mailbox.recv().await {
- let (msg, replier, _from) = envelope.split();
- match (state, msg) {
- (PingServerState::Listening(listening_state), PingProtocolMsg::Ping(msg)) => {
- let (new_state, reply) = listening_state.handle_ping(msg).await.unwrap();
- state = PingServerState::End(new_state);
- if let Err(_) = replier.unwrap().send(PingProtocolMsg::PingReply(reply)) {
- panic!("Failed to send Ping reply.");
- }
- }
- (_prev_state, _) => {
- panic!("Ping protocol violation.");
- // A real implementation should assign the previous state and log the error.
- // state = prev_state;
- }
- }
- if let PingServerState::End(_) = state {
- break;
- }
- }
- counter.fetch_sub(1, Ordering::SeqCst);
- }
- async fn ping_client(
- counter: Arc<AtomicU8>,
- server_name: ActorName,
- rt: &'static Runtime,
- _mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
- act_id: Uuid,
- ) {
- let init = ClientInitState;
- let (state, msg) = init.handle_activate(Activate { rt, act_id }).await.unwrap();
- let from = rt.actor_name(act_id);
- let reply = rt
- .call(server_name, from, PingProtocolMsg::Ping(msg))
- .await
- .unwrap();
- if let PingProtocolMsg::PingReply(msg) = reply {
- state.handle_reply(msg).await.unwrap();
- } else {
- panic!("Incorrect message type sent in reply to Ping.");
- }
- counter.fetch_sub(1, Ordering::SeqCst);
- }
/// Runs a ping server and a ping client in `RUNTIME` and checks that both of them finish.
#[test]
fn ping_pong_test() {
    ASYNC_RT.block_on(async {
        // Each actor decrements the counter when it finishes.
        let counter = Arc::new(AtomicU8::new(2));
        let server_counter = counter.clone();
        let server_name = RUNTIME
            .activate(move |rt, mailbox, act_id| ping_server(server_counter, rt, mailbox, act_id))
            .await;
        let client_counter = counter.clone();
        let server_for_client = server_name.clone();
        let client_name = RUNTIME
            .activate(move |rt, mailbox, act_id| {
                ping_client(client_counter, server_for_client, rt, mailbox, act_id)
            })
            .await;

        // Poll the counter until both actors are done, or half a second has passed.
        let deadline = Instant::now() + Duration::from_millis(500);
        loop {
            let finished = counter.load(Ordering::SeqCst) == 0;
            if finished || Instant::now() >= deadline {
                break;
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }
        // Check that both tasks finished successfully and we didn't just timeout.
        assert_eq!(0, counter.load(Ordering::SeqCst));

        // TODO: Should actors which return be removed from the runtime automatically?
        RUNTIME.take(&server_name).await.unwrap();
        RUNTIME.take(&client_name).await.unwrap();
    });
}
- // Here's another protocol example. This is the Customer and Travel Agency protocol used as an
- // example in the survey paper "Behavioral Types in Programming Languages."
- // Note that the Choosing state can send messages at any time, not just in response to another
- // message because there is a transition from Choosing that doesn't use the receive operator
- // (`?`).
- protocol! {
- let name = TravelAgency;
- let states = [
- AgencyInit, Listening,
- Choosing,
- ];
- AgencyInit?Activate -> Listening;
- Choosing -> Choosing, Listening!Query|Accept|Reject;
- Listening?Query -> Listening, Choosing!Query::Reply;
- Choosing?Query::Reply -> Choosing;
- Listening?Accept -> End, Choosing!Accept::Reply;
- Choosing?Accept::Reply -> End;
- Listening?Reject -> End, Choosing!Reject:Reply;
- Choosing?Reject::Reply -> End;
- }
- }
|