msg.rs 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. //! Code which enables sending messages between processes in the blocktree system.
  2. pub use private::*;
  3. mod private {
  4. use crate::{crypto::rand_array, BlockPath, Result, Writecap};
  5. use btserde::{read_from, write_to};
  6. use os_pipe::{PipeReader, PipeWriter};
  7. use serde::{Deserialize, Serialize};
  8. use std::io::{Read, Write};
  9. use zerocopy::FromBytes;
  10. #[derive(PartialEq, Eq, Serialize, Deserialize)]
  11. pub enum MsgError {}
  12. #[derive(Serialize, Deserialize)]
  13. pub enum MsgBody {
  14. Success,
  15. Fail(MsgError),
  16. Ping,
  17. Hello(Writecap),
  18. Read { offset: u64 },
  19. Write { offset: u64, data: Vec<u8> },
  20. Custom(Vec<u8>),
  21. }
  22. #[derive(Serialize, Deserialize)]
  23. pub struct Msg {
  24. from: BlockPath,
  25. to: BlockPath,
  26. id: u128,
  27. body: MsgBody,
  28. }
  29. impl Msg {
  30. pub fn from(&self) -> &BlockPath {
  31. &self.from
  32. }
  33. pub fn to(&self) -> &BlockPath {
  34. &self.to
  35. }
  36. pub fn id(&self) -> u128 {
  37. self.id
  38. }
  39. pub fn body(&self) -> &MsgBody {
  40. &self.body
  41. }
  42. }
  43. #[derive(Serialize, Deserialize)]
  44. pub enum VerMsg {
  45. V0(Msg),
  46. }
  47. pub trait Sender: Send {
  48. fn send(&mut self, msg: Msg) -> Result<()>;
  49. fn path(&self) -> &BlockPath;
  50. /// Generates and returns a new message ID.
  51. fn gen_id(&mut self) -> Result<u128> {
  52. const LEN: usize = std::mem::size_of::<u128>();
  53. let bytes = rand_array::<LEN>()?;
  54. let option = u128::read_from(bytes.as_slice());
  55. // Safety: because LEN == size_of::<u128>(), read_from should have returned Some.
  56. Ok(option.unwrap())
  57. }
  58. /// This is a convenience method which creates a new message, addresses it to the given
  59. /// path, sets it as from this sender's path, generates a new ID for it, puts the given
  60. /// body inside of it, and dispatches it with the `send` method.
  61. fn send_new(&mut self, to: BlockPath, body: MsgBody) -> Result<()> {
  62. let id = self.gen_id()?;
  63. let msg = Msg {
  64. to,
  65. from: self.path().to_owned(),
  66. id,
  67. body,
  68. };
  69. self.send(msg)
  70. }
  71. }
  72. pub trait Receiver {
  73. fn receive(&mut self) -> Result<Msg>;
  74. fn path(&self) -> &BlockPath;
  75. }
  76. pub trait Channel {
  77. type Sender: Sender;
  78. type Receiver: Receiver;
  79. fn new(path: BlockPath) -> Result<(Self::Sender, Self::Receiver)>;
  80. }
  81. pub trait Router {
  82. fn add_sender<S: Sender>(&mut self, sender: S) -> Result<()>;
  83. fn send(&mut self, msg: Msg) -> Result<()>;
  84. }
  85. trait TryClone<O> {
  86. fn try_clone(&self) -> Result<O>;
  87. }
  88. impl TryClone<PipeWriter> for PipeWriter {
  89. fn try_clone(&self) -> Result<PipeWriter> {
  90. Ok(PipeWriter::try_clone(self)?)
  91. }
  92. }
  93. pub struct WriteSender<W> {
  94. write: W,
  95. path: BlockPath,
  96. }
  97. impl<W: Write + Send> Sender for WriteSender<W> {
  98. fn send(&mut self, msg: Msg) -> Result<()> {
  99. Ok(write_to(&msg, &mut self.write)?)
  100. }
  101. fn path(&self) -> &BlockPath {
  102. &self.path
  103. }
  104. }
  105. impl<W: Write + TryClone<W>> TryClone<WriteSender<W>> for WriteSender<W> {
  106. fn try_clone(&self) -> Result<WriteSender<W>> {
  107. Ok(WriteSender {
  108. write: self.write.try_clone()?,
  109. path: self.path.clone(),
  110. })
  111. }
  112. }
  113. pub struct ReadReceiver<R> {
  114. read: R,
  115. path: BlockPath,
  116. }
  117. impl<R: Read> Receiver for ReadReceiver<R> {
  118. fn receive(&mut self) -> Result<Msg> {
  119. Ok(read_from(&mut self.read)?)
  120. }
  121. fn path(&self) -> &BlockPath {
  122. &self.path
  123. }
  124. }
  125. pub enum PipeChannel {}
  126. impl Channel for PipeChannel {
  127. type Sender = WriteSender<PipeWriter>;
  128. type Receiver = ReadReceiver<PipeReader>;
  129. fn new(path: BlockPath) -> Result<(Self::Sender, Self::Receiver)> {
  130. let (read, write) = os_pipe::pipe()?;
  131. let sender = WriteSender {
  132. path: path.clone(),
  133. write,
  134. };
  135. let receiver = ReadReceiver { path, read };
  136. Ok((sender, receiver))
  137. }
  138. }
  139. }
  140. #[cfg(test)]
  141. mod tests {
  142. use crate::test_helpers;
  143. use super::*;
  144. #[test]
  145. fn ping_pong_via_pipes() {
  146. let ping_path = test_helpers::make_path(vec!["ping"]);
  147. let (mut ping_sender, mut ping_receiver) =
  148. PipeChannel::new(ping_path.clone()).expect("failed to create a ping channel");
  149. let pong_path = test_helpers::make_path(vec!["pong"]);
  150. let (mut pong_sender, mut pong_receiver) =
  151. PipeChannel::new(pong_path.clone()).expect("failed to create a pong channel");
  152. let pong = std::thread::spawn(move || loop {
  153. let msg = pong_receiver.receive().expect("pong receive failed");
  154. assert_eq!(pong_receiver.path(), msg.to());
  155. match msg.body() {
  156. MsgBody::Ping => ping_sender
  157. .send_new(ping_path.clone(), MsgBody::Success)
  158. .expect("send to ping failed"),
  159. MsgBody::Success => return,
  160. _ => panic!("unexpected message received by pong"),
  161. }
  162. });
  163. let mut iter = 5;
  164. while iter > 0 {
  165. pong_sender
  166. .send_new(pong_path.clone(), MsgBody::Ping)
  167. .expect("send to pong failed");
  168. let msg = ping_receiver.receive().expect("ping receive failed");
  169. assert_eq!(ping_receiver.path(), msg.to());
  170. match msg.body() {
  171. MsgBody::Success => iter -= 1,
  172. _ => panic!("unexpected message received by ping"),
  173. }
  174. }
  175. pong_sender
  176. .send_new(pong_path.clone(), MsgBody::Success)
  177. .expect("send success to pong failed");
  178. pong.join().expect("join failed");
  179. }
  180. }