//! Code which enables sending messages between processes in the blocktree system. pub use private::*; mod private { use crate::{crypto::rand_array, BlockPath, Result, Writecap}; use btserde::{read_from, write_to}; use os_pipe::{PipeReader, PipeWriter}; use serde::{Deserialize, Serialize}; use std::io::{Read, Write}; use zerocopy::FromBytes; #[derive(PartialEq, Eq, Serialize, Deserialize)] pub enum MsgError {} #[derive(Serialize, Deserialize)] pub enum MsgBody { Success, Fail(MsgError), Ping, Hello(Writecap), Read { offset: u64 }, Write { offset: u64, data: Vec }, Custom(Vec), } #[derive(Serialize, Deserialize)] pub struct Msg { from: BlockPath, to: BlockPath, id: u128, body: MsgBody, } impl Msg { pub fn from(&self) -> &BlockPath { &self.from } pub fn to(&self) -> &BlockPath { &self.to } pub fn id(&self) -> u128 { self.id } pub fn body(&self) -> &MsgBody { &self.body } } #[derive(Serialize, Deserialize)] pub enum VerMsg { V0(Msg), } pub trait Sender: Send { fn send(&mut self, msg: Msg) -> Result<()>; fn path(&self) -> &BlockPath; /// Generates and returns a new message ID. fn gen_id(&mut self) -> Result { const LEN: usize = std::mem::size_of::(); let bytes = rand_array::()?; let option = u128::read_from(bytes.as_slice()); // Safety: because LEN == size_of::(), read_from should have returned Some. Ok(option.unwrap()) } /// This is a convenience method which creates a new message, addresses it to the given /// path, sets it as from this sender's path, generates a new ID for it, puts the given /// body inside of it, and dispatches it with the `send` method. fn send_new(&mut self, to: BlockPath, body: MsgBody) -> Result<()> { let id = self.gen_id()?; let msg = Msg { to, from: self.path().to_owned(), id, body, }; self.send(msg) } } pub trait Receiver { fn receive(&mut self) -> Result; fn path(&self) -> &BlockPath; } pub trait Channel { type Sender: Sender; type Receiver: Receiver; fn new(path: BlockPath) -> Result<(Self::Sender, Self::Receiver)>; } pub trait Router { fn add_sender(&mut self, sender: S) -> Result<()>; fn send(&mut self, msg: Msg) -> Result<()>; } trait TryClone { fn try_clone(&self) -> Result; } impl TryClone for PipeWriter { fn try_clone(&self) -> Result { Ok(PipeWriter::try_clone(self)?) } } pub struct WriteSender { write: W, path: BlockPath, } impl Sender for WriteSender { fn send(&mut self, msg: Msg) -> Result<()> { Ok(write_to(&msg, &mut self.write)?) } fn path(&self) -> &BlockPath { &self.path } } impl> TryClone> for WriteSender { fn try_clone(&self) -> Result> { Ok(WriteSender { write: self.write.try_clone()?, path: self.path.clone(), }) } } pub struct ReadReceiver { read: R, path: BlockPath, } impl Receiver for ReadReceiver { fn receive(&mut self) -> Result { Ok(read_from(&mut self.read)?) } fn path(&self) -> &BlockPath { &self.path } } pub enum PipeChannel {} impl Channel for PipeChannel { type Sender = WriteSender; type Receiver = ReadReceiver; fn new(path: BlockPath) -> Result<(Self::Sender, Self::Receiver)> { let (read, write) = os_pipe::pipe()?; let sender = WriteSender { path: path.clone(), write, }; let receiver = ReadReceiver { path, read }; Ok((sender, receiver)) } } } #[cfg(test)] mod tests { use crate::test_helpers; use super::*; #[test] fn ping_pong_via_pipes() { let ping_path = test_helpers::make_path(vec!["ping"]); let (mut ping_sender, mut ping_receiver) = PipeChannel::new(ping_path.clone()).expect("failed to create a ping channel"); let pong_path = test_helpers::make_path(vec!["pong"]); let (mut pong_sender, mut pong_receiver) = PipeChannel::new(pong_path.clone()).expect("failed to create a pong channel"); let pong = std::thread::spawn(move || loop { let msg = pong_receiver.receive().expect("pong receive failed"); assert_eq!(pong_receiver.path(), msg.to()); match msg.body() { MsgBody::Ping => ping_sender .send_new(ping_path.clone(), MsgBody::Success) .expect("send to ping failed"), MsgBody::Success => return, _ => panic!("unexpected message received by pong"), } }); let mut iter = 5; while iter > 0 { pong_sender .send_new(pong_path.clone(), MsgBody::Ping) .expect("send to pong failed"); let msg = ping_receiver.receive().expect("ping receive failed"); assert_eq!(ping_receiver.path(), msg.to()); match msg.body() { MsgBody::Success => iter -= 1, _ => panic!("unexpected message received by ping"), } } pong_sender .send_new(pong_path.clone(), MsgBody::Success) .expect("send success to pong failed"); pong.join().expect("join failed"); } }