lib.rs 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704
  1. #![feature(impl_trait_in_assoc_type)]
  2. use std::{
  3. any::Any,
  4. collections::HashMap,
  5. fmt::Display,
  6. future::{ready, Future, Ready},
  7. marker::PhantomData,
  8. net::IpAddr,
  9. ops::DerefMut,
  10. pin::Pin,
  11. sync::Arc,
  12. };
  13. use btlib::{bterr, crypto::Creds, error::StringError, BlockPath, Result};
  14. use btserde::{field_helpers::smart_ptr, from_slice, to_vec, write_to};
  15. use bttp::{DeserCallback, MsgCallback, Receiver, Replier, Transmitter};
  16. use serde::{de::DeserializeOwned, Deserialize, Serialize};
  17. use tokio::{
  18. sync::{mpsc, oneshot, Mutex, RwLock},
  19. task::JoinHandle,
  20. };
  21. use uuid::Uuid;
  22. /// Declares a new [Runtime] which listens for messages at the given IP address and uses the given
  23. /// [Creds]. Runtimes are intended to be created once in a process's lifetime and continue running
  24. /// until the process exits.
  25. #[macro_export]
  26. macro_rules! declare_runtime {
  27. ($name:ident, $ip_addr:expr, $creds:expr) => {
  28. ::lazy_static::lazy_static! {
  29. static ref $name: &'static ::btrun::Runtime = {
  30. ::lazy_static::lazy_static! {
  31. static ref RUNTIME: ::btrun::Runtime = ::btrun::Runtime::_new($creds).unwrap();
  32. static ref RECEIVER: ::bttp::Receiver = _new_receiver($ip_addr, $creds, &*RUNTIME);
  33. }
  34. // By dereferencing RECEIVER we ensure it is started.
  35. let _ = &*RECEIVER;
  36. &*RUNTIME
  37. };
  38. }
  39. };
  40. }
  41. /// This function is not intended to be called by downstream crates.
  42. #[doc(hidden)]
  43. pub fn _new_receiver<C>(ip_addr: IpAddr, creds: Arc<C>, runtime: &'static Runtime) -> Receiver
  44. where
  45. C: 'static + Send + Sync + Creds,
  46. {
  47. let callback = RuntimeCallback::new(runtime);
  48. Receiver::new(ip_addr, creds, callback).unwrap()
  49. }
  50. /// An actor runtime.
  51. ///
  52. /// Actors can be activated by the runtime and execute autonomously until they return. Running
  53. /// actors can be sent messages using the `send` method, which does not wait for a response from the
  54. /// recipient. If a reply is needed, then `call` can be used, which returns a future that will
  55. /// be ready when the reply has been received.
  56. pub struct Runtime {
  57. path: Arc<BlockPath>,
  58. handles: RwLock<HashMap<Uuid, ActorHandle>>,
  59. peers: RwLock<HashMap<Arc<BlockPath>, Transmitter>>,
  60. }
  61. impl Runtime {
  62. /// This method is not intended to be called directly by downstream crates. Use the macro
  63. /// [declare_runtime] to create a [Runtime].
  64. ///
  65. /// If you create a non-static [Runtime], your process will panic when it is dropped.
  66. #[doc(hidden)]
  67. pub fn _new<C: 'static + Send + Sync + Creds>(creds: Arc<C>) -> Result<Runtime> {
  68. let path = Arc::new(creds.bind_path()?);
  69. Ok(Runtime {
  70. path,
  71. handles: RwLock::new(HashMap::new()),
  72. peers: RwLock::new(HashMap::new()),
  73. })
  74. }
  75. pub fn path(&self) -> &Arc<BlockPath> {
  76. &self.path
  77. }
  78. /// Returns the number of actors that are currently executing in this [Runtime].
  79. pub async fn num_running(&self) -> usize {
  80. let guard = self.handles.read().await;
  81. guard.len()
  82. }
  83. /// Sends a message to the actor identified by the given [ActorName].
  84. pub async fn send<T: 'static + SendMsg>(
  85. &self,
  86. to: ActorName,
  87. from: ActorName,
  88. msg: T,
  89. ) -> Result<()> {
  90. if to.path == self.path {
  91. let guard = self.handles.read().await;
  92. if let Some(handle) = guard.get(&to.act_id) {
  93. handle.send(from, msg).await
  94. } else {
  95. Err(bterr!("invalid actor name"))
  96. }
  97. } else {
  98. let guard = self.peers.read().await;
  99. if let Some(peer) = guard.get(&to.path) {
  100. let buf = to_vec(&msg)?;
  101. let wire_msg = WireMsg {
  102. to,
  103. from,
  104. payload: &buf,
  105. };
  106. peer.send(wire_msg).await
  107. } else {
  108. // TODO: Use the filesystem to discover the address of the recipient and connect to
  109. // it.
  110. todo!()
  111. }
  112. }
  113. }
  114. /// Sends a message to the service identified by [ServiceName].
  115. pub async fn send_service<T: 'static + SendMsg>(
  116. &self,
  117. _to: ServiceName,
  118. _from: ActorName,
  119. _msg: T,
  120. ) -> Result<()> {
  121. todo!()
  122. }
  123. /// Sends a message to the actor identified by the given [ActorName] and returns a future which
  124. /// is ready when a reply has been received.
  125. pub async fn call<T: 'static + CallMsg>(
  126. &self,
  127. to: ActorName,
  128. from: ActorName,
  129. msg: T,
  130. ) -> Result<T::Reply> {
  131. if to.path == self.path {
  132. let guard = self.handles.read().await;
  133. if let Some(handle) = guard.get(&to.act_id) {
  134. handle.call_through(from, msg).await
  135. } else {
  136. Err(bterr!("invalid actor name"))
  137. }
  138. } else {
  139. let guard = self.peers.read().await;
  140. if let Some(peer) = guard.get(&to.path) {
  141. let buf = to_vec(&msg)?;
  142. let wire_msg = WireMsg {
  143. to,
  144. from,
  145. payload: &buf,
  146. };
  147. peer.call(wire_msg, ReplyCallback::<T>::new()).await?
  148. } else {
  149. // TODO: Use the filesystem to discover the address of the recipient and connect to
  150. // it.
  151. todo!()
  152. }
  153. }
  154. }
  155. /// Calls a service identified by [ServiceName].
  156. pub async fn send_call<T: 'static + CallMsg>(
  157. &self,
  158. _to: ServiceName,
  159. _from: ActorName,
  160. _msg: T,
  161. ) -> Result<T::Reply> {
  162. todo!()
  163. }
  164. /// Activates a new actor using the given activator function and returns a handle to it.
  165. pub async fn activate<Msg, F, Fut>(&'static self, activator: F) -> ActorName
  166. where
  167. Msg: 'static + CallMsg,
  168. Fut: 'static + Send + Future<Output = ()>,
  169. F: FnOnce(&'static Runtime, mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
  170. {
  171. let mut guard = self.handles.write().await;
  172. let act_id = {
  173. let mut act_id = Uuid::new_v4();
  174. while guard.contains_key(&act_id) {
  175. act_id = Uuid::new_v4();
  176. }
  177. act_id
  178. };
  179. let act_name = self.actor_name(act_id);
  180. let (tx, rx) = mpsc::channel::<Envelope<Msg>>(MAILBOX_LIMIT);
  181. // The deliverer closure is responsible for deserializing messages received over the wire
  182. // and delivering them to the actor's mailbox, and sending replies to call messages.
  183. let deliverer = {
  184. let buffer = Arc::new(Mutex::new(Vec::<u8>::new()));
  185. let tx = tx.clone();
  186. let act_name = act_name.clone();
  187. move |envelope: WireEnvelope| {
  188. let (wire_msg, replier) = envelope.into_parts();
  189. let result = from_slice(wire_msg.payload);
  190. let buffer = buffer.clone();
  191. let tx = tx.clone();
  192. let act_name = act_name.clone();
  193. let fut: FutureResult = Box::pin(async move {
  194. let msg = result?;
  195. if let Some(mut replier) = replier {
  196. let (envelope, rx) = Envelope::new_call(act_name, msg);
  197. tx.send(envelope).await.map_err(|_| {
  198. bterr!("failed to deliver message. Recipient may have halted.")
  199. })?;
  200. match rx.await {
  201. Ok(reply) => {
  202. let mut guard = buffer.lock().await;
  203. guard.clear();
  204. write_to(&reply, guard.deref_mut())?;
  205. let wire_reply = WireReply::Ok(&guard);
  206. replier.reply(wire_reply).await
  207. }
  208. Err(err) => replier.reply_err(err.to_string(), None).await,
  209. }
  210. } else {
  211. tx.send(Envelope::new_send(act_name, msg))
  212. .await
  213. .map_err(|_| {
  214. bterr!("failed to deliver message. Recipient may have halted.")
  215. })
  216. }
  217. });
  218. fut
  219. }
  220. };
  221. let handle = tokio::task::spawn(activator(self, rx, act_id));
  222. let actor_handle = ActorHandle::new(handle, tx, deliverer);
  223. guard.insert(act_id, actor_handle);
  224. act_name
  225. }
  226. /// Registers an actor as a service with the given [ServiceId].
  227. pub async fn register<Msg, Fut, F, G>(
  228. &self,
  229. _id: ServiceId,
  230. _activator: F,
  231. _deserializer: G,
  232. ) -> Result<()>
  233. where
  234. Msg: 'static + CallMsg,
  235. Fut: 'static + Send + Future<Output = ()>,
  236. F: Fn(mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
  237. G: 'static + Send + Sync + Fn(&[u8]) -> Result<Msg>,
  238. {
  239. todo!()
  240. }
  241. /// Returns the [ActorHandle] for the actor with the given name.
  242. ///
  243. /// If there is no such actor in this runtime then a [RuntimeError::BadActorName] error is
  244. /// returned.
  245. ///
  246. /// Note that the actor will be aborted when the given handle is dropped (unless it has already
  247. /// returned when the handle is dropped), and no further messages will be delivered to it by
  248. /// this runtime.
  249. pub async fn take(&self, name: &ActorName) -> Result<ActorHandle> {
  250. if name.path == self.path {
  251. let mut guard = self.handles.write().await;
  252. if let Some(handle) = guard.remove(&name.act_id) {
  253. Ok(handle)
  254. } else {
  255. Err(RuntimeError::BadActorName(name.clone()).into())
  256. }
  257. } else {
  258. Err(RuntimeError::BadActorName(name.clone()).into())
  259. }
  260. }
  261. /// Returns the name of the actor in this runtime with the given actor ID.
  262. pub fn actor_name(&self, act_id: Uuid) -> ActorName {
  263. ActorName::new(self.path.clone(), act_id)
  264. }
  265. }
  266. impl Drop for Runtime {
  267. fn drop(&mut self) {
  268. panic!("A Runtime was dropped. Panicking to avoid undefined behavior.");
  269. }
  270. }
  271. #[derive(Debug, Clone, PartialEq, Eq)]
  272. pub enum RuntimeError {
  273. BadActorName(ActorName),
  274. }
  275. impl Display for RuntimeError {
  276. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  277. match self {
  278. Self::BadActorName(name) => write!(f, "bad actor name: {name}"),
  279. }
  280. }
  281. }
  282. impl std::error::Error for RuntimeError {}
  283. /// Represents the terminal state of an actor, where it stops processing messages and halts.
  284. pub struct End;
  285. impl End {
  286. /// Returns the identifier for this type which is expected in protocol definitions.
  287. pub fn ident() -> &'static str {
  288. stringify!(End)
  289. }
  290. }
  291. #[allow(dead_code)]
  292. /// Delivered to an actor implementation when it starts up.
  293. pub struct Activate {
  294. /// A reference to the `Runtime` which is running this actor.
  295. rt: &'static Runtime,
  296. /// The ID assigned to this actor.
  297. act_id: Uuid,
  298. }
  299. impl Activate {
  300. pub fn new(rt: &'static Runtime, act_id: Uuid) -> Self {
  301. Self { rt, act_id }
  302. }
  303. }
  304. /// Deserializes replies sent over the wire.
  305. pub struct ReplyCallback<T> {
  306. _phantom: PhantomData<T>,
  307. }
  308. impl<T: CallMsg> ReplyCallback<T> {
  309. pub fn new() -> Self {
  310. Self {
  311. _phantom: PhantomData,
  312. }
  313. }
  314. }
  315. impl<T: CallMsg> Default for ReplyCallback<T> {
  316. fn default() -> Self {
  317. Self::new()
  318. }
  319. }
  320. impl<T: CallMsg> DeserCallback for ReplyCallback<T> {
  321. type Arg<'de> = WireReply<'de> where T: 'de;
  322. type Return = Result<T::Reply>;
  323. type CallFut<'de> = Ready<Self::Return> where T: 'de, T::Reply: 'de;
  324. fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
  325. let result = match arg {
  326. WireReply::Ok(slice) => from_slice(slice).map_err(|err| err.into()),
  327. WireReply::Err(msg) => Err(StringError::new(msg.to_string()).into()),
  328. };
  329. ready(result)
  330. }
  331. }
  332. struct SendReplyCallback {
  333. replier: Option<Replier>,
  334. }
  335. impl SendReplyCallback {
  336. fn new(replier: Replier) -> Self {
  337. Self {
  338. replier: Some(replier),
  339. }
  340. }
  341. }
  342. impl DeserCallback for SendReplyCallback {
  343. type Arg<'de> = WireReply<'de>;
  344. type Return = Result<()>;
  345. type CallFut<'de> = impl 'de + Future<Output = Self::Return>;
  346. fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
  347. async move {
  348. if let Some(mut replier) = self.replier.take() {
  349. replier.reply(arg).await
  350. } else {
  351. Ok(())
  352. }
  353. }
  354. }
  355. }
  356. /// This struct implements the server callback for network messages.
  357. #[derive(Clone)]
  358. struct RuntimeCallback {
  359. rt: &'static Runtime,
  360. }
  361. impl RuntimeCallback {
  362. fn new(rt: &'static Runtime) -> Self {
  363. Self { rt }
  364. }
  365. async fn deliver_local(&self, msg: WireMsg<'_>, replier: Option<Replier>) -> Result<()> {
  366. let guard = self.rt.handles.read().await;
  367. if let Some(handle) = guard.get(&msg.to.act_id) {
  368. let envelope = if let Some(replier) = replier {
  369. WireEnvelope::Call { msg, replier }
  370. } else {
  371. WireEnvelope::Send { msg }
  372. };
  373. (handle.deliverer)(envelope).await
  374. } else {
  375. Err(bterr!("invalid actor name: {}", msg.to))
  376. }
  377. }
  378. async fn route_msg(&self, msg: WireMsg<'_>, replier: Option<Replier>) -> Result<()> {
  379. let guard = self.rt.peers.read().await;
  380. if let Some(tx) = guard.get(msg.to.path()) {
  381. if let Some(replier) = replier {
  382. let callback = SendReplyCallback::new(replier);
  383. tx.call(msg, callback).await?
  384. } else {
  385. tx.send(msg).await
  386. }
  387. } else {
  388. Err(bterr!(
  389. "unable to deliver message to peer at '{}'",
  390. msg.to.path()
  391. ))
  392. }
  393. }
  394. }
  395. impl MsgCallback for RuntimeCallback {
  396. type Arg<'de> = WireMsg<'de>;
  397. type CallFut<'de> = impl 'de + Future<Output = Result<()>>;
  398. fn call<'de>(&'de self, arg: bttp::MsgReceived<Self::Arg<'de>>) -> Self::CallFut<'de> {
  399. async move {
  400. let (_, body, replier) = arg.into_parts();
  401. if body.to.path() == self.rt.path() {
  402. self.deliver_local(body, replier).await
  403. } else {
  404. self.route_msg(body, replier).await
  405. }
  406. }
  407. }
  408. }
  409. /// A unique identifier for a particular service.
  410. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  411. pub struct ServiceId(#[serde(with = "smart_ptr")] Arc<String>);
  412. impl From<String> for ServiceId {
  413. fn from(value: String) -> Self {
  414. Self(Arc::new(value))
  415. }
  416. }
  417. impl<'a> From<&'a str> for ServiceId {
  418. fn from(value: &'a str) -> Self {
  419. Self(Arc::new(value.to_owned()))
  420. }
  421. }
  422. /// A unique identifier for a service.
  423. ///
  424. /// A service is a collection of actors in the same directory which provide some functionality.
  425. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  426. pub struct ServiceName {
  427. /// The path to the directory containing the service.
  428. #[serde(with = "smart_ptr")]
  429. path: Arc<BlockPath>,
  430. /// The id of the service.
  431. service_id: ServiceId,
  432. /// Indicates if the message should be routed towards the root of the tree or away from it.
  433. rootward: bool,
  434. }
  435. /// A unique identifier for a specific actor activation.
  436. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
  437. pub struct ActorName {
  438. /// The path to the directory containing this actor.
  439. #[serde(with = "smart_ptr")]
  440. path: Arc<BlockPath>,
  441. /// A unique identifier for an actor activation. Even as an actor transitions to different types
  442. /// as it handles messages, this value does not change. Thus this value can be used to trace an
  443. /// actor through a series of state transitions.
  444. act_id: Uuid,
  445. }
  446. impl ActorName {
  447. pub fn new(path: Arc<BlockPath>, act_id: Uuid) -> Self {
  448. Self { path, act_id }
  449. }
  450. pub fn path(&self) -> &Arc<BlockPath> {
  451. &self.path
  452. }
  453. pub fn act_id(&self) -> Uuid {
  454. self.act_id
  455. }
  456. }
  457. impl Display for ActorName {
  458. fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
  459. write!(f, "{}@{}", self.act_id, self.path)
  460. }
  461. }
  462. /// Trait for messages which expect exactly one reply.
  463. pub trait CallMsg: Serialize + DeserializeOwned + Send + Sync {
  464. /// The reply type expected for this message.
  465. type Reply: Serialize + DeserializeOwned + Send + Sync;
  466. }
  467. /// Trait for messages which expect exactly zero replies.
  468. pub trait SendMsg: CallMsg {}
  469. /// A type used to express when a reply is not expected for a message type.
  470. #[derive(Serialize, Deserialize)]
  471. pub enum NoReply {}
  472. /// The maximum number of messages which can be kept in an actor's mailbox.
  473. const MAILBOX_LIMIT: usize = 32;
  474. /// The type of messages sent over the wire between runtimes.
  475. #[derive(Serialize, Deserialize)]
  476. pub struct WireMsg<'a> {
  477. to: ActorName,
  478. from: ActorName,
  479. payload: &'a [u8],
  480. }
  481. impl<'a> WireMsg<'a> {
  482. pub fn new(to: ActorName, from: ActorName, payload: &'a [u8]) -> Self {
  483. Self { to, from, payload }
  484. }
  485. }
  486. impl<'a> bttp::CallMsg<'a> for WireMsg<'a> {
  487. type Reply<'r> = WireReply<'r>;
  488. }
  489. impl<'a> bttp::SendMsg<'a> for WireMsg<'a> {}
  490. #[derive(Serialize, Deserialize)]
  491. pub enum WireReply<'a> {
  492. Ok(&'a [u8]),
  493. Err(&'a str),
  494. }
  495. /// A wrapper around [WireMsg] which indicates whether a call or send was executed.
  496. enum WireEnvelope<'de> {
  497. Send { msg: WireMsg<'de> },
  498. Call { msg: WireMsg<'de>, replier: Replier },
  499. }
  500. impl<'de> WireEnvelope<'de> {
  501. fn into_parts(self) -> (WireMsg<'de>, Option<Replier>) {
  502. match self {
  503. Self::Send { msg } => (msg, None),
  504. Self::Call { msg, replier } => (msg, Some(replier)),
  505. }
  506. }
  507. }
  508. /// Wrapper around a message type `T` which indicates who the message is from and, if the message
  509. /// was dispatched with `call`, provides a channel to reply to it.
  510. pub struct Envelope<T: CallMsg> {
  511. from: ActorName,
  512. reply: Option<oneshot::Sender<T::Reply>>,
  513. msg: T,
  514. }
  515. impl<T: CallMsg> Envelope<T> {
  516. pub fn new(msg: T, reply: Option<oneshot::Sender<T::Reply>>, from: ActorName) -> Self {
  517. Self { from, reply, msg }
  518. }
  519. /// Creates a new envelope containing the given message which does not expect a reply.
  520. fn new_send(from: ActorName, msg: T) -> Self {
  521. Self {
  522. from,
  523. msg,
  524. reply: None,
  525. }
  526. }
  527. /// Creates a new envelope containing the given message which expects exactly one reply.
  528. fn new_call(from: ActorName, msg: T) -> (Self, oneshot::Receiver<T::Reply>) {
  529. let (tx, rx) = oneshot::channel::<T::Reply>();
  530. let envelope = Self {
  531. from,
  532. msg,
  533. reply: Some(tx),
  534. };
  535. (envelope, rx)
  536. }
  537. /// Returns the name of the actor which sent this message.
  538. pub fn from(&self) -> &ActorName {
  539. &self.from
  540. }
  541. /// Returns a reference to the message in this envelope.
  542. pub fn msg(&self) -> &T {
  543. &self.msg
  544. }
  545. /// Sends a reply to this message.
  546. ///
  547. /// If this message is not expecting a reply, or if this message has already been replied to,
  548. /// then an error is returned.
  549. pub fn reply(&mut self, reply: T::Reply) -> Result<()> {
  550. if let Some(tx) = self.reply.take() {
  551. if tx.send(reply).is_ok() {
  552. Ok(())
  553. } else {
  554. Err(bterr!("failed to send reply"))
  555. }
  556. } else {
  557. Err(bterr!("reply already sent"))
  558. }
  559. }
  560. /// Returns true if this message expects a reply and it has not already been replied to.
  561. pub fn needs_reply(&self) -> bool {
  562. self.reply.is_some()
  563. }
  564. pub fn split(self) -> (T, Option<oneshot::Sender<T::Reply>>, ActorName) {
  565. (self.msg, self.reply, self.from)
  566. }
  567. }
  568. type FutureResult = Pin<Box<dyn Send + Future<Output = Result<()>>>>;
  569. pub struct ActorHandle {
  570. handle: Option<JoinHandle<()>>,
  571. sender: Box<dyn Send + Sync + Any>,
  572. deliverer: Box<dyn Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult>,
  573. }
  574. impl ActorHandle {
  575. fn new<T, F>(handle: JoinHandle<()>, sender: mpsc::Sender<Envelope<T>>, deliverer: F) -> Self
  576. where
  577. T: 'static + CallMsg,
  578. F: 'static + Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult,
  579. {
  580. Self {
  581. handle: Some(handle),
  582. sender: Box::new(sender),
  583. deliverer: Box::new(deliverer),
  584. }
  585. }
  586. fn sender<T: 'static + CallMsg>(&self) -> Result<&mpsc::Sender<Envelope<T>>> {
  587. self.sender
  588. .downcast_ref::<mpsc::Sender<Envelope<T>>>()
  589. .ok_or_else(|| bterr!("unexpected message type"))
  590. }
  591. /// Sends a message to the actor represented by this handle.
  592. pub async fn send<T: 'static + SendMsg>(&self, from: ActorName, msg: T) -> Result<()> {
  593. let sender = self.sender()?;
  594. sender
  595. .send(Envelope::new_send(from, msg))
  596. .await
  597. .map_err(|_| bterr!("failed to enqueue message"))?;
  598. Ok(())
  599. }
  600. pub async fn call_through<T: 'static + CallMsg>(
  601. &self,
  602. from: ActorName,
  603. msg: T,
  604. ) -> Result<T::Reply> {
  605. let sender = self.sender()?;
  606. let (envelope, rx) = Envelope::new_call(from, msg);
  607. sender
  608. .send(envelope)
  609. .await
  610. .map_err(|_| bterr!("failed to enqueue call"))?;
  611. let reply = rx.await?;
  612. Ok(reply)
  613. }
  614. pub async fn returned(&mut self) -> Result<()> {
  615. if let Some(handle) = self.handle.take() {
  616. handle.await?;
  617. }
  618. Ok(())
  619. }
  620. pub fn abort(&mut self) {
  621. if let Some(handle) = self.handle.take() {
  622. handle.abort();
  623. }
  624. }
  625. }
  626. impl Drop for ActorHandle {
  627. fn drop(&mut self) {
  628. self.abort();
  629. }
  630. }