@@ -0,0 +1,212 @@
+//! Code which enables sending messages between processes in the blocktree system.
+pub use private::*;
+mod private {
+ use crate::{crypto::rand_array, BlockPath, Result, Writecap};
+ use btserde::{read_from, write_to};
+ use os_pipe::{PipeReader, PipeWriter};
+ use serde::{Deserialize, Serialize};
+ use std::io::{Read, Write};
+ use zerocopy::FromBytes;
+ #[derive(PartialEq, Eq, Serialize, Deserialize)]
+ pub enum MsgError {}
+ #[derive(Serialize, Deserialize)]
+ pub enum MsgBody {
+ Success,
+ Fail(MsgError),
+ Ping,
+ Hello(Writecap),
+ Read { offset: u64 },
+ Write { offset: u64, data: Vec<u8> },
+ Custom(Vec<u8>),
+ }
+ #[derive(Serialize, Deserialize)]
+ pub struct Msg {
+ from: BlockPath,
+ to: BlockPath,
+ id: u128,
+ body: MsgBody,
+ }
+ impl Msg {
+ pub fn from(&self) -> &BlockPath {
+ &self.from
+ }
+ pub fn to(&self) -> &BlockPath {
+ &self.to
+ }
+ pub fn id(&self) -> u128 {
+ self.id
+ }
+ pub fn body(&self) -> &MsgBody {
+ &self.body
+ }
+ }
+ #[derive(Serialize, Deserialize)]
+ pub enum VerMsg {
+ V0(Msg),
+ }
+ pub trait Sender: Send {
+ fn send(&mut self, msg: Msg) -> Result<()>;
+ fn path(&self) -> &BlockPath;
+ /// Generates and returns a new message ID.
+ fn gen_id(&mut self) -> Result<u128> {
+ const LEN: usize = std::mem::size_of::<u128>();
+ let bytes = rand_array::<LEN>()?;
+ let option = u128::read_from(bytes.as_slice());
+ // Safety: because LEN == size_of::<u128>(), read_from should have returned Some.
+ Ok(option.unwrap())
+ }
+ /// This is a convenience method which creates a new message, addresses it to the given
+ /// path, sets it as from this sender's path, generates a new ID for it, puts the given
+ /// body inside of it, and dispatches it with the `send` method.
+ fn send_new(&mut self, to: BlockPath, body: MsgBody) -> Result<()> {
+ let id = self.gen_id()?;
+ let msg = Msg {
+ to,
+ from: self.path().to_owned(),
+ id,
+ body,
+ };
+ self.send(msg)
+ }
+ }
+ pub trait Receiver {
+ fn receive(&mut self) -> Result<Msg>;
+ fn path(&self) -> &BlockPath;
+ }
+ pub trait Channel {
+ type Sender: Sender;
+ type Receiver: Receiver;
+ fn new(path: BlockPath) -> Result<(Self::Sender, Self::Receiver)>;
+ }
+ pub trait Router {
+ fn add_sender<S: Sender>(&mut self, sender: S) -> Result<()>;
+ fn send(&mut self, msg: Msg) -> Result<()>;
+ }
+ trait TryClone<O> {
+ fn try_clone(&self) -> Result<O>;
+ }
+ impl TryClone<PipeWriter> for PipeWriter {
+ fn try_clone(&self) -> Result<PipeWriter> {
+ Ok(PipeWriter::try_clone(&self)?)
+ }
+ }
+ pub struct WriteSender<W> {
+ write: W,
+ path: BlockPath,
+ }
+ impl<W: Write + Send> Sender for WriteSender<W> {
+ fn send(&mut self, msg: Msg) -> Result<()> {
+ Ok(write_to(&msg, &mut self.write)?)
+ }
+ fn path(&self) -> &BlockPath {
+ &self.path
+ }
+ }
+ impl<W: Write + TryClone<W>> TryClone<WriteSender<W>> for WriteSender<W> {
+ fn try_clone(&self) -> Result<WriteSender<W>> {
+ Ok(WriteSender {
+ write: self.write.try_clone()?,
+ path: self.path.clone(),
+ })
+ }
+ }
+ pub struct ReadReceiver<R> {
+ read: R,
+ path: BlockPath,
+ }
+ impl<R: Read> Receiver for ReadReceiver<R> {
+ fn receive(&mut self) -> Result<Msg> {
+ Ok(read_from(&mut self.read)?)
+ }
+ fn path(&self) -> &BlockPath {
+ &self.path
+ }
+ }
+ pub enum PipeChannel {}
+ impl Channel for PipeChannel {
+ type Sender = WriteSender<PipeWriter>;
+ type Receiver = ReadReceiver<PipeReader>;
+ fn new(path: BlockPath) -> Result<(Self::Sender, Self::Receiver)> {
+ let (read, write) = os_pipe::pipe()?;
+ let sender = WriteSender {
+ path: path.clone(),
+ write,
+ };
+ let receiver = ReadReceiver { path, read };
+ Ok((sender, receiver))
+ }
+ }
+mod tests {
+ use crate::test_helpers;
+ use super::*;
+ #[test]
+ fn ping_pong_via_pipes() {
+ let ping_path = test_helpers::make_path(vec!["ping"]);
+ let (mut ping_sender, mut ping_receiver) =
+ PipeChannel::new(ping_path.clone()).expect("failed to create a ping channel");
+ let pong_path = test_helpers::make_path(vec!["pong"]);
+ let (mut pong_sender, mut pong_receiver) =
+ PipeChannel::new(pong_path.clone()).expect("failed to create a pong channel");
+ let pong = std::thread::spawn(move || loop {
+ let msg = pong_receiver.receive().expect("pong receive failed");
+ assert_eq!(pong_receiver.path(), msg.to());
+ match msg.body() {
+ MsgBody::Ping => ping_sender
+ .send_new(ping_path.clone(), MsgBody::Success)
+ .expect("send to ping failed"),
+ MsgBody::Success => return,
+ _ => panic!("unexpected message received by pong"),
+ }
+ });
+ let mut iter = 5;
+ while iter > 0 {
+ pong_sender
+ .send_new(pong_path.clone(), MsgBody::Ping)
+ .expect("send to pong failed");
+ let msg = ping_receiver.receive().expect("ping receive failed");
+ assert_eq!(ping_receiver.path(), msg.to());
+ match msg.body() {
+ MsgBody::Success => iter -= 1,
+ _ => panic!("unexpected message received by ping"),
+ }
+ }
+ pong_sender
+ .send_new(pong_path.clone(), MsgBody::Success)
+ .expect("send success to pong failed");
+ pong.join().expect("join failed");
+ }