소스 검색

* Made the messaging traits generic.
* Introduced a fixed-sized BlockAddr type to use for addressing
messages.

Matthew Carr 2 년 전
부모
커밋
0a74ddd286
2개의 변경된 파일206개의 추가작업 그리고 241개의 파일을 삭제
  1. 8 0
      crates/btlib/src/lib.rs
  2. 198 241
      crates/btmsg/src/lib.rs

+ 8 - 0
crates/btlib/src/lib.rs

@@ -1265,6 +1265,14 @@ impl Principal {
     }
 }
 
+impl TryFrom<&str> for Principal {
+    type Error = Error;
+
+    fn try_from(value: &str) -> Result<Self> {
+        Ok(Principal(value.try_into()?))
+    }
+}
+
 impl Display for Principal {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         self.0.fmt(f)

+ 198 - 241
crates/btmsg/src/lib.rs

@@ -1,5 +1,5 @@
 //! Code which enables sending messages between processes in the blocktree system.
-use btlib::{crypto::rand_array, error::BoxInIoErr, BlockPath, Result, Writecap};
+use btlib::{crypto::rand_array, error::BoxInIoErr, Principal, Result};
 use btserde::{read_from, write_to};
 use bytes::{BufMut, BytesMut};
 use core::{
@@ -14,7 +14,7 @@ use futures::{
 };
 use lazy_static::lazy_static;
 use serde::{Deserialize, Serialize};
-use std::{io, net::Shutdown, path::PathBuf};
+use std::{io, marker::PhantomData, net::Shutdown, path::PathBuf};
 use tokio::{
     io::{AsyncRead, AsyncWrite, ReadBuf},
     net::UnixDatagram,
@@ -38,164 +38,104 @@ mod private {
     }
 
     /// Appends the given Blocktree path to the path of the given directory.
-    fn socket_path(fs_path: &mut PathBuf, block_path: &BlockPath) {
-        fs_path.push(block_path.to_string());
+    fn socket_path(fs_path: &mut PathBuf, addr: &BlockAddr) {
+        fs_path.push(addr.inode.to_string());
     }
 
-    #[derive(PartialEq, Eq, Serialize, Deserialize)]
-    pub enum MsgError {
-        Unknown,
-    }
-
-    /// The owned version of a message body. This is the body type for received messages. It's
-    /// very important that this enum's variants match those of [MsgBodyRef], otherwise runtime
-    /// deserialization errors will occur.
-    #[derive(Deserialize)]
-    pub enum MsgBodyOwned {
-        Success,
-        Fail(MsgError),
-        Ping,
-        Hello(Writecap),
-        Read { offset: u64, size: u64 },
-        Write { offset: u64, buf: Vec<u8> },
-        Custom(Vec<u8>),
+    #[derive(Serialize, Deserialize, PartialEq, Eq, Default, Hash, Clone, Debug)]
+    pub struct BlockAddr {
+        pub root: Principal,
+        pub generation: u64,
+        pub inode: u64,
     }
 
-    /// The reference version of a message body. This is the body type when sending messages. It's
-    /// very important that this enum's variants match those of [MsgBodyOwned], otherwise runtime
-    /// deserialization errors will occur.
-    #[derive(Serialize)]
-    pub enum MsgBodyRef<'a> {
-        Success,
-        Fail(&'a MsgError),
-        Ping,
-        Hello(&'a Writecap),
-        Read { offset: u64, size: u64 },
-        Write { offset: u64, buf: &'a [u8] },
-        Custom(&'a [u8]),
+    impl BlockAddr {
+        pub fn new(root: Principal, generation: u64, inode: u64) -> Self {
+            Self {
+                root,
+                generation,
+                inode,
+            }
+        }
     }
 
-    /// Trait which unifies owned and borrowed messages.
-    pub trait Msg {
-        type Body;
-        fn to(&self) -> &BlockPath;
-        fn from(&self) -> &BlockPath;
-        fn id(&self) -> u128;
-        fn body(&self) -> &Self::Body;
+    /// Generates and returns a new message ID.
+    fn rand_msg_id() -> Result<u128> {
+        const LEN: usize = std::mem::size_of::<u128>();
+        let bytes = rand_array::<LEN>()?;
+        let option = u128::read_from(bytes.as_slice());
+        // Safety: because LEN == size_of::<u128>(), read_from should have returned Some.
+        Ok(option.unwrap())
     }
 
-    /// An owned message. This is the type which observed by the receiver.
-    #[derive(Deserialize)]
-    pub struct MsgOwned {
-        to: BlockPath,
-        from: BlockPath,
-        id: u128,
-        body: MsgBodyOwned,
+    #[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
+    pub struct Msg<T> {
+        pub to: BlockAddr,
+        pub from: BlockAddr,
+        pub id: u128,
+        pub body: T,
     }
 
-    impl Msg for MsgOwned {
-        type Body = MsgBodyOwned;
-
-        fn to(&self) -> &BlockPath {
-            &self.to
-        }
-
-        fn from(&self) -> &BlockPath {
-            &self.from
-        }
-
-        fn id(&self) -> u128 {
-            self.id
+    impl<T> Msg<T> {
+        pub fn new(to: BlockAddr, from: BlockAddr, id: u128, body: T) -> Self {
+            Self { to, from, id, body }
         }
 
-        fn body(&self) -> &Self::Body {
-            &self.body
+        pub fn with_rand_id(to: BlockAddr, from: BlockAddr, body: T) -> Result<Self> {
+            Ok(Self {
+                to,
+                from,
+                id: rand_msg_id()?,
+                body,
+            })
         }
     }
 
-    /// A borrowed message. This is the type which is produced by the sender.
-    #[derive(Serialize)]
-    pub struct MsgRef<'a> {
-        to: &'a BlockPath,
-        from: &'a BlockPath,
-        id: u128,
-        body: MsgBodyRef<'a>,
-    }
-
-    impl<'a> MsgRef<'a> {
-        pub fn new(to: &'a BlockPath, from: &'a BlockPath, id: u128, body: MsgBodyRef<'a>) -> Self {
-            Self { to, from, id, body }
-        }
+    #[derive(Serialize, Deserialize)]
+    enum VerMsg<T> {
+        V0(Msg<T>),
     }
 
-    impl<'a> Msg for MsgRef<'a> {
-        type Body = MsgBodyRef<'a>;
-
-        fn to(&self) -> &BlockPath {
-            self.to
-        }
-
-        fn from(&self) -> &BlockPath {
-            self.from
-        }
-
-        fn id(&self) -> u128 {
-            self.id
-        }
+    /// A type which can be used to send messages.
+    pub trait Sender {
+        type SendFut<'a, T>: 'a + Future<Output = Result<()>> + core::marker::Send
+        where
+            Self: 'a,
+            T: 'a + Serialize + core::marker::Send;
 
-        fn body(&self) -> &Self::Body {
-            &self.body
-        }
-    }
+        fn send<'a, T: 'a + Serialize + core::marker::Send>(
+            &'a mut self,
+            msg: Msg<T>,
+        ) -> Self::SendFut<'a, T>;
 
-    /// An owned message tagged with a version.
-    #[derive(Deserialize)]
-    enum VerMsgOwned {
-        V0(MsgOwned),
-    }
+        fn addr(&self) -> &BlockAddr;
 
-    /// A borrowed message tagged with a version.
-    #[derive(Serialize)]
-    enum VerMsgRef<'a> {
-        V0(MsgRef<'a>),
-    }
-
-    /// A type which can be used to send messages.
-    pub trait Sender<'a>: Sink<MsgRef<'a>, Error = btlib::Error> {
-        type SendFut: Future<Output = Result<()>> + std::marker::Send;
-
-        fn path(&self) -> &BlockPath;
-
-        /// Creates a new message with the given `from` and `body` fields and sends it to the peer
-        /// this [Sender] is associated with.
-        fn send_msg(&'a mut self, from: &'a BlockPath, body: MsgBodyRef<'a>) -> Self::SendFut;
-
-        /// Generates and returns a new message ID.
-        fn gen_id() -> Result<u128> {
-            const LEN: usize = std::mem::size_of::<u128>();
-            let bytes = rand_array::<LEN>()?;
-            let option = u128::read_from(bytes.as_slice());
-            // Safety: because LEN == size_of::<u128>(), read_from should have returned Some.
-            Ok(option.unwrap())
+        fn send_msg<'a, T: 'a + Serialize + core::marker::Send>(
+            &'a mut self,
+            to: BlockAddr,
+            body: T,
+        ) -> Self::SendFut<'a, T> {
+            let msg = Msg::with_rand_id(to, self.addr().clone(), body).unwrap();
+            self.send(msg)
         }
     }
 
     /// A type which can be used to receive messages.
-    pub trait Receiver: Stream<Item = Result<MsgOwned>> {
-        fn path(&self) -> &BlockPath;
+    pub trait Receiver<T>: Stream<Item = Result<Msg<T>>> {
+        fn addr(&self) -> &BlockAddr;
     }
 
     /// Encodes and decodes messages using [btserde].
-    struct MessageCodec;
+    struct MsgEncoder;
 
-    impl<'a> Encoder<MsgRef<'a>> for MessageCodec {
+    impl<T: Serialize> Encoder<Msg<T>> for MsgEncoder {
         type Error = btlib::Error;
 
-        fn encode(&mut self, item: MsgRef<'a>, dst: &mut BytesMut) -> Result<()> {
+        fn encode(&mut self, item: Msg<T>, dst: &mut BytesMut) -> Result<()> {
             const U64_LEN: usize = std::mem::size_of::<u64>();
             let payload = dst.split_off(U64_LEN);
             let mut writer = payload.writer();
-            write_to(&VerMsgRef::V0(item), &mut writer)?;
+            write_to(&VerMsg::V0(item), &mut writer)?;
             let payload = writer.into_inner();
             let payload_len = payload.len() as u64;
             let mut writer = dst.writer();
@@ -206,8 +146,10 @@ mod private {
         }
     }
 
-    impl Decoder for MessageCodec {
-        type Item = MsgOwned;
+    struct MsgDecoder<T>(PhantomData<T>);
+
+    impl<T: for<'de> Deserialize<'de>> Decoder for MsgDecoder<T> {
+        type Item = Msg<T>;
         type Error = btlib::Error;
 
         fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
@@ -226,7 +168,8 @@ mod private {
                 src.reserve(payload_len - slice.len());
                 return Ok(None);
             }
-            let VerMsgOwned::V0(msg) = read_from(&mut slice)?;
+            let VerMsg::V0(msg) = read_from(&mut slice)?;
+            // Consume all the bytes that have been read out of the buffer.
             let _ = src.split_to(std::mem::size_of::<u64>() + payload_len);
             Ok(Some(msg))
         }
@@ -293,117 +236,120 @@ mod private {
         }
     }
 
-    /// Returns a [Receiver] which can be used to receive messages addressed to the given path.
-    /// The `fs_path` argument specifies the filesystem directory under which the receiver's socket
-    /// will be stored.
-    pub fn local_receiver(fs_path: PathBuf, block_path: BlockPath) -> Result<impl Receiver> {
-        UnixReceiver::new(fs_path, block_path)
-    }
-
     /// An implementation of [Receiver] which uses a Unix datagram socket for receiving messages.
-    struct UnixReceiver {
-        path: BlockPath,
-        socket: FramedRead<DatagramAdapter, MessageCodec>,
+    struct UnixReceiver<T> {
+        addr: BlockAddr,
+        socket: FramedRead<DatagramAdapter, MsgDecoder<T>>,
     }
 
-    impl UnixReceiver {
-        fn new(mut fs_path: PathBuf, block_path: BlockPath) -> Result<Self> {
-            socket_path(&mut fs_path, &block_path);
-            std::fs::create_dir_all(fs_path.parent().unwrap())?;
+    impl<T: for<'de> Deserialize<'de>> UnixReceiver<T> {
+        fn new(mut fs_path: PathBuf, addr: BlockAddr) -> Result<Self> {
+            socket_path(&mut fs_path, &addr);
             let socket = DatagramAdapter::new(UnixDatagram::bind(fs_path)?);
-            let socket = FramedRead::new(socket, MessageCodec);
-            Ok(Self {
-                path: block_path,
-                socket,
-            })
+            let socket = FramedRead::new(socket, MsgDecoder(PhantomData));
+            Ok(Self { addr, socket })
         }
     }
 
-    impl Stream for UnixReceiver {
-        type Item = Result<MsgOwned>;
+    impl<T: for<'de> Deserialize<'de>> Stream for UnixReceiver<T> {
+        type Item = Result<Msg<T>>;
 
         fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
             self.socket.poll_next_unpin(cx)
         }
     }
 
-    impl Receiver for UnixReceiver {
-        fn path(&self) -> &BlockPath {
-            &self.path
+    impl<T: for<'de> Deserialize<'de>> Receiver<T> for UnixReceiver<T> {
+        fn addr(&self) -> &BlockAddr {
+            &self.addr
         }
     }
 
-    /// Returns a [Sender] which can be used to send messages to the given Blocktree path.
-    /// The `fs_path` argument specifies the filesystem directory in which to locate the
-    /// socket of the recipient.
-    pub fn local_sender(
+    /// Returns a [Receiver] which can be used to receive messages addressed to the given path.
+    /// The `fs_path` argument specifies the filesystem directory under which the receiver's socket
+    /// will be stored.
+    pub fn local_receiver<T: for<'de> Deserialize<'de>>(
         fs_path: PathBuf,
-        block_path: BlockPath,
-    ) -> Result<impl for<'a> Sender<'a>> {
-        UnixSender::new(fs_path, block_path)
+        addr: BlockAddr,
+    ) -> Result<impl Receiver<T>> {
+        UnixReceiver::new(fs_path, addr)
     }
 
     /// An implementation of [Sender] which uses a Unix datagram socket to send messages.
     struct UnixSender {
-        path: BlockPath,
-        socket: FramedWrite<DatagramAdapter, MessageCodec>,
+        addr: BlockAddr,
+        socket: FramedWrite<DatagramAdapter, MsgEncoder>,
     }
 
     impl UnixSender {
-        fn new(mut fs_path: PathBuf, block_path: BlockPath) -> Result<Self> {
+        fn new(mut fs_path: PathBuf, addr: BlockAddr) -> Result<Self> {
             let socket = UnixDatagram::unbound()?;
-            socket_path(&mut fs_path, &block_path);
+            socket_path(&mut fs_path, &addr);
             socket.connect(fs_path)?;
-            let socket = FramedWrite::new(DatagramAdapter::new(socket), MessageCodec);
-            Ok(Self {
-                path: block_path,
-                socket,
-            })
+            let socket = FramedWrite::new(DatagramAdapter::new(socket), MsgEncoder);
+            Ok(Self { addr, socket })
         }
     }
 
-    impl Sink<MsgRef<'_>> for UnixSender {
+    impl<T: Serialize> Sink<Msg<T>> for UnixSender {
         type Error = btlib::Error;
 
         fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
-            self.socket.poll_ready_unpin(cx)
+            <tokio_util::codec::FramedWrite<DatagramAdapter, MsgEncoder> as futures::SinkExt<
+                Msg<T>,
+            >>::poll_ready_unpin(&mut self.socket, cx)
         }
 
-        fn start_send(mut self: Pin<&mut Self>, item: MsgRef<'_>) -> Result<()> {
+        fn start_send(mut self: Pin<&mut Self>, item: Msg<T>) -> Result<()> {
             self.socket.start_send_unpin(item)
         }
 
         fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
-            self.socket.poll_flush_unpin(cx)
+            <tokio_util::codec::FramedWrite<DatagramAdapter, MsgEncoder> as futures::SinkExt<
+                Msg<T>,
+            >>::poll_flush_unpin(&mut self.socket, cx)
         }
 
         fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
-            self.socket.poll_close_unpin(cx)
+            <tokio_util::codec::FramedWrite<DatagramAdapter, MsgEncoder> as futures::SinkExt<
+                Msg<T>,
+            >>::poll_close_unpin(&mut self.socket, cx)
         }
     }
 
-    impl<'a> Sender<'a> for UnixSender {
-        type SendFut = Send<'a, FramedWrite<DatagramAdapter, MessageCodec>, MsgRef<'a>>;
-
-        fn path(&self) -> &BlockPath {
-            &self.path
+    impl Sender for UnixSender {
+        fn addr(&self) -> &BlockAddr {
+            &self.addr
         }
 
-        fn send_msg(&'a mut self, from: &'a BlockPath, body: MsgBodyRef<'a>) -> Self::SendFut {
-            let id = Self::gen_id().unwrap();
-            let msg = MsgRef::new(&self.path, from, id, body);
+        type SendFut<'a, T>
+            = Send<'a, FramedWrite<DatagramAdapter, MsgEncoder>, Msg<T>>
+                where T: 'a + Serialize + core::marker::Send;
+
+        fn send<'a, T: 'a + Serialize + core::marker::Send>(
+            &'a mut self,
+            msg: Msg<T>,
+        ) -> Self::SendFut<'a, T> {
             self.socket.send(msg)
         }
     }
 
-    /// Returns a connected [Sender] and [Receiver].
-    pub fn local_pair(
-        dir: PathBuf,
-        block_path: BlockPath,
-    ) -> Result<(impl for<'a> Sender<'a>, impl Receiver)> {
-        let receiver = local_receiver(dir.clone(), block_path.clone())?;
-        let sender = local_sender(dir, block_path)?;
-        Ok((sender, receiver))
+    /// Returns a [Sender] which can be used to send messages to the given Blocktree path.
+    /// The `fs_path` argument specifies the filesystem directory in which to locate the
+    /// socket of the recipient.
+    pub fn local_sender(fs_path: PathBuf, addr: BlockAddr) -> Result<impl Sender> {
+        UnixSender::new(fs_path, addr)
+    }
+
+    /// This is an identify function which allows you to specify a type parameter for the output
+    /// of a future.
+    /// This was needed to work around a failure in type inference for types with higher-rank
+    /// lifetimes. Once this issue is resolved this can be removed:
+    /// https://github.com/rust-lang/rust/issues/102211
+    pub fn assert_send<'a, T>(
+        fut: impl 'a + Future<Output = T> + core::marker::Send,
+    ) -> impl 'a + Future<Output = T> + core::marker::Send {
+        fut
     }
 }
 
@@ -414,16 +360,35 @@ mod tests {
     use tempdir::TempDir;
 
     lazy_static! {
-        static ref BT_ROOT: BlockPath =
-            BlockPath::try_from("0!dSip4J0kurN5VhVo_aTipM-ywOOWrqJuRRVQ7aa-bew").unwrap();
+        static ref ROOT_PRINCIPAL: Principal =
+            Principal::try_from("0!dSip4J0kurN5VhVo_aTipM-ywOOWrqJuRRVQ7aa-bew").unwrap();
     }
 
-    fn block_path<'a, I: Iterator<Item = &'a str>>(components: I) -> BlockPath {
-        let mut path = BT_ROOT.clone();
-        for component in components {
-            path.push_component(component.to_string());
-        }
-        path
+    fn block_addr(generation: u64, inode: u64) -> BlockAddr {
+        BlockAddr::new(ROOT_PRINCIPAL.clone(), generation, inode)
+    }
+
+    #[derive(Serialize, Deserialize)]
+    enum MsgError {
+        Unknown,
+    }
+
+    #[derive(Deserialize)]
+    enum BodyOwned {
+        Ping,
+        Success,
+        Fail(MsgError),
+        Read { offset: u64, size: u64 },
+        Write { offset: u64, buf: Vec<u8> },
+    }
+
+    #[derive(Serialize)]
+    enum BodyRef<'a> {
+        Ping,
+        Success,
+        Fail(MsgError),
+        Read { offset: u64, size: u64 },
+        Write { offset: u64, buf: &'a [u8] },
     }
 
     struct TestCase {
@@ -437,61 +402,54 @@ mod tests {
             }
         }
 
-        fn endpoint(&self, name: &str) -> (BlockPath, impl for<'a> Sender<'a>, impl Receiver) {
-            let block_path = block_path(["apps", name].into_iter());
-            let (sender, receiver) =
-                local_pair(self.dir.path().to_owned(), block_path.clone()).unwrap();
-            (block_path, sender, receiver)
+        fn endpoint(&self, inode: u64) -> (BlockAddr, impl Sender, impl Receiver<BodyOwned>) {
+            let addr = block_addr(1, inode);
+            let fs_path = self.dir.path().to_owned();
+            let receiver = local_receiver(fs_path.clone(), addr.clone()).unwrap();
+            let sender = local_sender(fs_path, addr.clone()).unwrap();
+            (addr, sender, receiver)
         }
     }
 
     #[tokio::test]
     async fn message_received_is_message_sent() {
         let case = TestCase::new();
-        let (block_path, mut sender, mut receiver) = case.endpoint("social");
+        let (addr, mut sender, mut receiver) = case.endpoint(1);
 
-        sender
-            .send_msg(&block_path, MsgBodyRef::Ping)
-            .await
-            .unwrap();
+        sender.send_msg(addr.clone(), BodyRef::Ping).await.unwrap();
         let actual = receiver.next().await.unwrap().unwrap();
 
-        let matched = if let MsgBodyOwned::Ping = actual.body() {
+        let matched = if let BodyOwned::Ping = actual.body {
             true
         } else {
             false
         };
         assert!(matched);
-        assert_eq!(&block_path, actual.to());
-        assert_eq!(&block_path, actual.from());
+        assert_eq!(&addr, &actual.to);
+        assert_eq!(&addr, &actual.from);
     }
 
     #[tokio::test]
     async fn ping_pong() {
         let case = TestCase::new();
-        let (block_path_one, mut sender_one, mut receiver_one) = case.endpoint("one");
-        let (block_path_two, mut sender_two, mut receiver_two) = case.endpoint("two");
+        let (addr_one, mut sender_one, mut receiver_one) = case.endpoint(1);
+        let (addr_two, mut sender_two, mut receiver_two) = case.endpoint(2);
 
         let handle = tokio::spawn(async move {
             let msg = receiver_one.next().await.unwrap().unwrap();
-            let reply_body = if let MsgBodyOwned::Ping = msg.body() {
-                MsgBodyRef::Success
+            let reply_body = if let BodyOwned::Ping = msg.body {
+                BodyRef::Success
             } else {
-                MsgBodyRef::Fail(&MsgError::Unknown)
+                BodyRef::Fail(MsgError::Unknown)
             };
-            sender_two
-                .send_msg(&block_path_one, reply_body)
-                .await
-                .unwrap();
+            let fut = assert_send::<'_, Result<()>>(sender_two.send_msg(addr_one, reply_body));
+            fut.await.unwrap();
         });
 
-        sender_one
-            .send_msg(&block_path_two, MsgBodyRef::Ping)
-            .await
-            .unwrap();
+        sender_one.send_msg(addr_two, BodyRef::Ping).await.unwrap();
         handle.await.unwrap();
         let reply = receiver_two.next().await.unwrap().unwrap();
-        let matched = if let MsgBodyOwned::Success = reply.body() {
+        let matched = if let BodyOwned::Success = reply.body {
             true
         } else {
             false
@@ -502,40 +460,39 @@ mod tests {
     #[tokio::test]
     async fn read_write() {
         let case = TestCase::new();
-        let (block_path_one, mut sender_one, mut receiver_one) = case.endpoint("one");
-        let (block_path_two, mut sender_two, mut receiver_two) = case.endpoint("two");
+        let (addr_one, mut sender_one, mut receiver_one) = case.endpoint(1);
+        let (addr_two, mut sender_two, mut receiver_two) = case.endpoint(2);
 
         let handle = tokio::spawn(async move {
             let data: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
             let msg = receiver_one.next().await.unwrap().unwrap();
-            let reply_body = if let MsgBodyOwned::Read { offset, size } = msg.body() {
-                let offset: usize = (*offset).try_into().unwrap();
-                let size: usize = (*size).try_into().unwrap();
+            let reply_body = if let BodyOwned::Read { offset, size } = msg.body {
+                let offset: usize = offset.try_into().unwrap();
+                let size: usize = size.try_into().unwrap();
                 let end: usize = offset + size;
-                MsgBodyRef::Write {
+                BodyRef::Write {
                     offset: offset as u64,
                     buf: &data[offset..end],
                 }
             } else {
-                MsgBodyRef::Fail(&MsgError::Unknown)
+                BodyRef::Fail(MsgError::Unknown)
             };
-            sender_two
-                .send_msg(&block_path_one, reply_body)
-                .await
-                .unwrap();
+            let msg = Msg::new(msg.from, addr_one, msg.id, reply_body);
+            let fut = assert_send::<'_, Result<()>>(sender_two.send(msg));
+            fut.await.unwrap();
         });
 
         sender_one
-            .send_msg(&block_path_two, MsgBodyRef::Read { offset: 2, size: 2 })
+            .send_msg(addr_two, BodyRef::Read { offset: 2, size: 2 })
             .await
             .unwrap();
         handle.await.unwrap();
         let reply = receiver_two.next().await.unwrap().unwrap();
-        if let MsgBodyOwned::Write { offset, buf } = reply.body() {
-            assert_eq!(2, *offset);
+        if let BodyOwned::Write { offset, buf } = reply.body {
+            assert_eq!(2, offset);
             assert_eq!([2, 3].as_slice(), buf.as_slice());
         } else {
-            panic!("replay was not the right type");
+            panic!("reply was not the right type");
         };
     }
 }