lib.rs 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652
  1. #![feature(impl_trait_in_assoc_type)]
  2. use std::{
  3. any::Any,
  4. collections::HashMap,
  5. fmt::Display,
  6. future::{ready, Future, Ready},
  7. marker::PhantomData,
  8. net::IpAddr,
  9. ops::DerefMut,
  10. pin::Pin,
  11. sync::Arc,
  12. };
  13. use btlib::{bterr, crypto::Creds, error::StringError, BlockPath, Result};
  14. use btmsg::{DeserCallback, MsgCallback, Receiver, Replier, Transmitter};
  15. use btserde::{field_helpers::smart_ptr, from_slice, to_vec, write_to};
  16. use serde::{de::DeserializeOwned, Deserialize, Serialize};
  17. use tokio::{
  18. sync::{mpsc, oneshot, Mutex, RwLock},
  19. task::JoinHandle,
  20. };
  21. use uuid::Uuid;
  22. /// Declares a new [Runtime] which listens for messages at the given IP address and uses the given
  23. /// [Creds]. Runtimes are intended to be created once in a process's lifetime and continue running
  24. /// until the process exits.
  25. #[macro_export]
  26. macro_rules! declare_runtime {
  27. ($name:ident, $ip_addr:expr, $creds:expr) => {
  28. ::lazy_static::lazy_static! {
  29. static ref $name: Runtime = _new_runtime($ip_addr, $creds).unwrap();
  30. }
  31. };
  32. }
  33. /// This function is not intended to be called directly by downstream crates. Use the macro
  34. /// [declare_runtime] to create a [Runtime] instead.
  35. pub fn _new_runtime<C: 'static + Send + Sync + Creds>(
  36. ip_addr: IpAddr,
  37. creds: Arc<C>,
  38. ) -> Result<Runtime> {
  39. let path = Arc::new(creds.bind_path()?);
  40. let handles = Arc::new(RwLock::new(HashMap::new()));
  41. let callback = RuntimeCallback::new(handles.clone());
  42. let rx = Receiver::new(ip_addr, creds, callback)?;
  43. Ok(Runtime {
  44. _rx: rx,
  45. path,
  46. handles,
  47. peers: RwLock::new(HashMap::new()),
  48. })
  49. }
  50. /// An actor runtime.
  51. ///
  52. /// Actors can be activated by the runtime and execute autonomously until they return. Running
  53. /// actors can be sent messages using the `send` method, which does not wait for a response from the
  54. /// recipient. If a reply is needed, then `call` can be used, which returns a future that will
  55. /// be ready when the reply has been received.
  56. pub struct Runtime {
  57. _rx: Receiver,
  58. path: Arc<BlockPath>,
  59. handles: Arc<RwLock<HashMap<Uuid, ActorHandle>>>,
  60. peers: RwLock<HashMap<Arc<BlockPath>, Transmitter>>,
  61. }
  62. impl Runtime {
  63. pub fn path(&self) -> &Arc<BlockPath> {
  64. &self.path
  65. }
  66. /// Returns the number of actors that are currently executing in this [Runtime].
  67. pub async fn num_running(&self) -> usize {
  68. let guard = self.handles.read().await;
  69. guard.len()
  70. }
  71. /// Sends a message to the actor identified by the given [ActorName].
  72. pub async fn send<T: 'static + SendMsg>(
  73. &self,
  74. to: &ActorName,
  75. from: Uuid,
  76. msg: T,
  77. ) -> Result<()> {
  78. if to.path == self.path {
  79. let guard = self.handles.read().await;
  80. if let Some(handle) = guard.get(&to.act_id) {
  81. handle.send(msg).await
  82. } else {
  83. Err(bterr!("invalid actor name"))
  84. }
  85. } else {
  86. let guard = self.peers.read().await;
  87. if let Some(peer) = guard.get(&to.path) {
  88. let buf = to_vec(&msg)?;
  89. let wire_msg = WireMsg {
  90. to: to.act_id,
  91. from,
  92. payload: &buf,
  93. };
  94. peer.send(wire_msg).await
  95. } else {
  96. // TODO: Use the filesystem to discover the address of the recipient and connect to
  97. // it.
  98. todo!()
  99. }
  100. }
  101. }
  102. /// Sends a message to the actor identified by the given [ActorName] and returns a future which
  103. /// is ready when a reply has been received.
  104. pub async fn call<T: 'static + CallMsg>(
  105. &self,
  106. to: &ActorName,
  107. from: Uuid,
  108. msg: T,
  109. ) -> Result<T::Reply> {
  110. if to.path == self.path {
  111. let guard = self.handles.read().await;
  112. if let Some(handle) = guard.get(&to.act_id) {
  113. handle.call_through(msg).await
  114. } else {
  115. Err(bterr!("invalid actor name"))
  116. }
  117. } else {
  118. let guard = self.peers.read().await;
  119. if let Some(peer) = guard.get(&to.path) {
  120. let buf = to_vec(&msg)?;
  121. let wire_msg = WireMsg {
  122. to: to.act_id,
  123. from,
  124. payload: &buf,
  125. };
  126. peer.call(wire_msg, ReplyCallback::<T>::new()).await?
  127. } else {
  128. // TODO: Use the filesystem to discover the address of the recipient and connect to
  129. // it.
  130. todo!()
  131. }
  132. }
  133. }
  134. /// Resolves the given [ServiceName] to an [ActorName] which is part of it.
  135. pub async fn resolve<'a>(&'a self, _service: &ServiceName) -> Result<ActorName> {
  136. todo!()
  137. }
  138. /// Activates a new actor using the given activator function and returns a handle to it.
  139. pub async fn activate<Msg, F, Fut>(&'static self, activator: F) -> ActorName
  140. where
  141. Msg: 'static + CallMsg,
  142. Fut: 'static + Send + Future<Output = ()>,
  143. F: FnOnce(&'static Runtime, mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
  144. {
  145. let mut guard = self.handles.write().await;
  146. let act_id = {
  147. let mut act_id = Uuid::new_v4();
  148. while guard.contains_key(&act_id) {
  149. act_id = Uuid::new_v4();
  150. }
  151. act_id
  152. };
  153. let (tx, rx) = mpsc::channel::<Envelope<Msg>>(MAILBOX_LIMIT);
  154. // The deliverer closure is responsible for deserializing messages received over the wire
  155. // and delivering them to the actor's mailbox and sending replies to call messages.
  156. let deliverer = {
  157. let buffer = Arc::new(Mutex::new(Vec::<u8>::new()));
  158. let tx = tx.clone();
  159. move |envelope: WireEnvelope| {
  160. let (wire_msg, replier) = envelope.into_parts();
  161. let result = from_slice(wire_msg.payload);
  162. let buffer = buffer.clone();
  163. let tx = tx.clone();
  164. let fut: FutureResult = Box::pin(async move {
  165. let msg = result?;
  166. if let Some(mut replier) = replier {
  167. let (envelope, rx) = Envelope::call(msg);
  168. tx.send(envelope).await.map_err(|_| {
  169. bterr!("failed to deliver message. Recipient may have halted.")
  170. })?;
  171. match rx.await {
  172. Ok(reply) => {
  173. let mut guard = buffer.lock().await;
  174. guard.clear();
  175. write_to(&reply, guard.deref_mut())?;
  176. let wire_reply = WireReply::Ok(&guard);
  177. replier.reply(wire_reply).await
  178. }
  179. Err(err) => replier.reply_err(err.to_string(), None).await,
  180. }
  181. } else {
  182. tx.send(Envelope::Send { msg }).await.map_err(|_| {
  183. bterr!("failed to deliver message. Recipient may have halted.")
  184. })
  185. }
  186. });
  187. fut
  188. }
  189. };
  190. let handle = tokio::task::spawn(activator(self, rx, act_id));
  191. let actor_handle = ActorHandle::new(handle, tx, deliverer);
  192. guard.insert(act_id, actor_handle);
  193. ActorName::new(self.path.clone(), act_id)
  194. }
  195. /// Registers an actor as a service with the given [ServiceId].
  196. pub async fn register<Msg, Fut, F, G>(
  197. &self,
  198. _id: ServiceId,
  199. _activator: F,
  200. _deserializer: G,
  201. ) -> Result<()>
  202. where
  203. Msg: 'static + CallMsg,
  204. Fut: 'static + Send + Future<Output = ()>,
  205. F: Fn(mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
  206. G: 'static + Send + Sync + Fn(&[u8]) -> Result<Msg>,
  207. {
  208. todo!()
  209. }
  210. /// Returns the [ActorHandle] for the actor with the given name.
  211. ///
  212. /// If there is no such actor in this runtime then a [RuntimeError::BadActorName] error is
  213. /// returned.
  214. ///
  215. /// Note that the actor will be aborted when the given handle is dropped (unless it has already
  216. /// returned when the handle is dropped), and no further messages will be delivered to it by
  217. /// this runtime.
  218. pub async fn take(&self, name: &ActorName) -> Result<ActorHandle> {
  219. if name.path == self.path {
  220. let mut guard = self.handles.write().await;
  221. if let Some(handle) = guard.remove(&name.act_id) {
  222. Ok(handle)
  223. } else {
  224. Err(RuntimeError::BadActorName(name.clone()).into())
  225. }
  226. } else {
  227. Err(RuntimeError::BadActorName(name.clone()).into())
  228. }
  229. }
  230. }
  231. impl Drop for Runtime {
  232. fn drop(&mut self) {
  233. panic!("A Runtime was dropped. Panicking to avoid undefined behavior.");
  234. }
  235. }
  236. #[derive(Debug, Clone, PartialEq, Eq)]
  237. pub enum RuntimeError {
  238. BadActorName(ActorName),
  239. }
  240. impl Display for RuntimeError {
  241. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  242. match self {
  243. Self::BadActorName(name) => write!(f, "bad actor name: {name}"),
  244. }
  245. }
  246. }
  247. impl std::error::Error for RuntimeError {}
  248. /// Deserializes replies sent over the wire.
  249. struct ReplyCallback<T> {
  250. _phantom: PhantomData<T>,
  251. }
  252. impl<T: CallMsg> ReplyCallback<T> {
  253. fn new() -> Self {
  254. Self {
  255. _phantom: PhantomData,
  256. }
  257. }
  258. }
  259. impl<T: CallMsg> DeserCallback for ReplyCallback<T> {
  260. type Arg<'de> = WireReply<'de> where T: 'de;
  261. type Return = Result<T::Reply>;
  262. type CallFut<'de> = Ready<Self::Return> where T: 'de, T::Reply: 'de;
  263. fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
  264. let result = match arg {
  265. WireReply::Ok(slice) => from_slice(slice).map_err(|err| err.into()),
  266. WireReply::Err(msg) => Err(StringError::new(msg.to_string()).into()),
  267. };
  268. ready(result)
  269. }
  270. }
  271. /// This struct implements the server callback for network messages.
  272. #[derive(Clone)]
  273. struct RuntimeCallback {
  274. handles: Arc<RwLock<HashMap<Uuid, ActorHandle>>>,
  275. }
  276. impl RuntimeCallback {
  277. fn new(handles: Arc<RwLock<HashMap<Uuid, ActorHandle>>>) -> Self {
  278. Self { handles }
  279. }
  280. }
  281. impl MsgCallback for RuntimeCallback {
  282. type Arg<'de> = WireMsg<'de>;
  283. type CallFut<'de> = impl 'de + Future<Output = Result<()>>;
  284. fn call<'de>(&'de self, arg: btmsg::MsgReceived<Self::Arg<'de>>) -> Self::CallFut<'de> {
  285. async move {
  286. let (_, body, replier) = arg.into_parts();
  287. let guard = self.handles.read().await;
  288. if let Some(handle) = guard.get(&body.to) {
  289. let envelope = if let Some(replier) = replier {
  290. WireEnvelope::Call { msg: body, replier }
  291. } else {
  292. WireEnvelope::Send { msg: body }
  293. };
  294. (handle.deliverer)(envelope).await
  295. } else {
  296. Err(bterr!("invalid actor ID: {}", body.to))
  297. }
  298. }
  299. }
  300. }
  301. /// A unique identifier for a particular service.
  302. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  303. pub struct ServiceId(#[serde(with = "smart_ptr")] Arc<String>);
  304. impl From<String> for ServiceId {
  305. fn from(value: String) -> Self {
  306. Self(Arc::new(value))
  307. }
  308. }
  309. impl<'a> From<&'a str> for ServiceId {
  310. fn from(value: &'a str) -> Self {
  311. Self(Arc::new(value.to_owned()))
  312. }
  313. }
  314. /// A unique identifier for a service.
  315. ///
  316. /// A service is a collection of actors in the same directory which provide some functionality.
  317. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  318. pub struct ServiceName {
  319. /// The path to the directory containing the service.
  320. #[serde(with = "smart_ptr")]
  321. path: Arc<BlockPath>,
  322. /// The id of the service.
  323. service_id: ServiceId,
  324. }
  325. /// A unique identifier for a specific actor activation.
  326. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  327. pub struct ActorName {
  328. /// The path to the directory containing this actor.
  329. #[serde(with = "smart_ptr")]
  330. path: Arc<BlockPath>,
  331. /// A unique identifier for an actor activation. Even as an actor transitions to different types
  332. /// as it handles messages, this value does not change. Thus this value can be used to trace an
  333. /// actor through a series of state transitions.
  334. act_id: Uuid,
  335. }
  336. impl ActorName {
  337. pub fn new(path: Arc<BlockPath>, act_id: Uuid) -> Self {
  338. Self { path, act_id }
  339. }
  340. }
  341. impl Display for ActorName {
  342. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  343. write!(f, "{}@{}", self.act_id, self.path)
  344. }
  345. }
  346. /// Trait for messages which expect exactly one reply.
  347. pub trait CallMsg: Serialize + DeserializeOwned + Send + Sync {
  348. /// The reply type expected for this message.
  349. type Reply: Serialize + DeserializeOwned + Send + Sync;
  350. }
  351. /// Trait for messages which expect exactly zero replies.
  352. pub trait SendMsg: CallMsg {}
  353. /// The maximum number of messages which can be kept in an actor's mailbox.
  354. const MAILBOX_LIMIT: usize = 32;
  355. /// The type of messages sent over the wire between runtimes.
  356. #[derive(Serialize, Deserialize)]
  357. struct WireMsg<'a> {
  358. to: Uuid,
  359. from: Uuid,
  360. payload: &'a [u8],
  361. }
  362. impl<'a> btmsg::CallMsg<'a> for WireMsg<'a> {
  363. type Reply<'r> = WireReply<'r>;
  364. }
  365. impl<'a> btmsg::SendMsg<'a> for WireMsg<'a> {}
  366. #[derive(Serialize, Deserialize)]
  367. enum WireReply<'a> {
  368. Ok(&'a [u8]),
  369. Err(&'a str),
  370. }
  371. /// A wrapper around [WireMsg] which indicates whether a call or send was executed.
  372. enum WireEnvelope<'de> {
  373. Send { msg: WireMsg<'de> },
  374. Call { msg: WireMsg<'de>, replier: Replier },
  375. }
  376. impl<'de> WireEnvelope<'de> {
  377. fn into_parts(self) -> (WireMsg<'de>, Option<Replier>) {
  378. match self {
  379. Self::Send { msg } => (msg, None),
  380. Self::Call { msg, replier } => (msg, Some(replier)),
  381. }
  382. }
  383. }
  384. /// Wraps a message to indicate if it was sent with `call` or `send`.
  385. ///
  386. /// If the message was sent with call, then this enum will contain a channel that can be used to
  387. /// reply to it.
  388. pub enum Envelope<T: CallMsg> {
  389. /// The message was sent with `send` and does not expect a reply.
  390. Send { msg: T },
  391. /// The message was sent with `call` and expects a reply.
  392. Call {
  393. msg: T,
  394. /// A reply message must be sent using this channel.
  395. reply: oneshot::Sender<T::Reply>,
  396. },
  397. }
  398. impl<T: CallMsg> Envelope<T> {
  399. fn send(msg: T) -> Self {
  400. Self::Send { msg }
  401. }
  402. fn call(msg: T) -> (Self, oneshot::Receiver<T::Reply>) {
  403. let (tx, rx) = oneshot::channel::<T::Reply>();
  404. let kind = Envelope::Call { msg, reply: tx };
  405. (kind, rx)
  406. }
  407. }
  408. type FutureResult = Pin<Box<dyn Send + Future<Output = Result<()>>>>;
  409. pub struct ActorHandle {
  410. handle: Option<JoinHandle<()>>,
  411. sender: Box<dyn Send + Sync + Any>,
  412. deliverer: Box<dyn Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult>,
  413. }
  414. impl ActorHandle {
  415. fn new<T, F>(handle: JoinHandle<()>, sender: mpsc::Sender<Envelope<T>>, deliverer: F) -> Self
  416. where
  417. T: 'static + CallMsg,
  418. F: 'static + Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult,
  419. {
  420. Self {
  421. handle: Some(handle),
  422. sender: Box::new(sender),
  423. deliverer: Box::new(deliverer),
  424. }
  425. }
  426. fn sender<T: 'static + CallMsg>(&self) -> Result<&mpsc::Sender<Envelope<T>>> {
  427. self.sender
  428. .downcast_ref::<mpsc::Sender<Envelope<T>>>()
  429. .ok_or_else(|| bterr!("unexpected message type"))
  430. }
  431. pub async fn send<T: 'static + SendMsg>(&self, msg: T) -> Result<()> {
  432. let sender = self.sender()?;
  433. sender
  434. .send(Envelope::send(msg))
  435. .await
  436. .map_err(|_| bterr!("failed to enqueue message"))?;
  437. Ok(())
  438. }
  439. pub async fn call_through<T: 'static + CallMsg>(&self, msg: T) -> Result<T::Reply> {
  440. let sender = self.sender()?;
  441. let (envelope, rx) = Envelope::call(msg);
  442. sender
  443. .send(envelope)
  444. .await
  445. .map_err(|_| bterr!("failed to enqueue call"))?;
  446. let reply = rx.await?;
  447. Ok(reply)
  448. }
  449. pub async fn returned(&mut self) -> Result<()> {
  450. if let Some(handle) = self.handle.take() {
  451. handle.await?;
  452. }
  453. Ok(())
  454. }
  455. pub fn abort(&mut self) {
  456. if let Some(handle) = self.handle.take() {
  457. handle.abort();
  458. }
  459. }
  460. }
  461. impl Drop for ActorHandle {
  462. fn drop(&mut self) {
  463. self.abort();
  464. }
  465. }
  466. #[cfg(test)]
  467. mod tests {
  468. use super::*;
  469. use btlib::{
  470. crypto::{ConcreteCreds, CredStore, CredsPriv},
  471. log::BuilderExt,
  472. };
  473. use btlib_tests::TEST_STORE;
  474. use btmsg::BlockAddr;
  475. use btserde::to_vec;
  476. use ctor::ctor;
  477. use lazy_static::lazy_static;
  478. use std::net::{IpAddr, Ipv4Addr};
  479. use tokio::runtime::Builder;
  480. const RUNTIME_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
  481. lazy_static! {
  482. static ref RUNTIME_CREDS: Arc<ConcreteCreds> = TEST_STORE.node_creds().unwrap();
  483. }
  484. declare_runtime!(RUNTIME, RUNTIME_ADDR, RUNTIME_CREDS.clone());
  485. lazy_static! {
  486. /// A tokio async runtime.
  487. ///
  488. /// When the `#[tokio::test]` attribute is used on a test, a new current thread runtime
  489. /// is created for each test
  490. /// (source: https://docs.rs/tokio/latest/tokio/attr.test.html#current-thread-runtime).
  491. /// This creates a problem, because the first test thread to access the `RUNTIME` static
  492. /// will initialize its `Receiver` in its runtime, which will stop running at the end of
  493. /// the test. Hence subsequent tests will not be able to send remote messages to this
  494. /// `Runtime`.
  495. ///
  496. /// By creating a single async runtime which is used by all of the tests, we can avoid this
  497. /// problem.
  498. static ref ASYNC_RT: tokio::runtime::Runtime = Builder::new_current_thread()
  499. .enable_all()
  500. .build()
  501. .unwrap();
  502. }
  503. /// The log level to use when running tests.
  504. const LOG_LEVEL: &str = "warn";
  505. #[ctor]
  506. fn ctor() {
  507. std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL));
  508. env_logger::Builder::from_default_env().btformat().init();
  509. }
  510. #[derive(Serialize, Deserialize)]
  511. struct EchoMsg(String);
  512. impl CallMsg for EchoMsg {
  513. type Reply = EchoMsg;
  514. }
  515. async fn echo(
  516. _rt: &'static Runtime,
  517. mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>,
  518. _act_id: Uuid,
  519. ) {
  520. while let Some(msg) = mailbox.recv().await {
  521. if let Envelope::Call { msg, reply } = msg {
  522. if let Err(_) = reply.send(msg) {
  523. panic!("failed to send reply");
  524. }
  525. }
  526. }
  527. }
  528. #[test]
  529. fn local_call() {
  530. ASYNC_RT.block_on(async {
  531. const EXPECTED: &str = "hello";
  532. let name = RUNTIME.activate(echo).await;
  533. let reply = RUNTIME
  534. .call(&name, Uuid::default(), EchoMsg(EXPECTED.into()))
  535. .await
  536. .unwrap();
  537. assert_eq!(EXPECTED, reply.0);
  538. RUNTIME.take(&name).await.unwrap();
  539. })
  540. }
  541. #[test]
  542. fn remote_call() {
  543. ASYNC_RT.block_on(async {
  544. const EXPECTED: &str = "hello";
  545. let actor_name = RUNTIME.activate(echo).await;
  546. let bind_path = Arc::new(RUNTIME_CREDS.bind_path().unwrap());
  547. let block_addr = Arc::new(BlockAddr::new(RUNTIME_ADDR, bind_path));
  548. let transmitter = Transmitter::new(block_addr, RUNTIME_CREDS.clone())
  549. .await
  550. .unwrap();
  551. let buf = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap();
  552. let wire_msg = WireMsg {
  553. to: actor_name.act_id,
  554. from: Uuid::default(),
  555. payload: &buf,
  556. };
  557. let reply = transmitter
  558. .call(wire_msg, ReplyCallback::<EchoMsg>::new())
  559. .await
  560. .unwrap()
  561. .unwrap();
  562. assert_eq!(EXPECTED, reply.0);
  563. RUNTIME.take(&actor_name).await.unwrap();
  564. });
  565. }
  566. /// Tests the `num_running` method.
  567. ///
  568. /// This test uses its own runtime and so can use the `#[tokio::test]` attribute.
  569. #[tokio::test]
  570. async fn num_running() {
  571. declare_runtime!(
  572. LOCAL_RT,
  573. // This needs to be different from the address where `RUNTIME` is listening.
  574. IpAddr::from([127, 0, 0, 2]),
  575. TEST_STORE.node_creds().unwrap()
  576. );
  577. assert_eq!(0, LOCAL_RT.num_running().await);
  578. let name = LOCAL_RT.activate(echo).await;
  579. assert_eq!(1, LOCAL_RT.num_running().await);
  580. LOCAL_RT.take(&name).await.unwrap();
  581. assert_eq!(0, LOCAL_RT.num_running().await);
  582. }
  583. }