123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- use serde::{Serialize, Deserialize};
- use std::{
- io::{BufWriter, BufReader, prelude::*},
- process::{Child, ChildStdin, Command, Stdio},
- sync::mpsc::{channel, Receiver},
- time::{Duration},
- };
- use serde_block_tree::{self, write_to, read_from};
- use log::{error};
- #[derive(Serialize, Deserialize)]
- pub enum Message {
- /// An echo request. This should elicit an identical reply.
- Echo(String),
- /// Orders the node to halt.
- Halt,
- }
- pub struct Node {
- child: Child,
- stdin: BufWriter<ChildStdin>,
- message_rx: Receiver<Message>,
- }
- impl Node {
- pub fn new() -> Result<Node, std::io::Error> {
- let mut child = Command::new("btnode")
- .stdin(Stdio::piped())
- .stdout(Stdio::piped())
- .stderr(Stdio::inherit())
- .spawn()?;
- let stdin = BufWriter::new(child.stdin.take().unwrap());
- let mut stdout = BufReader::new(child.stdout.take().unwrap());
- let (message_tx, message_rx) = channel();
- std::thread::spawn(move || loop {
- match read_from(&mut stdout) {
- Ok(msg) => {
- match message_tx.send(msg) {
- Ok(_) => (),
- // Break if the receiver is dead.
- Err(_) => break,
- }
- },
- // Break if the child is has exited.
- Err(serde_block_tree::Error::Eof) => break,
- Err(serde_block_tree::Error::Io(io_err)) => {
- match io_err.kind() {
- std::io::ErrorKind::UnexpectedEof => break,
- _ => error!("IO error ocurred: {:?}", io_err),
- }
- },
- Err(err) => error!("Failed to deserialize child message {:?}", err),
- };
- });
- Ok(Node { child, stdin, message_rx })
- }
- pub fn send(&mut self, msg: &Message) -> serde_block_tree::Result<()> {
- write_to(&msg, &mut self.stdin)?;
- self.stdin.flush().map_err(serde_block_tree::Error::Io)?;
- Ok(())
- }
- pub fn receive(&mut self, timeout: Duration) -> Option<Message> {
- match self.message_rx.recv_timeout(timeout) {
- Ok(msg) => Some(msg),
- Err(err) => {
- eprintln!("Failed to receive message from thread. Error: {:?}", err);
- None
- }
- }
- }
- pub fn halt(mut self) -> Result<(), NodeHaltError> {
- self.send(&Message::Halt).unwrap();
- match self.child.wait() {
- Ok(status) => {
- let code = status.code();
- if code == Some(0) {
- Ok(())
- }
- else {
- Err(NodeHaltError::NonZeroExitCode(code))
- }
- },
- Err(err) => Err(NodeHaltError::Io(err)),
- }
- }
- }
- #[derive(Debug)]
- pub enum NodeHaltError {
- /// The node exited with a non-zero or non-existent status code.
- NonZeroExitCode(Option<i32>),
- /// An IO error was encountered.
- Io(std::io::Error),
- }
- impl Drop for Node {
- fn drop(&mut self) {
- let _ = self.child.kill();
- }
- }
- #[cfg(test)]
- mod test {
- use super::*;
- use ctor::ctor;
- use log::info;
- #[ctor]
- fn test_init() {
- use env_logger::{self, Env};
- env_logger::init_from_env(Env::default().default_filter_or("debug"));
- info!("Using cargo to install btnode...");
- let success = Command::new("cargo")
- .args(["install", "--path", "../btnode"])
- .status()
- .expect("failed to invoke cargo")
- .success();
- assert!(success, "failed to build btnode");
- }
- #[test]
- fn message_echo() {
- let mut node = Node::new().unwrap();
- for k in 0..7 {
- let expected = format!("Rep number {}", k);
- node.send(&Message::Echo(expected.clone())).unwrap();
- let reply = node.receive(Duration::from_millis(100)).unwrap();
- let reply_payload = match reply {
- Message::Echo(actual) => Some(actual),
- _ => None,
- };
- assert_eq!(Some(expected), reply_payload);
- }
- }
- #[test]
- fn message_halt() {
- let node = Node::new().unwrap();
- node.halt().unwrap();
- }
- }
|