use serde::{Serialize, Deserialize}; use std::{ io::{BufWriter, BufReader, prelude::*}, process::{Child, ChildStdin, Command, Stdio}, sync::mpsc::{channel, Receiver}, time::{Duration}, }; use serde_block_tree::{self, write_to, read_from}; use log::{error}; #[derive(Serialize, Deserialize)] pub enum Message { /// An echo request. This should elicit an identical reply. Echo(String), /// Orders the node to halt. Halt, } pub struct Node { child: Child, stdin: BufWriter, message_rx: Receiver, } impl Node { pub fn new() -> Result { let mut child = Command::new("btnode") .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::inherit()) .spawn()?; let stdin = BufWriter::new(child.stdin.take().unwrap()); let mut stdout = BufReader::new(child.stdout.take().unwrap()); let (message_tx, message_rx) = channel(); std::thread::spawn(move || loop { match read_from(&mut stdout) { Ok(msg) => { match message_tx.send(msg) { Ok(_) => (), // Break if the receiver is dead. Err(_) => break, } }, // Break if the child is has exited. Err(serde_block_tree::Error::Eof) => break, Err(serde_block_tree::Error::Io(io_err)) => { match io_err.kind() { std::io::ErrorKind::UnexpectedEof => break, _ => error!("IO error ocurred: {:?}", io_err), } }, Err(err) => error!("Failed to deserialize child message {:?}", err), }; }); Ok(Node { child, stdin, message_rx }) } pub fn send(&mut self, msg: &Message) -> serde_block_tree::Result<()> { write_to(&msg, &mut self.stdin)?; self.stdin.flush().map_err(serde_block_tree::Error::Io)?; Ok(()) } pub fn receive(&mut self, timeout: Duration) -> Option { match self.message_rx.recv_timeout(timeout) { Ok(msg) => Some(msg), Err(err) => { eprintln!("Failed to receive message from thread. Error: {:?}", err); None } } } pub fn halt(mut self) -> Result<(), NodeHaltError> { self.send(&Message::Halt).unwrap(); match self.child.wait() { Ok(status) => { let code = status.code(); if code == Some(0) { Ok(()) } else { Err(NodeHaltError::NonZeroExitCode(code)) } }, Err(err) => Err(NodeHaltError::Io(err)), } } } #[derive(Debug)] pub enum NodeHaltError { /// The node exited with a non-zero or non-existent status code. NonZeroExitCode(Option), /// An IO error was encountered. Io(std::io::Error), } impl Drop for Node { fn drop(&mut self) { let _ = self.child.kill(); } } #[cfg(test)] mod test { use super::*; use ctor::ctor; use log::info; #[ctor] fn test_init() { use env_logger::{self, Env}; env_logger::init_from_env(Env::default().default_filter_or("debug")); info!("Using cargo to install btnode..."); let success = Command::new("cargo") .args(["install", "--path", "../btnode"]) .status() .expect("failed to invoke cargo") .success(); assert!(success, "failed to build btnode"); } #[test] fn message_echo() { let mut node = Node::new().unwrap(); for k in 0..7 { let expected = format!("Rep number {}", k); node.send(&Message::Echo(expected.clone())).unwrap(); let reply = node.receive(Duration::from_millis(100)).unwrap(); let reply_payload = match reply { Message::Echo(actual) => Some(actual), _ => None, }; assert_eq!(Some(expected), reply_payload); } } #[test] fn message_halt() { let node = Node::new().unwrap(); node.halt().unwrap(); } }