lib.rs 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. use serde::{Serialize, Deserialize};
  2. use std::{
  3. io::{BufWriter, BufReader, prelude::*},
  4. process::{Child, ChildStdin, Command, Stdio},
  5. sync::mpsc::{channel, Receiver},
  6. time::{Duration},
  7. };
  8. use serde_block_tree::{self, write_to, read_from};
  9. use log::{error};
  10. #[derive(Serialize, Deserialize)]
  11. pub enum Message {
  12. /// An echo request. This should elicit an identical reply.
  13. Echo(String),
  14. /// Orders the node to halt.
  15. Halt,
  16. }
  17. pub struct Node {
  18. child: Child,
  19. stdin: BufWriter<ChildStdin>,
  20. message_rx: Receiver<Message>,
  21. }
  22. impl Node {
  23. pub fn new() -> Result<Node, std::io::Error> {
  24. let mut child = Command::new("btnode")
  25. .stdin(Stdio::piped())
  26. .stdout(Stdio::piped())
  27. .stderr(Stdio::inherit())
  28. .spawn()?;
  29. let stdin = BufWriter::new(child.stdin.take().unwrap());
  30. let mut stdout = BufReader::new(child.stdout.take().unwrap());
  31. let (message_tx, message_rx) = channel();
  32. std::thread::spawn(move || loop {
  33. match read_from(&mut stdout) {
  34. Ok(msg) => {
  35. match message_tx.send(msg) {
  36. Ok(_) => (),
  37. // Break if the receiver is dead.
  38. Err(_) => break,
  39. }
  40. },
  41. // Break if the child is has exited.
  42. Err(serde_block_tree::Error::Eof) => break,
  43. Err(serde_block_tree::Error::Io(io_err)) => {
  44. match io_err.kind() {
  45. std::io::ErrorKind::UnexpectedEof => break,
  46. _ => error!("IO error ocurred: {:?}", io_err),
  47. }
  48. },
  49. Err(err) => error!("Failed to deserialize child message {:?}", err),
  50. };
  51. });
  52. Ok(Node { child, stdin, message_rx })
  53. }
  54. pub fn send(&mut self, msg: &Message) -> serde_block_tree::Result<()> {
  55. write_to(&msg, &mut self.stdin)?;
  56. self.stdin.flush().map_err(serde_block_tree::Error::Io)?;
  57. Ok(())
  58. }
  59. pub fn receive(&mut self, timeout: Duration) -> Option<Message> {
  60. match self.message_rx.recv_timeout(timeout) {
  61. Ok(msg) => Some(msg),
  62. Err(err) => {
  63. eprintln!("Failed to receive message from thread. Error: {:?}", err);
  64. None
  65. }
  66. }
  67. }
  68. pub fn halt(mut self) -> Result<(), NodeHaltError> {
  69. self.send(&Message::Halt).unwrap();
  70. match self.child.wait() {
  71. Ok(status) => {
  72. let code = status.code();
  73. if code == Some(0) {
  74. Ok(())
  75. }
  76. else {
  77. Err(NodeHaltError::NonZeroExitCode(code))
  78. }
  79. },
  80. Err(err) => Err(NodeHaltError::Io(err)),
  81. }
  82. }
  83. }
  84. #[derive(Debug)]
  85. pub enum NodeHaltError {
  86. /// The node exited with a non-zero or non-existent status code.
  87. NonZeroExitCode(Option<i32>),
  88. /// An IO error was encountered.
  89. Io(std::io::Error),
  90. }
  91. impl Drop for Node {
  92. fn drop(&mut self) {
  93. let _ = self.child.kill();
  94. }
  95. }
  96. #[cfg(test)]
  97. mod test {
  98. use super::*;
  99. use ctor::ctor;
  100. use log::info;
  101. #[ctor]
  102. fn test_init() {
  103. use env_logger::{self, Env};
  104. env_logger::init_from_env(Env::default().default_filter_or("debug"));
  105. info!("Using cargo to install btnode...");
  106. let success = Command::new("cargo")
  107. .args(["install", "--path", "../btnode"])
  108. .status()
  109. .expect("failed to invoke cargo")
  110. .success();
  111. assert!(success, "failed to build btnode");
  112. }
  113. #[test]
  114. fn message_echo() {
  115. let mut node = Node::new().unwrap();
  116. for k in 0..7 {
  117. let expected = format!("Rep number {}", k);
  118. node.send(&Message::Echo(expected.clone())).unwrap();
  119. let reply = node.receive(Duration::from_millis(100)).unwrap();
  120. let reply_payload = match reply {
  121. Message::Echo(actual) => Some(actual),
  122. _ => None,
  123. };
  124. assert_eq!(Some(expected), reply_payload);
  125. }
  126. }
  127. #[test]
  128. fn message_halt() {
  129. let node = Node::new().unwrap();
  130. node.halt().unwrap();
  131. }
  132. }