123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- //! Code which enables sending messages between processes in the blocktree system.
- pub use private::*;
- mod private {
- use crate::{crypto::rand_array, BlockPath, Result, Writecap};
- use btserde::{read_from, write_to};
- use os_pipe::{PipeReader, PipeWriter};
- use serde::{Deserialize, Serialize};
- use std::io::{Read, Write};
- use zerocopy::FromBytes;
- #[derive(PartialEq, Eq, Serialize, Deserialize)]
- pub enum MsgError {}
- #[derive(Serialize, Deserialize)]
- pub enum MsgBody {
- Success,
- Fail(MsgError),
- Ping,
- Hello(Writecap),
- Read { offset: u64 },
- Write { offset: u64, data: Vec<u8> },
- Custom(Vec<u8>),
- }
- #[derive(Serialize, Deserialize)]
- pub struct Msg {
- from: BlockPath,
- to: BlockPath,
- id: u128,
- body: MsgBody,
- }
- impl Msg {
- pub fn from(&self) -> &BlockPath {
- &self.from
- }
- pub fn to(&self) -> &BlockPath {
- &self.to
- }
- pub fn id(&self) -> u128 {
- self.id
- }
- pub fn body(&self) -> &MsgBody {
- &self.body
- }
- }
- #[derive(Serialize, Deserialize)]
- pub enum VerMsg {
- V0(Msg),
- }
- pub trait Sender: Send {
- fn send(&mut self, msg: Msg) -> Result<()>;
- fn path(&self) -> &BlockPath;
- /// Generates and returns a new message ID.
- fn gen_id(&mut self) -> Result<u128> {
- const LEN: usize = std::mem::size_of::<u128>();
- let bytes = rand_array::<LEN>()?;
- let option = u128::read_from(bytes.as_slice());
- // Safety: because LEN == size_of::<u128>(), read_from should have returned Some.
- Ok(option.unwrap())
- }
- /// This is a convenience method which creates a new message, addresses it to the given
- /// path, sets it as from this sender's path, generates a new ID for it, puts the given
- /// body inside of it, and dispatches it with the `send` method.
- fn send_new(&mut self, to: BlockPath, body: MsgBody) -> Result<()> {
- let id = self.gen_id()?;
- let msg = Msg {
- to,
- from: self.path().to_owned(),
- id,
- body,
- };
- self.send(msg)
- }
- }
- pub trait Receiver {
- fn receive(&mut self) -> Result<Msg>;
- fn path(&self) -> &BlockPath;
- }
- pub trait Channel {
- type Sender: Sender;
- type Receiver: Receiver;
- fn new(path: BlockPath) -> Result<(Self::Sender, Self::Receiver)>;
- }
- pub trait Router {
- fn add_sender<S: Sender>(&mut self, sender: S) -> Result<()>;
- fn send(&mut self, msg: Msg) -> Result<()>;
- }
- trait TryClone<O> {
- fn try_clone(&self) -> Result<O>;
- }
- impl TryClone<PipeWriter> for PipeWriter {
- fn try_clone(&self) -> Result<PipeWriter> {
- Ok(PipeWriter::try_clone(self)?)
- }
- }
- pub struct WriteSender<W> {
- write: W,
- path: BlockPath,
- }
- impl<W: Write + Send> Sender for WriteSender<W> {
- fn send(&mut self, msg: Msg) -> Result<()> {
- Ok(write_to(&msg, &mut self.write)?)
- }
- fn path(&self) -> &BlockPath {
- &self.path
- }
- }
- impl<W: Write + TryClone<W>> TryClone<WriteSender<W>> for WriteSender<W> {
- fn try_clone(&self) -> Result<WriteSender<W>> {
- Ok(WriteSender {
- write: self.write.try_clone()?,
- path: self.path.clone(),
- })
- }
- }
- pub struct ReadReceiver<R> {
- read: R,
- path: BlockPath,
- }
- impl<R: Read> Receiver for ReadReceiver<R> {
- fn receive(&mut self) -> Result<Msg> {
- Ok(read_from(&mut self.read)?)
- }
- fn path(&self) -> &BlockPath {
- &self.path
- }
- }
- pub enum PipeChannel {}
- impl Channel for PipeChannel {
- type Sender = WriteSender<PipeWriter>;
- type Receiver = ReadReceiver<PipeReader>;
- fn new(path: BlockPath) -> Result<(Self::Sender, Self::Receiver)> {
- let (read, write) = os_pipe::pipe()?;
- let sender = WriteSender {
- path: path.clone(),
- write,
- };
- let receiver = ReadReceiver { path, read };
- Ok((sender, receiver))
- }
- }
- }
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helpers;

    /// Exchanges messages between the test thread ("ping") and a spawned thread ("pong") over
    /// two pipe channels.
    ///
    /// Each channel is named after the path its messages are addressed to, so the sending half
    /// of the ping channel is moved into the pong thread and vice versa. Every `Ping` received
    /// by pong is answered with a `Success`; a `Success` received by pong tells it to stop.
    #[test]
    fn ping_pong_via_pipes() {
        /// Number of ping/reply round trips performed before pong is told to stop.
        const ROUNDS: usize = 5;

        let ping_path = test_helpers::make_path(vec!["ping"]);
        let (mut ping_sender, mut ping_receiver) =
            PipeChannel::new(ping_path.clone()).expect("failed to create a ping channel");
        let pong_path = test_helpers::make_path(vec!["pong"]);
        let (mut pong_sender, mut pong_receiver) =
            PipeChannel::new(pong_path.clone()).expect("failed to create a pong channel");

        let pong = std::thread::spawn(move || {
            loop {
                let msg = pong_receiver.receive().expect("pong receive failed");
                assert_eq!(pong_receiver.path(), msg.to());
                match msg.body() {
                    MsgBody::Ping => {
                        ping_sender
                            .send_new(ping_path.clone(), MsgBody::Success)
                            .expect("send to ping failed");
                    }
                    MsgBody::Success => break,
                    _ => panic!("unexpected message received by pong"),
                }
            }
        });

        for _ in 0..ROUNDS {
            pong_sender
                .send_new(pong_path.clone(), MsgBody::Ping)
                .expect("send to pong failed");
            let msg = ping_receiver.receive().expect("ping receive failed");
            assert_eq!(ping_receiver.path(), msg.to());
            if !matches!(msg.body(), MsgBody::Success) {
                panic!("unexpected message received by ping");
            }
        }

        // Tell the pong thread to shut down, then wait for it to finish.
        pong_sender
            .send_new(pong_path.clone(), MsgBody::Success)
            .expect("send success to pong failed");
        pong.join().expect("join failed");
    }
}
|