lib.rs 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. use btserde::{self, read_from, write_to};
  2. use log::error;
  3. use serde::{Deserialize, Serialize};
  4. use std::{
  5. io::{prelude::*, BufReader, BufWriter},
  6. process::{Child, ChildStdin, Command, Stdio},
  7. sync::mpsc::{channel, Receiver},
  8. time::Duration,
  9. };
  10. #[derive(Serialize, Deserialize, Debug)]
  11. pub enum Message {
  12. /// An echo request. This should elicit an identical reply.
  13. Echo(String),
  14. /// Orders the node to halt.
  15. Halt,
  16. }
  17. pub struct Node {
  18. child: Child,
  19. stdin: BufWriter<ChildStdin>,
  20. message_rx: Receiver<Message>,
  21. }
  22. impl Node {
  23. pub fn new() -> Result<Node, std::io::Error> {
  24. let mut child = Command::new("btnode")
  25. .stdin(Stdio::piped())
  26. .stdout(Stdio::piped())
  27. .stderr(Stdio::inherit())
  28. .spawn()?;
  29. let stdin = BufWriter::new(child.stdin.take().unwrap());
  30. let mut stdout = BufReader::new(child.stdout.take().unwrap());
  31. let (message_tx, message_rx) = channel();
  32. std::thread::spawn(move || loop {
  33. match read_from(&mut stdout) {
  34. Ok(msg) => {
  35. match message_tx.send(msg) {
  36. Ok(_) => (),
  37. // Break if the receiver is dead.
  38. Err(_) => break,
  39. }
  40. }
  41. // Break if the child is has exited.
  42. Err(btserde::Error::Eof) => break,
  43. Err(btserde::Error::Io(io_err)) => match io_err.kind() {
  44. std::io::ErrorKind::UnexpectedEof => break,
  45. _ => error!("IO error ocurred: {:?}", io_err),
  46. },
  47. Err(err) => error!("Failed to deserialize child message {:?}", err),
  48. };
  49. });
  50. Ok(Node {
  51. child,
  52. stdin,
  53. message_rx,
  54. })
  55. }
  56. pub fn send(&mut self, msg: &Message) -> btserde::Result<()> {
  57. write_to(&msg, &mut self.stdin)?;
  58. self.stdin.flush().map_err(btserde::Error::Io)?;
  59. Ok(())
  60. }
  61. pub fn receive(&mut self, timeout: Duration) -> Option<Message> {
  62. match self.message_rx.recv_timeout(timeout) {
  63. Ok(msg) => Some(msg),
  64. Err(err) => {
  65. eprintln!("Failed to receive message from thread. Error: {err:?}");
  66. None
  67. }
  68. }
  69. }
  70. pub fn halt(mut self) -> Result<(), NodeHaltError> {
  71. self.send(&Message::Halt).unwrap();
  72. match self.child.wait() {
  73. Ok(status) => {
  74. let code = status.code();
  75. if code == Some(0) {
  76. Ok(())
  77. } else {
  78. Err(NodeHaltError::NonZeroExitCode(code))
  79. }
  80. }
  81. Err(err) => Err(NodeHaltError::Io(err)),
  82. }
  83. }
  84. }
  /// The ways in which halting a node can fail.
  #[derive(Debug)]
  pub enum NodeHaltError {
      /// The node exited with a non-zero or non-existent status code.
      NonZeroExitCode(Option<i32>),
      /// An IO error was encountered.
      Io(std::io::Error),
  }

  impl std::fmt::Display for NodeHaltError {
      /// Writes a short human-readable description. The details of an IO error are exposed
      /// through `source` rather than repeated here.
      fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
          match self {
              NodeHaltError::NonZeroExitCode(Some(code)) => {
                  write!(f, "node exited with status code {code}")
              }
              NodeHaltError::NonZeroExitCode(None) => {
                  write!(f, "node exited without a status code")
              }
              NodeHaltError::Io(_) => write!(f, "an IO error occurred while halting the node"),
          }
      }
  }

  impl std::error::Error for NodeHaltError {
      /// Returns the underlying IO error, if there is one.
      fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
          match self {
              NodeHaltError::NonZeroExitCode(_) => None,
              NodeHaltError::Io(err) => Some(err),
          }
      }
  }
  92. impl Drop for Node {
  93. fn drop(&mut self) {
  94. let _ = self.child.kill();
  95. }
  96. }
  #[cfg(test)]
  mod test {
      use super::*;
      use ctor::ctor;
      use log::info;

      /// Initializes logging with a default filter of `debug`, then uses cargo to install the
      /// crate at `../btnode`, presumably so that the `btnode` command spawned by `Node::new`
      /// can be found. Panics if cargo cannot be invoked or reports a failure.
      #[ctor]
      fn test_init() {
          use env_logger::{self, Env};
          let log_env = Env::default().default_filter_or("debug");
          env_logger::init_from_env(log_env);
          info!("Using cargo to install btnode...");
          let status = Command::new("cargo")
              .args(["install", "--path", "../btnode"])
              .status()
              .expect("failed to invoke cargo");
          assert!(status.success(), "failed to build btnode");
      }

      /// Sends seven echo requests in turn and checks that each reply carries the same payload.
      #[test]
      fn message_echo() {
          let mut node = Node::new().unwrap();
          for k in 0..7 {
              let expected = format!("Rep number {}", k);
              let request = Message::Echo(expected.clone());
              node.send(&request).unwrap();
              let reply = node.receive(Duration::from_millis(100)).unwrap();
              // Any reply other than an echo yields `None`, which fails the comparison below.
              let echoed = if let Message::Echo(actual) = reply {
                  Some(actual)
              } else {
                  None
              };
              assert_eq!(Some(expected), echoed);
          }
      }

      /// Checks that a freshly spawned node halts successfully when ordered to.
      #[test]
      fn message_halt() {
          let node = Node::new().unwrap();
          node.halt().unwrap();
      }
  }