|
@@ -1,76 +1,56 @@
|
|
|
//! This module contains the code necessary to run and supervise actors.
|
|
|
|
|
|
-use std::{future::Future, pin::Pin};
|
|
|
+use std::time::Duration;
|
|
|
|
|
|
use log::{debug, error, warn};
|
|
|
-use tokio::{
|
|
|
- select,
|
|
|
- sync::{mpsc, oneshot},
|
|
|
- task::{AbortHandle, JoinSet},
|
|
|
-};
|
|
|
+use tokio::time::timeout;
|
|
|
|
|
|
-use crate::{ActorExit, ActorFault, ActorId, ControlMsg, Runtime};
|
|
|
+use crate::{ActorExit, ActorFault, ActorId, ControlMsg, Runtime, RuntimeError};
|
|
|
|
|
|
+/// The type that spawned actors complete with.
|
|
|
+///
|
|
|
+/// It's primary responsibility is conveying the [ActorId] to the [kernel] regardless of whether
|
|
|
+/// an actor exited normally, with an error, or panicked.
|
|
|
pub(super) type FaultResult = std::result::Result<ActorId, ActorFault>;
|
|
|
-pub(super) type BoxedFuture = Pin<Box<dyn Send + Future<Output = FaultResult>>>;
|
|
|
-
|
|
|
-pub struct SpawnReq {
|
|
|
- task: BoxedFuture,
|
|
|
- sender: oneshot::Sender<AbortHandle>,
|
|
|
-}
|
|
|
-
|
|
|
-impl SpawnReq {
|
|
|
- pub(super) fn new<Fut>(task: Fut) -> (Self, oneshot::Receiver<AbortHandle>)
|
|
|
- where
|
|
|
- Fut: 'static + Send + Future<Output = FaultResult>,
|
|
|
- {
|
|
|
- let (sender, receiver) = oneshot::channel();
|
|
|
- let req = Self {
|
|
|
- task: Box::pin(task),
|
|
|
- sender,
|
|
|
- };
|
|
|
- (req, receiver)
|
|
|
- }
|
|
|
-}
|
|
|
|
|
|
/// The kernel is responsible for spawning and supervising actors.
|
|
|
-pub(super) async fn kernel(runtime: &Runtime, mut tasks: mpsc::Receiver<SpawnReq>) {
|
|
|
- let mut running = JoinSet::<FaultResult>::new();
|
|
|
+pub(super) async fn kernel(runtime: &Runtime) {
|
|
|
+ const EXIT_WAIT: Duration = Duration::from_millis(10);
|
|
|
|
|
|
loop {
|
|
|
- select! {
|
|
|
- option = tasks.recv() => {
|
|
|
- match option {
|
|
|
- Some(SpawnReq { task, sender }) => {
|
|
|
- let handle = running.spawn(task);
|
|
|
- if sender.send(handle).is_err() {
|
|
|
- error!("SpawnReq sender dropped the channel. Aborting actor.");
|
|
|
- }
|
|
|
- }
|
|
|
- None => panic!("The Runtime was dropped!")
|
|
|
- };
|
|
|
+ let option = {
|
|
|
+ let mut tasks = runtime.tasks.lock().await;
|
|
|
+ if let Ok(option) = timeout(EXIT_WAIT, tasks.join_next()).await {
|
|
|
+ option
|
|
|
+ } else {
|
|
|
+ // Timed out waiting for a task to exit, releasing lock to prevent starvation of
|
|
|
+ // other tasks.
|
|
|
+ continue;
|
|
|
}
|
|
|
- Some(result) = running.join_next() => {
|
|
|
- match result {
|
|
|
- Ok(actor_result) => {
|
|
|
- handle_actor_exit(actor_result, runtime).await;
|
|
|
- }
|
|
|
- Err(join_error) => {
|
|
|
- match join_error.try_into_panic() {
|
|
|
- Ok(panic_obj) => {
|
|
|
- if let Some(message) = panic_obj.downcast_ref::<String>() {
|
|
|
- error!("An actor panic was uncaught: {message}");
|
|
|
- } else {
|
|
|
- error!("An actor panic was uncaught.");
|
|
|
- }
|
|
|
- }
|
|
|
- Err(join_error) => {
|
|
|
- debug!("Actor was aborted: {join_error}");
|
|
|
+ };
|
|
|
+ if let Some(join_result) = option {
|
|
|
+ match join_result {
|
|
|
+ Ok(actor_result) => {
|
|
|
+ handle_actor_exit(actor_result, runtime).await;
|
|
|
+ }
|
|
|
+ Err(join_error) => {
|
|
|
+ match join_error.try_into_panic() {
|
|
|
+ Ok(panic_obj) => {
|
|
|
+ if let Some(message) = panic_obj.downcast_ref::<String>() {
|
|
|
+ error!("An actor panic was uncaught: {message}");
|
|
|
+ } else {
|
|
|
+ error!("An actor panic was uncaught.");
|
|
|
}
|
|
|
- };
|
|
|
- }
|
|
|
+ }
|
|
|
+ Err(join_error) => {
|
|
|
+ debug!("Actor was aborted: {join_error}");
|
|
|
+ }
|
|
|
+ };
|
|
|
}
|
|
|
}
|
|
|
+ } else {
|
|
|
+ // The join set is currently empty. We just loop again.
|
|
|
+ continue;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -81,7 +61,7 @@ async fn handle_actor_exit(actor_result: FaultResult, runtime: &Runtime) {
|
|
|
let (actor_id, actor_exit) = match actor_result {
|
|
|
Ok(actor_id) => {
|
|
|
debug!("Actor {actor_id} exited normally.");
|
|
|
- (actor_id, ActorExit::Ok)
|
|
|
+ (actor_id, ActorExit::Ok(actor_id))
|
|
|
}
|
|
|
Err(err) => {
|
|
|
error!("{err}");
|
|
@@ -89,6 +69,7 @@ async fn handle_actor_exit(actor_result: FaultResult, runtime: &Runtime) {
|
|
|
}
|
|
|
};
|
|
|
|
|
|
+ // Take ownership of the exited actor's handle.
|
|
|
let mut handle = {
|
|
|
let mut handles = runtime.handles.write().await;
|
|
|
if let Some(handle) = handles.remove(&actor_id) {
|
|
@@ -99,6 +80,7 @@ async fn handle_actor_exit(actor_result: FaultResult, runtime: &Runtime) {
|
|
|
}
|
|
|
};
|
|
|
|
|
|
+ // Send notification to its owner.
|
|
|
if let Some(owner) = handle.take_owner() {
|
|
|
let handle_name = handle.name();
|
|
|
let msg = ControlMsg::OwnedExited {
|
|
@@ -107,12 +89,19 @@ async fn handle_actor_exit(actor_result: FaultResult, runtime: &Runtime) {
|
|
|
};
|
|
|
let result = runtime.send_control(owner.clone(), msg).await;
|
|
|
if let Err(err) = result {
|
|
|
- error!("Failed to notify owner {owner} that {handle_name} exited: {err}");
|
|
|
+ if let Some(RuntimeError::BadActorName(_)) = err.downcast_ref::<RuntimeError>() {
|
|
|
+ // There's nothing to do in this case because the owner has already exited. We just
|
|
|
+ // happened to handle the exit of one of its owned actors first.
|
|
|
+ } else {
|
|
|
+ error!("Failed to notify owner {owner} that {handle_name} exited: {err}");
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ // Notify all of its owned actors.
|
|
|
let handle_name = handle.name().clone();
|
|
|
for owned in handle.owns_mut().drain(0..) {
|
|
|
+ debug!("sending owner exit notification to {owned}");
|
|
|
let msg = ControlMsg::OwnerExited(actor_exit.clone());
|
|
|
let result = runtime.send_control(owned.clone(), msg).await;
|
|
|
if let Err(err) = result {
|
|
@@ -120,3 +109,277 @@ async fn handle_actor_exit(actor_result: FaultResult, runtime: &Runtime) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+#[cfg(test)]
|
|
|
+mod tests {
|
|
|
+ use std::{future::Future, sync::Arc};
|
|
|
+
|
|
|
+ use btlib::bterr;
|
|
|
+ use tokio::sync::oneshot;
|
|
|
+
|
|
|
+ use super::*;
|
|
|
+
|
|
|
+ use crate::{
|
|
|
+ tests::{EchoMsg, ASYNC_RT, RUNTIME},
|
|
|
+ ActorError, ActorErrorCommon, ActorName, ActorResult, Envelope, Mailbox, Named, TransKind,
|
|
|
+ };
|
|
|
+
|
|
|
+ #[derive(Debug)]
|
|
|
+ struct ExitTestResult {
|
|
|
+ expected_name: ActorName,
|
|
|
+ actual_name: ActorName,
|
|
|
+ actual_exit: ActorExit,
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Actor used to test the receipt of a notification when one of its owned actors exits.
|
|
|
+ async fn spawn_owning_exit_server<F, Fut>(
|
|
|
+ runtime: &'static Runtime,
|
|
|
+ client: F,
|
|
|
+ sender: oneshot::Sender<ExitTestResult>,
|
|
|
+ ) -> ActorName
|
|
|
+ where
|
|
|
+ Fut: 'static + Send + Future<Output = ActorResult>,
|
|
|
+ F: 'static + Send + FnOnce(Mailbox<EchoMsg>, ActorId, &'static Runtime) -> Fut,
|
|
|
+ {
|
|
|
+ let server = move |mut mailbox: Mailbox<EchoMsg>, actor_id, runtime: &'static Runtime| async move {
|
|
|
+ let owner = runtime.actor_name(actor_id);
|
|
|
+ let mut expected_name = Some(runtime.spawn(Some(owner), client).await.unwrap());
|
|
|
+ let mut sender = Some(sender);
|
|
|
+ while let Some(envelope) = mailbox.recv().await {
|
|
|
+ match envelope {
|
|
|
+ Envelope::Control(msg) => match msg {
|
|
|
+ ControlMsg::OwnedExited {
|
|
|
+ name: actual_name,
|
|
|
+ exit,
|
|
|
+ } => {
|
|
|
+ let test_result = ExitTestResult {
|
|
|
+ actual_name,
|
|
|
+ expected_name: expected_name.take().unwrap(),
|
|
|
+ actual_exit: exit,
|
|
|
+ };
|
|
|
+ sender.take().unwrap().send(test_result).unwrap();
|
|
|
+ }
|
|
|
+ msg => panic!("Unexpected control message: {}", msg.name()),
|
|
|
+ },
|
|
|
+ envelope => panic!("Unexpected envelope type: {}", envelope.name()),
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ActorResult::Ok(actor_id)
|
|
|
+ };
|
|
|
+ runtime.spawn(None, server).await.unwrap()
|
|
|
+ }
|
|
|
+
|
|
|
+ #[test]
|
|
|
+ fn owner_notification_when_owned_exit_ok() {
|
|
|
+ ASYNC_RT.block_on(async {
|
|
|
+ let client = |_: Mailbox<EchoMsg>, actor_id, _| async move {
|
|
|
+ ActorResult::Ok(actor_id)
|
|
|
+ };
|
|
|
+ let (sender, receiver) = oneshot::channel();
|
|
|
+
|
|
|
+ spawn_owning_exit_server(&RUNTIME, client, sender).await;
|
|
|
+ let test_result = timeout(Duration::from_millis(500), receiver).await.unwrap().unwrap();
|
|
|
+
|
|
|
+ assert_eq!(test_result.expected_name, test_result.actual_name);
|
|
|
+ let expected_id = test_result.expected_name.act_id();
|
|
|
+ assert!(matches!(test_result.actual_exit, ActorExit::Ok(actor_id) if actor_id == expected_id));
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ #[test]
|
|
|
+ fn owner_notification_when_owned_exit_err() {
|
|
|
+ ASYNC_RT.block_on(async {
|
|
|
+ const EXPECTED_ERR: &str = "Expected error text.";
|
|
|
+ let expected_actor_impl = Arc::new(String::from("notification_client"));
|
|
|
+ let expected_state = Arc::new(String::from("Init"));
|
|
|
+ let expected_message = Arc::new(String::from("None"));
|
|
|
+ const EXPECTED_KIND: TransKind = TransKind::Receive;
|
|
|
+
|
|
|
+ let client = {
|
|
|
+ let expected_actor_impl = expected_actor_impl.clone();
|
|
|
+ let expected_state = expected_state.clone();
|
|
|
+ let expected_message = expected_message.clone();
|
|
|
+ move |_: Mailbox<EchoMsg>, actor_id, _| async move {
|
|
|
+ ActorResult::Err(ActorError::new(
|
|
|
+ bterr!(EXPECTED_ERR),
|
|
|
+ ActorErrorCommon {
|
|
|
+ actor_id,
|
|
|
+ actor_impl: expected_actor_impl,
|
|
|
+ state: expected_state,
|
|
|
+ message: expected_message,
|
|
|
+ kind: EXPECTED_KIND,
|
|
|
+ },
|
|
|
+ ))
|
|
|
+ }
|
|
|
+ };
|
|
|
+ let (sender, receiver) = oneshot::channel();
|
|
|
+
|
|
|
+ spawn_owning_exit_server(&RUNTIME, client, sender).await;
|
|
|
+ let test_result = timeout(Duration::from_millis(500), receiver)
|
|
|
+ .await
|
|
|
+ .unwrap()
|
|
|
+ .unwrap();
|
|
|
+
|
|
|
+ assert_eq!(test_result.expected_name, test_result.actual_name);
|
|
|
+ let expected_id = test_result.expected_name.act_id();
|
|
|
+ if let ActorExit::Error(actual_error) = test_result.actual_exit {
|
|
|
+ assert_eq!(expected_id, actual_error.actor_id());
|
|
|
+ assert_eq!(EXPECTED_ERR, actual_error.err().as_ref());
|
|
|
+ assert_eq!(&expected_actor_impl, actual_error.actor_impl());
|
|
|
+ assert_eq!(&expected_state, actual_error.state());
|
|
|
+ assert_eq!(&expected_message, actual_error.message());
|
|
|
+ assert_eq!(EXPECTED_KIND, actual_error.kind());
|
|
|
+ } else {
|
|
|
+ panic!("Actor exit wasn't the right variant.");
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ #[test]
|
|
|
+ fn owner_notification_when_owned_exit_panic() {
|
|
|
+ ASYNC_RT.block_on(async {
|
|
|
+ const EXPECTED_ERR: &str = "Expected panic text.";
|
|
|
+ let client = |_: Mailbox<EchoMsg>, _, _| async move {
|
|
|
+ panic!("{EXPECTED_ERR}");
|
|
|
+ };
|
|
|
+ let (sender, receiver) = oneshot::channel();
|
|
|
+
|
|
|
+ spawn_owning_exit_server(&RUNTIME, client, sender).await;
|
|
|
+ let test_result = timeout(Duration::from_millis(500), receiver)
|
|
|
+ .await
|
|
|
+ .unwrap()
|
|
|
+ .unwrap();
|
|
|
+
|
|
|
+ assert_eq!(test_result.expected_name, test_result.actual_name);
|
|
|
+ let expected_id = test_result.expected_name.act_id();
|
|
|
+ if let ActorExit::Panic(actual_panic) = test_result.actual_exit {
|
|
|
+ assert_eq!(expected_id, actual_panic.actor_id());
|
|
|
+ assert_eq!(EXPECTED_ERR, actual_panic.err().unwrap());
|
|
|
+ } else {
|
|
|
+ panic!("Actor exit wasn't the right variant.");
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Actor used to test the receipt of a notification when its owner exits.
|
|
|
+ async fn spawn_owned_exit_server(
|
|
|
+ runtime: &'static Runtime,
|
|
|
+ owner: ActorName,
|
|
|
+ sender: oneshot::Sender<ActorExit>,
|
|
|
+ ) {
|
|
|
+ let owned = |mut mailbox: Mailbox<EchoMsg>, owned_id, _| async move {
|
|
|
+ let mut sender = Some(sender);
|
|
|
+ while let Some(envelope) = mailbox.recv().await {
|
|
|
+ match envelope {
|
|
|
+ Envelope::Control(msg) => match msg {
|
|
|
+ ControlMsg::OwnerExited(actor_exit) => {
|
|
|
+ sender.take().unwrap().send(actor_exit).unwrap();
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ msg => panic!("Unexpected control message: {}", msg.name()),
|
|
|
+ },
|
|
|
+ envelope => panic!("Unexpected envelope kind: {}", envelope.name()),
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ActorResult::Ok(owned_id)
|
|
|
+ };
|
|
|
+
|
|
|
+ runtime.spawn(Some(owner), owned).await.unwrap();
|
|
|
+ }
|
|
|
+
|
|
|
+ #[test]
|
|
|
+ fn owned_notification_when_owner_exit_ok() {
|
|
|
+ ASYNC_RT.block_on(async {
|
|
|
+ let (sender, receiver) = oneshot::channel();
|
|
|
+ let owner = |_: Mailbox<EchoMsg>, owner_id, runtime: &'static Runtime| async move {
|
|
|
+ let owner_name = runtime.actor_name(owner_id);
|
|
|
+ spawn_owned_exit_server(runtime, owner_name, sender).await;
|
|
|
+ ActorResult::Ok(owner_id)
|
|
|
+ };
|
|
|
+
|
|
|
+ let expected_name = RUNTIME.spawn(None, owner).await.unwrap();
|
|
|
+
|
|
|
+ let actual = timeout(Duration::from_millis(500), receiver)
|
|
|
+ .await
|
|
|
+ .unwrap()
|
|
|
+ .unwrap();
|
|
|
+ let expected_id = expected_name.act_id();
|
|
|
+ assert!(matches!(actual, ActorExit::Ok(actual_id) if actual_id == expected_id));
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ #[test]
|
|
|
+ fn owned_notification_when_owner_exit_error() {
|
|
|
+ ASYNC_RT.block_on(async {
|
|
|
+ const EXPECTED_ERR: &str = "Expected error text.";
|
|
|
+ let expected_actor_impl = Arc::new(String::from("notification_client"));
|
|
|
+ let expected_state = Arc::new(String::from("Init"));
|
|
|
+ let expected_message = Arc::new(String::from("None"));
|
|
|
+ const EXPECTED_KIND: TransKind = TransKind::Receive;
|
|
|
+
|
|
|
+ let (sender, receiver) = oneshot::channel();
|
|
|
+ let owner = {
|
|
|
+ let expected_actor_impl = expected_actor_impl.clone();
|
|
|
+ let expected_state = expected_state.clone();
|
|
|
+ let expected_message = expected_message.clone();
|
|
|
+ move |_: Mailbox<EchoMsg>, owner_id, runtime: &'static Runtime| async move {
|
|
|
+ let owner_name = runtime.actor_name(owner_id);
|
|
|
+ spawn_owned_exit_server(runtime, owner_name, sender).await;
|
|
|
+ ActorResult::Err(ActorError::new(
|
|
|
+ bterr!(EXPECTED_ERR),
|
|
|
+ ActorErrorCommon {
|
|
|
+ actor_id: owner_id,
|
|
|
+ actor_impl: expected_actor_impl,
|
|
|
+ state: expected_state,
|
|
|
+ message: expected_message,
|
|
|
+ kind: EXPECTED_KIND,
|
|
|
+ },
|
|
|
+ ))
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ let expected_name = RUNTIME.spawn(None, owner).await.unwrap();
|
|
|
+
|
|
|
+ let actual = timeout(Duration::from_millis(500), receiver)
|
|
|
+ .await
|
|
|
+ .unwrap()
|
|
|
+ .unwrap();
|
|
|
+ if let ActorExit::Error(actual_error) = actual {
|
|
|
+ assert_eq!(expected_name.act_id(), actual_error.actor_id());
|
|
|
+ assert_eq!(EXPECTED_ERR, actual_error.err().as_ref());
|
|
|
+ assert_eq!(&expected_actor_impl, actual_error.actor_impl());
|
|
|
+ assert_eq!(&expected_state, actual_error.state());
|
|
|
+ assert_eq!(&expected_message, actual_error.message());
|
|
|
+ assert_eq!(EXPECTED_KIND, actual_error.kind());
|
|
|
+ } else {
|
|
|
+ panic!("Actor exit wasn't the right variant.");
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ #[test]
|
|
|
+ fn owned_notification_when_owner_exit_panic() {
|
|
|
+ ASYNC_RT.block_on(async {
|
|
|
+ const EXPECTED_ERR: &str = "Expected panic text.";
|
|
|
+ let (sender, receiver) = oneshot::channel();
|
|
|
+ let owner = |_: Mailbox<EchoMsg>, owner_id, runtime: &'static Runtime| async move {
|
|
|
+ let owner_name = runtime.actor_name(owner_id);
|
|
|
+ spawn_owned_exit_server(runtime, owner_name, sender).await;
|
|
|
+ panic!("{EXPECTED_ERR}");
|
|
|
+ };
|
|
|
+
|
|
|
+ let expected_name = RUNTIME.spawn(None, owner).await.unwrap();
|
|
|
+
|
|
|
+ let actual = timeout(Duration::from_millis(500), receiver)
|
|
|
+ .await
|
|
|
+ .unwrap()
|
|
|
+ .unwrap();
|
|
|
+ if let ActorExit::Panic(panic) = actual {
|
|
|
+ assert_eq!(expected_name.act_id(), panic.actor_id());
|
|
|
+ assert_eq!(EXPECTED_ERR, panic.err().unwrap());
|
|
|
+ } else {
|
|
|
+ panic!("Actor exit wasn't the right variant.");
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+}
|