Browse Source

Rewrote `btmsg` to use tokio and be fully asynchronous.

Matthew Carr 2 years ago
parent
commit
84f5fb59c8
6 changed files with 618 additions and 166 deletions
  1. 215 9
      Cargo.lock
  2. 1 1
      crates/btlib/src/crypto/mod.rs
  3. 8 0
      crates/btmsg/Cargo.toml
  4. 380 142
      crates/btmsg/src/lib.rs
  5. 1 1
      crates/btnode/src/main.rs
  6. 13 13
      crates/btserde/src/ser.rs

+ 215 - 9
Cargo.lock

@@ -236,7 +236,13 @@ version = "0.1.0"
 dependencies = [
  "btlib",
  "btserde",
+ "bytes",
+ "futures",
+ "lazy_static",
  "serde",
+ "tempdir",
+ "tokio",
+ "tokio-util",
  "zerocopy",
 ]
 
@@ -271,6 +277,12 @@ version = "1.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
 
+[[package]]
+name = "bytes"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c"
+
 [[package]]
 name = "caps"
 version = "0.5.4"
@@ -641,6 +653,95 @@ dependencies = [
  "vmm-sys-util",
 ]
 
+[[package]]
+name = "futures"
+version = "0.3.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-channel"
+version = "0.3.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed"
+dependencies = [
+ "futures-core",
+ "futures-sink",
+]
+
+[[package]]
+name = "futures-core"
+version = "0.3.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac"
+
+[[package]]
+name = "futures-executor"
+version = "0.3.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-io"
+version = "0.3.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb"
+
+[[package]]
+name = "futures-macro"
+version = "0.3.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "futures-sink"
+version = "0.3.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9"
+
+[[package]]
+name = "futures-task"
+version = "0.3.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea"
+
+[[package]]
+name = "futures-util"
+version = "0.3.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-io",
+ "futures-macro",
+ "futures-sink",
+ "futures-task",
+ "memchr",
+ "pin-project-lite",
+ "pin-utils",
+ "slab",
+]
+
 [[package]]
 name = "gimli"
 version = "0.26.2"
@@ -885,7 +986,7 @@ dependencies = [
  "libc",
  "log",
  "wasi 0.11.0+wasi-snapshot-preview1",
- "windows-sys",
+ "windows-sys 0.36.1",
 ]
 
 [[package]]
@@ -1578,16 +1679,30 @@ dependencies = [
 
 [[package]]
 name = "tokio"
-version = "1.21.2"
+version = "1.23.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099"
+checksum = "eab6d665857cc6ca78d6e80303a02cea7a7851e85dfbd77cbdc09bd129f1ef46"
 dependencies = [
  "autocfg",
+ "bytes",
  "libc",
+ "memchr",
  "mio",
  "pin-project-lite",
  "socket2",
- "winapi",
+ "tokio-macros",
+ "windows-sys 0.42.0",
+]
+
+[[package]]
+name = "tokio-macros"
+version = "1.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
 ]
 
 [[package]]
@@ -1604,6 +1719,40 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "tokio-util"
+version = "0.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
+dependencies = [
+ "bytes",
+ "futures-core",
+ "futures-sink",
+ "pin-project-lite",
+ "tokio",
+ "tracing",
+]
+
+[[package]]
+name = "tracing"
+version = "0.1.37"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
+dependencies = [
+ "cfg-if",
+ "pin-project-lite",
+ "tracing-core",
+]
+
+[[package]]
+name = "tracing-core"
+version = "0.1.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a"
+dependencies = [
+ "once_cell",
+]
+
 [[package]]
 name = "tss-esapi"
 version = "7.1.0"
@@ -1828,43 +1977,100 @@ version = "0.36.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2"
 dependencies = [
- "windows_aarch64_msvc",
- "windows_i686_gnu",
- "windows_i686_msvc",
- "windows_x86_64_gnu",
- "windows_x86_64_msvc",
+ "windows_aarch64_msvc 0.36.1",
+ "windows_i686_gnu 0.36.1",
+ "windows_i686_msvc 0.36.1",
+ "windows_x86_64_gnu 0.36.1",
+ "windows_x86_64_msvc 0.36.1",
+]
+
+[[package]]
+name = "windows-sys"
+version = "0.42.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7"
+dependencies = [
+ "windows_aarch64_gnullvm",
+ "windows_aarch64_msvc 0.42.0",
+ "windows_i686_gnu 0.42.0",
+ "windows_i686_msvc 0.42.0",
+ "windows_x86_64_gnu 0.42.0",
+ "windows_x86_64_gnullvm",
+ "windows_x86_64_msvc 0.42.0",
 ]
 
+[[package]]
+name = "windows_aarch64_gnullvm"
+version = "0.42.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "41d2aa71f6f0cbe00ae5167d90ef3cfe66527d6f613ca78ac8024c3ccab9a19e"
+
 [[package]]
 name = "windows_aarch64_msvc"
 version = "0.36.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
 
+[[package]]
+name = "windows_aarch64_msvc"
+version = "0.42.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dd0f252f5a35cac83d6311b2e795981f5ee6e67eb1f9a7f64eb4500fbc4dcdb4"
+
 [[package]]
 name = "windows_i686_gnu"
 version = "0.36.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
 
+[[package]]
+name = "windows_i686_gnu"
+version = "0.42.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fbeae19f6716841636c28d695375df17562ca208b2b7d0dc47635a50ae6c5de7"
+
 [[package]]
 name = "windows_i686_msvc"
 version = "0.36.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
 
+[[package]]
+name = "windows_i686_msvc"
+version = "0.42.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "84c12f65daa39dd2babe6e442988fc329d6243fdce47d7d2d155b8d874862246"
+
 [[package]]
 name = "windows_x86_64_gnu"
 version = "0.36.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
 
+[[package]]
+name = "windows_x86_64_gnu"
+version = "0.42.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bf7b1b21b5362cbc318f686150e5bcea75ecedc74dd157d874d754a2ca44b0ed"
+
+[[package]]
+name = "windows_x86_64_gnullvm"
+version = "0.42.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "09d525d2ba30eeb3297665bd434a54297e4170c7f1a44cad4ef58095b4cd2028"
+
 [[package]]
 name = "windows_x86_64_msvc"
 version = "0.36.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
 
+[[package]]
+name = "windows_x86_64_msvc"
+version = "0.42.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f40009d85759725a34da6d89a94e63d7bdc50a862acf0dbc7c8e488f1edcb6f5"
+
 [[package]]
 name = "zerocopy"
 version = "0.6.1"

+ 1 - 1
crates/btlib/src/crypto/mod.rs

@@ -1772,7 +1772,7 @@ pub trait Signer {
     }
 
     fn ser_sign_into<T: Serialize>(&self, value: &T, buf: &mut Vec<u8>) -> Result<Signature> {
-        write_to(value, buf)?;
+        write_to(value, &mut *buf)?;
         self.sign(std::iter::once(buf.as_slice()))
     }
 }

+ 8 - 0
crates/btmsg/Cargo.toml

@@ -9,4 +9,12 @@ edition = "2021"
 btlib = { path = "../btlib" }
 btserde = { path = "../btserde" }
 serde = { version = "^1.0.136", features = ["derive"] }
+bytes = "1.3.0"
+futures = "0.3.25"
+tokio = { version = "1.23.0", features = ["net", "io-util", "macros"] }
+tokio-util = { version = "0.7.4", features = ["codec"] }
 zerocopy = "0.6.1"
+lazy_static = { version = "1.4.0" }
+
+[dev-dependencies]
+tempdir = { version = "0.3.7" }

+ 380 - 142
crates/btmsg/src/lib.rs

@@ -1,72 +1,94 @@
 //! Code which enables sending messages between processes in the blocktree system.
-use btlib::{
-    Result,
-    crypto::rand_array,
-    BlockPath, Writecap,
+use btlib::{crypto::rand_array, error::BoxInIoErr, BlockPath, Result, Writecap};
+use btserde::{read_from, write_to};
+use bytes::{BufMut, BytesMut};
+use core::{
+    future::Future,
+    pin::Pin,
+    task::{Context, Poll},
 };
-use std::{
-    path::PathBuf,
-    io::{Read, Write},
-    os::unix::net::{UnixStream, UnixListener}, 
+use futures::{
+    sink::{Send, Sink},
+    stream::Stream,
+    SinkExt, StreamExt,
 };
-use btserde::{read_from, write_to};
+use lazy_static::lazy_static;
 use serde::{Deserialize, Serialize};
+use std::{io, net::Shutdown, path::PathBuf};
+use tokio::{
+    io::{AsyncRead, AsyncWrite, ReadBuf},
+    net::UnixDatagram,
+};
+use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite};
 use zerocopy::FromBytes;
 
 pub use private::*;
 
 mod private {
-    use std::io;
 
     use super::*;
 
-    /// The directory where sockets are created.
-    fn socket_dir() -> PathBuf {
-        let home_dir: PathBuf = std::env::var("HOME").unwrap().into();
-        home_dir.join(".btmsg")
+    lazy_static! {
+        /// The default directory in which to place blocktree sockets.
+        static ref SOCK_DIR: PathBuf = {
+            let mut path: PathBuf = std::env::var("HOME").unwrap().into();
+            path.push(".btmsg");
+            path
+        };
     }
 
-    pub fn socket_path(block_path: &BlockPath) -> PathBuf {
-        socket_dir().join(block_path.to_string())
+    /// Appends the given Blocktree path to the path of the given directory.
+    fn socket_path(fs_path: &mut PathBuf, block_path: &BlockPath) {
+        fs_path.push(block_path.to_string());
     }
 
     #[derive(PartialEq, Eq, Serialize, Deserialize)]
-    pub enum MsgError {}
+    pub enum MsgError {
+        Unknown,
+    }
 
+    /// The owned version of a message body. This is the body type for received messages. It's
+    /// very important that this enum's variants match those of [MsgBodyRef], otherwise runtime
+    /// deserialization errors will occur.
     #[derive(Deserialize)]
     pub enum MsgBodyOwned {
         Success,
         Fail(MsgError),
         Ping,
         Hello(Writecap),
-        Read { offset: u64, size: u64, },
+        Read { offset: u64, size: u64 },
         Write { offset: u64, buf: Vec<u8> },
         Custom(Vec<u8>),
     }
 
+    /// The reference version of a message body. This is the body type when sending messages. It's
+    /// very important that this enum's variants match those of [MsgBodyOwned], otherwise runtime
+    /// deserialization errors will occur.
     #[derive(Serialize)]
     pub enum MsgBodyRef<'a> {
         Success,
         Fail(&'a MsgError),
         Ping,
         Hello(&'a Writecap),
-        Read { offset: u64, buf: &'a [u8] },
-        Write { offset: u64, buf: &'a mut [u8] },
+        Read { offset: u64, size: u64 },
+        Write { offset: u64, buf: &'a [u8] },
         Custom(&'a [u8]),
     }
 
+    /// Trait which unifies owned and borrowed messages.
     pub trait Msg {
         type Body;
-        fn from(&self) -> &BlockPath;
         fn to(&self) -> &BlockPath;
+        fn from(&self) -> &BlockPath;
         fn id(&self) -> u128;
         fn body(&self) -> &Self::Body;
     }
 
+    /// An owned message. This is the type which observed by the receiver.
     #[derive(Deserialize)]
     pub struct MsgOwned {
-        from: BlockPath,
         to: BlockPath,
+        from: BlockPath,
         id: u128,
         body: MsgBodyOwned,
     }
@@ -74,14 +96,14 @@ mod private {
     impl Msg for MsgOwned {
         type Body = MsgBodyOwned;
 
-        fn from(&self) -> &BlockPath {
-            &self.from
-        }
-
         fn to(&self) -> &BlockPath {
             &self.to
         }
 
+        fn from(&self) -> &BlockPath {
+            &self.from
+        }
+
         fn id(&self) -> u128 {
             self.id
         }
@@ -91,23 +113,30 @@ mod private {
         }
     }
 
+    /// A borrowed message. This is the type which is produced by the sender.
     #[derive(Serialize)]
     pub struct MsgRef<'a> {
-        from: &'a BlockPath,
         to: &'a BlockPath,
+        from: &'a BlockPath,
         id: u128,
         body: MsgBodyRef<'a>,
     }
 
+    impl<'a> MsgRef<'a> {
+        pub fn new(to: &'a BlockPath, from: &'a BlockPath, id: u128, body: MsgBodyRef<'a>) -> Self {
+            Self { to, from, id, body }
+        }
+    }
+
     impl<'a> Msg for MsgRef<'a> {
         type Body = MsgBodyRef<'a>;
 
         fn to(&self) -> &BlockPath {
-            &self.to
+            self.to
         }
 
         fn from(&self) -> &BlockPath {
-            &self.from
+            self.from
         }
 
         fn id(&self) -> u128 {
@@ -119,185 +148,394 @@ mod private {
         }
     }
 
+    /// An owned message tagged with a version.
     #[derive(Deserialize)]
-    pub enum VerMsg {
+    enum VerMsgOwned {
         V0(MsgOwned),
     }
 
+    /// A borrowed message tagged with a version.
     #[derive(Serialize)]
-    pub enum VerMsgRef<'a> {
+    enum VerMsgRef<'a> {
         V0(MsgRef<'a>),
     }
 
-    pub trait Sender: Send {
-        fn send<'a>(&mut self, msg: MsgRef<'a>) -> Result<()>;
+    /// A type which can be used to send messages.
+    pub trait Sender<'a>: Sink<MsgRef<'a>, Error = btlib::Error> {
+        type SendFut: Future<Output = Result<()>> + std::marker::Send;
+
         fn path(&self) -> &BlockPath;
 
+        /// Creates a new message with the given `from` and `body` fields and sends it to the peer
+        /// this [Sender] is associated with.
+        fn send_msg(&'a mut self, from: &'a BlockPath, body: MsgBodyRef<'a>) -> Self::SendFut;
+
         /// Generates and returns a new message ID.
-        fn gen_id(&mut self) -> Result<u128> {
+        fn gen_id() -> Result<u128> {
             const LEN: usize = std::mem::size_of::<u128>();
             let bytes = rand_array::<LEN>()?;
             let option = u128::read_from(bytes.as_slice());
             // Safety: because LEN == size_of::<u128>(), read_from should have returned Some.
             Ok(option.unwrap())
         }
+    }
+
+    /// A type which can be used to receive messages.
+    pub trait Receiver: Stream<Item = Result<MsgOwned>> {
+        fn path(&self) -> &BlockPath;
+    }
+
+    /// Encodes and decodes messages using [btserde].
+    struct MessageCodec;
+
+    impl<'a> Encoder<MsgRef<'a>> for MessageCodec {
+        type Error = btlib::Error;
+
+        fn encode(&mut self, item: MsgRef<'a>, dst: &mut BytesMut) -> Result<()> {
+            const U64_LEN: usize = std::mem::size_of::<u64>();
+            let payload = dst.split_off(U64_LEN);
+            let mut writer = payload.writer();
+            write_to(&VerMsgRef::V0(item), &mut writer)?;
+            let payload = writer.into_inner();
+            let payload_len = payload.len() as u64;
+            let mut writer = dst.writer();
+            write_to(&payload_len, &mut writer)?;
+            let dst = writer.into_inner();
+            dst.unsplit(payload);
+            Ok(())
+        }
+    }
+
+    impl Decoder for MessageCodec {
+        type Item = MsgOwned;
+        type Error = btlib::Error;
 
-        fn send_new<'a>(&mut self, to: &BlockPath, from: &BlockPath, body: MsgBodyRef<'a>) -> Result<()> {
-            let id = self.gen_id()?;
-            let msg = MsgRef {
-                to,
-                from,
-                id,
-                body,
+        fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
+            let mut slice: &[u8] = src.as_ref();
+            let payload_len: u64 = match read_from(&mut slice) {
+                Ok(payload_len) => payload_len,
+                Err(err) => {
+                    if let btserde::Error::Eof = err {
+                        return Ok(None);
+                    }
+                    return Err(err.into());
+                }
             };
-            self.send(msg)
+            let payload_len: usize = payload_len.try_into().box_err()?;
+            if slice.len() < payload_len {
+                src.reserve(payload_len - slice.len());
+                return Ok(None);
+            }
+            let VerMsgOwned::V0(msg) = read_from(&mut slice)?;
+            let _ = src.split_to(std::mem::size_of::<u64>() + payload_len);
+            Ok(Some(msg))
         }
     }
 
-    pub trait Receiver {
-        fn receive(&mut self) -> Result<MsgOwned>;
-        fn path(&self) -> &BlockPath;
+    /// Wraps a [UnixDatagram] and implements [AsyncRead] and [AsyncWrite] for it. Read operations
+    /// are translated  to calls to `recv_from` and write operations are translated to `send`. Note
+    /// that this means that writes will fail unless the wrapped socket is connected  to a peer.
+    struct DatagramAdapter {
+        socket: UnixDatagram,
     }
 
-    pub trait Channel: Sized + Receiver {
-        type Sender: Sender;
+    impl DatagramAdapter {
+        fn new(socket: UnixDatagram) -> Self {
+            Self { socket }
+        }
+
+        fn get_ref(&self) -> &UnixDatagram {
+            &self.socket
+        }
 
-        fn new(receiver_path: BlockPath) -> Result<Self>;
-        fn connect_to(receiver_path: BlockPath) -> Result<Self::Sender>;
+        fn get_mut(&mut self) -> &mut UnixDatagram {
+            &mut self.socket
+        }
     }
 
-    pub struct WriteSender<W> {
-        write: W,
-        path: BlockPath,
+    impl AsRef<UnixDatagram> for DatagramAdapter {
+        fn as_ref(&self) -> &UnixDatagram {
+            self.get_ref()
+        }
     }
 
-    impl<W> WriteSender<W> {
-        pub fn new(write: W, path: BlockPath) -> Self {
-            Self { write, path }
+    impl AsMut<UnixDatagram> for DatagramAdapter {
+        fn as_mut(&mut self) -> &mut UnixDatagram {
+            self.get_mut()
         }
     }
 
-    impl<W: Write + Send> Sender for WriteSender<W> {
-        fn send<'a>(&mut self, msg: MsgRef<'a>) -> Result<()> {
-            Ok(write_to(&VerMsgRef::V0(msg), &mut self.write)?)
+    impl AsyncRead for DatagramAdapter {
+        fn poll_read(
+            self: Pin<&mut Self>,
+            cx: &mut Context<'_>,
+            buf: &mut ReadBuf<'_>,
+        ) -> Poll<io::Result<()>> {
+            self.socket.poll_recv(cx, buf)
         }
+    }
 
-        fn path(&self) -> &BlockPath {
-            &self.path
+    impl AsyncWrite for DatagramAdapter {
+        fn poll_write(
+            self: Pin<&mut Self>,
+            cx: &mut Context<'_>,
+            buf: &[u8],
+        ) -> Poll<io::Result<usize>> {
+            self.socket.poll_send(cx, buf)
+        }
+
+        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+            Poll::Ready(self.socket.shutdown(Shutdown::Write))
         }
+
+        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+            Poll::Ready(Ok(()))
+        }
+    }
+
+    /// Returns a [Receiver] which can be used to receive messages addressed to the given path.
+    /// The `fs_path` argument specifies the filesystem directory under which the receiver's socket
+    /// will be stored.
+    pub fn local_receiver(fs_path: PathBuf, block_path: BlockPath) -> Result<impl Receiver> {
+        UnixReceiver::new(fs_path, block_path)
     }
 
-    pub struct ReadReceiver<R> {
-        read: R,
+    /// An implementation of [Receiver] which uses a Unix datagram socket for receiving messages.
+    struct UnixReceiver {
         path: BlockPath,
+        socket: FramedRead<DatagramAdapter, MessageCodec>,
     }
 
-    impl<R> ReadReceiver<R> {
-        pub fn new(read: R, path: BlockPath) -> Self {
-            Self { read, path }
+    impl UnixReceiver {
+        fn new(mut fs_path: PathBuf, block_path: BlockPath) -> Result<Self> {
+            socket_path(&mut fs_path, &block_path);
+            std::fs::create_dir_all(fs_path.parent().unwrap())?;
+            let socket = DatagramAdapter::new(UnixDatagram::bind(fs_path)?);
+            let socket = FramedRead::new(socket, MessageCodec);
+            Ok(Self {
+                path: block_path,
+                socket,
+            })
         }
     }
 
-    impl<R: Read> Receiver for ReadReceiver<R> {
-        fn receive(&mut self) -> Result<MsgOwned> {
-            let VerMsg::V0(msg) = read_from(&mut self.read)?;
-            Ok(msg)
+    impl Stream for UnixReceiver {
+        type Item = Result<MsgOwned>;
+
+        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+            self.socket.poll_next_unpin(cx)
         }
+    }
 
+    impl Receiver for UnixReceiver {
         fn path(&self) -> &BlockPath {
             &self.path
         }
     }
 
-    pub struct UnixChannel {
-        receiver_path: BlockPath,
-        listener: UnixListener,
-        streams: Vec<ReadReceiver<UnixStream>>,
-        next: usize,
+    /// Returns a [Sender] which can be used to send messages to the given Blocktree path.
+    /// The `fs_path` argument specifies the filesystem directory in which to locate the
+    /// socket of the recipient.
+    pub fn local_sender(
+        fs_path: PathBuf,
+        block_path: BlockPath,
+    ) -> Result<impl for<'a> Sender<'a>> {
+        UnixSender::new(fs_path, block_path)
     }
 
-    impl Receiver for UnixChannel {
-        fn path(&self) -> &BlockPath {
-            &self.receiver_path
+    /// An implementation of [Sender] which uses a Unix datagram socket to send messages.
+    struct UnixSender {
+        path: BlockPath,
+        socket: FramedWrite<DatagramAdapter, MessageCodec>,
+    }
+
+    impl UnixSender {
+        fn new(mut fs_path: PathBuf, block_path: BlockPath) -> Result<Self> {
+            let socket = UnixDatagram::unbound()?;
+            socket_path(&mut fs_path, &block_path);
+            socket.connect(fs_path)?;
+            let socket = FramedWrite::new(DatagramAdapter::new(socket), MessageCodec);
+            Ok(Self {
+                path: block_path,
+                socket,
+            })
         }
+    }
 
-        fn receive(&mut self) -> Result<MsgOwned> {
-            // Note that the listener is put in non-blocking mode when it is created.
-            match self.listener.accept() {
-                Ok((socket, ..)) => {
-                    self.streams.push(ReadReceiver::new(socket, self.receiver_path.clone()));
-                }
-                Err(err) => {
-                    // If the error is anything other than `WouldBlock`, then it is unexpected.
-                    if io::ErrorKind::WouldBlock != err.kind() {
-                        return Err(err.into());
-                    }
-                }
-            }
-            let receiver = self.streams.get_mut(self.next).unwrap();
-            self.next += 1;
-            receiver.receive()
+    impl Sink<MsgRef<'_>> for UnixSender {
+        type Error = btlib::Error;
+
+        fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+            self.socket.poll_ready_unpin(cx)
+        }
+
+        fn start_send(mut self: Pin<&mut Self>, item: MsgRef<'_>) -> Result<()> {
+            self.socket.start_send_unpin(item)
+        }
+
+        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+            self.socket.poll_flush_unpin(cx)
+        }
+
+        fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+            self.socket.poll_close_unpin(cx)
         }
     }
 
-    impl Channel for UnixChannel {
-        type Sender = WriteSender<UnixStream>;
+    impl<'a> Sender<'a> for UnixSender {
+        type SendFut = Send<'a, FramedWrite<DatagramAdapter, MessageCodec>, MsgRef<'a>>;
 
-        fn new(receiver_path: BlockPath) -> Result<Self> {
-            let path = socket_path(&receiver_path);
-            std::fs::create_dir_all(path.parent().unwrap())?;
-            let listener = UnixListener::bind(&path)?;
-            listener.set_nonblocking(true)?;
-            Ok(UnixChannel { receiver_path, listener, streams: Vec::new(), next: 0 })
+        fn path(&self) -> &BlockPath {
+            &self.path
         }
 
-        fn connect_to(receiver_path: BlockPath) -> Result<Self::Sender> {
-            let path = socket_path(&receiver_path);
-            let stream = UnixStream::connect(&path)?;
-            let sender = WriteSender::new(stream, receiver_path);
-            Ok(sender)
+        fn send_msg(&'a mut self, from: &'a BlockPath, body: MsgBodyRef<'a>) -> Self::SendFut {
+            let id = Self::gen_id().unwrap();
+            let msg = MsgRef::new(&self.path, from, id, body);
+            self.socket.send(msg)
         }
     }
+
+    /// Returns a connected [Sender] and [Receiver].
+    pub fn local_pair(
+        dir: PathBuf,
+        block_path: BlockPath,
+    ) -> Result<(impl for<'a> Sender<'a>, impl Receiver)> {
+        let receiver = local_receiver(dir.clone(), block_path.clone())?;
+        let sender = local_sender(dir, block_path)?;
+        Ok((sender, receiver))
+    }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
 
-    //#[test]
-    //fn ping_pong_via_pipes() {
-    //    let ping_path = test_helpers::make_path(vec!["ping"]);
-    //    let (mut ping_sender, mut ping_receiver) =
-    //        PipeChannel::new(ping_path.clone()).expect("failed to create a ping channel");
-    //    let pong_path = test_helpers::make_path(vec!["pong"]);
-    //    let (mut pong_sender, mut pong_receiver) =
-    //        PipeChannel::new(pong_path.clone()).expect("failed to create a pong channel");
-    //    let pong = std::thread::spawn(move || loop {
-    //        let msg = pong_receiver.receive().expect("pong receive failed");
-    //        assert_eq!(pong_receiver.path(), msg.to());
-    //        match msg.body() {
-    //            MsgBody::Ping => ping_sender
-    //                .send_new(ping_path.clone(), MsgBody::Success)
-    //                .expect("send to ping failed"),
-    //            MsgBody::Success => return,
-    //            _ => panic!("unexpected message received by pong"),
-    //        }
-    //    });
-    //    let mut iter = 5;
-    //    while iter > 0 {
-    //        pong_sender
-    //            .send_new(pong_path.clone(), MsgBody::Ping)
-    //            .expect("send to pong failed");
-    //        let msg = ping_receiver.receive().expect("ping receive failed");
-    //        assert_eq!(ping_receiver.path(), msg.to());
-    //        match msg.body() {
-    //            MsgBody::Success => iter -= 1,
-    //            _ => panic!("unexpected message received by ping"),
-    //        }
-    //    }
-    //    pong_sender
-    //        .send_new(pong_path.clone(), MsgBody::Success)
-    //        .expect("send success to pong failed");
-    //    pong.join().expect("join failed");
-    //}
+    use tempdir::TempDir;
+
+    lazy_static! {
+        static ref BT_ROOT: BlockPath =
+            BlockPath::try_from("0!dSip4J0kurN5VhVo_aTipM-ywOOWrqJuRRVQ7aa-bew").unwrap();
+    }
+
+    fn block_path<'a, I: Iterator<Item = &'a str>>(components: I) -> BlockPath {
+        let mut path = BT_ROOT.clone();
+        for component in components {
+            path.push_component(component.to_string());
+        }
+        path
+    }
+
+    struct TestCase {
+        dir: TempDir,
+    }
+
+    impl TestCase {
+        fn new() -> TestCase {
+            Self {
+                dir: TempDir::new("btmsg").unwrap(),
+            }
+        }
+
+        fn endpoint(&self, name: &str) -> (BlockPath, impl for<'a> Sender<'a>, impl Receiver) {
+            let block_path = block_path(["apps", name].into_iter());
+            let (sender, receiver) =
+                local_pair(self.dir.path().to_owned(), block_path.clone()).unwrap();
+            (block_path, sender, receiver)
+        }
+    }
+
+    #[tokio::test]
+    async fn message_received_is_message_sent() {
+        let case = TestCase::new();
+        let (block_path, mut sender, mut receiver) = case.endpoint("social");
+
+        sender
+            .send_msg(&block_path, MsgBodyRef::Ping)
+            .await
+            .unwrap();
+        let actual = receiver.next().await.unwrap().unwrap();
+
+        let matched = if let MsgBodyOwned::Ping = actual.body() {
+            true
+        } else {
+            false
+        };
+        assert!(matched);
+        assert_eq!(&block_path, actual.to());
+        assert_eq!(&block_path, actual.from());
+    }
+
+    #[tokio::test]
+    async fn ping_pong() {
+        let case = TestCase::new();
+        let (block_path_one, mut sender_one, mut receiver_one) = case.endpoint("one");
+        let (block_path_two, mut sender_two, mut receiver_two) = case.endpoint("two");
+
+        let handle = tokio::spawn(async move {
+            let msg = receiver_one.next().await.unwrap().unwrap();
+            let reply_body = if let MsgBodyOwned::Ping = msg.body() {
+                MsgBodyRef::Success
+            } else {
+                MsgBodyRef::Fail(&MsgError::Unknown)
+            };
+            sender_two
+                .send_msg(&block_path_one, reply_body)
+                .await
+                .unwrap();
+        });
+
+        sender_one
+            .send_msg(&block_path_two, MsgBodyRef::Ping)
+            .await
+            .unwrap();
+        handle.await.unwrap();
+        let reply = receiver_two.next().await.unwrap().unwrap();
+        let matched = if let MsgBodyOwned::Success = reply.body() {
+            true
+        } else {
+            false
+        };
+        assert!(matched)
+    }
+
+    #[tokio::test]
+    async fn read_write() {
+        let case = TestCase::new();
+        let (block_path_one, mut sender_one, mut receiver_one) = case.endpoint("one");
+        let (block_path_two, mut sender_two, mut receiver_two) = case.endpoint("two");
+
+        let handle = tokio::spawn(async move {
+            let data: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
+            let msg = receiver_one.next().await.unwrap().unwrap();
+            let reply_body = if let MsgBodyOwned::Read { offset, size } = msg.body() {
+                let offset: usize = (*offset).try_into().unwrap();
+                let size: usize = (*size).try_into().unwrap();
+                let end: usize = offset + size;
+                MsgBodyRef::Write {
+                    offset: offset as u64,
+                    buf: &data[offset..end],
+                }
+            } else {
+                MsgBodyRef::Fail(&MsgError::Unknown)
+            };
+            sender_two
+                .send_msg(&block_path_one, reply_body)
+                .await
+                .unwrap();
+        });
+
+        sender_one
+            .send_msg(&block_path_two, MsgBodyRef::Read { offset: 2, size: 2 })
+            .await
+            .unwrap();
+        handle.await.unwrap();
+        let reply = receiver_two.next().await.unwrap().unwrap();
+        if let MsgBodyOwned::Write { offset, buf } = reply.body() {
+            assert_eq!(2, *offset);
+            assert_eq!([2, 3].as_slice(), buf.as_slice());
+        } else {
+            panic!("replay was not the right type");
+        };
+    }
 }

+ 1 - 1
crates/btnode/src/main.rs

@@ -5,7 +5,7 @@ use std::io::{self, BufWriter, Write};
 
 #[allow(dead_code)]
 fn send<W: Write>(stdout: &mut BufWriter<W>, msg: &Message) {
-    if let Err(err) = write_to(msg, stdout) {
+    if let Err(err) = write_to(msg, &mut *stdout) {
         error!("Failed to serialize message {:?}", err);
         return;
     }

+ 13 - 13
crates/btserde/src/ser.rs

@@ -12,12 +12,12 @@ use super::error::{Error, MapError, Result};
 
 type Ok = ();
 
-pub struct Serializer<'w, W: Write> {
-    output: &'w mut W,
+pub struct Serializer<W: Write> {
+    output: W,
 }
 
-impl<'w, W: Write> Serializer<'w, W> {
-    pub fn new(write: &'w mut W) -> Serializer<'w, W> {
+impl<W: Write> Serializer<W> {
+    pub fn new(write: W) -> Serializer<W> {
         Serializer { output: write }
     }
 }
@@ -28,7 +28,7 @@ pub fn to_vec<T: Serialize + ?Sized>(value: &T) -> Result<Vec<u8>> {
     Ok(vec)
 }
 
-pub fn write_to<T: Serialize + ?Sized, W: Write>(value: &T, write: &mut W) -> Result<()> {
+pub fn write_to<T: Serialize + ?Sized, W: Write>(value: &T, write: W) -> Result<()> {
     let mut serializer = Serializer::new(write);
     value.serialize(&mut serializer)
 }
@@ -47,7 +47,7 @@ fn convert_variant_index(index: u32) -> Result<u16> {
     u16::try_from(index).map_err(|_| Error::TooManyVariants(index))
 }
 
-impl<'a, 'w, T: Write> ser::Serializer for &'a mut Serializer<'w, T> {
+impl<'a, T: Write> ser::Serializer for &'a mut Serializer<T> {
     type Ok = Ok;
     type Error = Error;
     type SerializeSeq = Self;
@@ -291,7 +291,7 @@ impl<'a, 'w, T: Write> ser::Serializer for &'a mut Serializer<'w, T> {
     }
 }
 
-impl<'a, 'w, W: Write> SerializeSeq for &'a mut Serializer<'w, W> {
+impl<'a, W: Write> SerializeSeq for &'a mut Serializer<W> {
     type Ok = Ok;
     type Error = Error;
 
@@ -306,7 +306,7 @@ impl<'a, 'w, W: Write> SerializeSeq for &'a mut Serializer<'w, W> {
     }
 }
 
-impl<'a, 'w, W: Write> SerializeTuple for &'a mut Serializer<'w, W> {
+impl<'a, W: Write> SerializeTuple for &'a mut Serializer<W> {
     type Ok = Ok;
     type Error = Error;
 
@@ -320,7 +320,7 @@ impl<'a, 'w, W: Write> SerializeTuple for &'a mut Serializer<'w, W> {
     }
 }
 
-impl<'a, 'w, W: Write> SerializeTupleStruct for &'a mut Serializer<'w, W> {
+impl<'a, W: Write> SerializeTupleStruct for &'a mut Serializer<W> {
     type Ok = Ok;
     type Error = Error;
 
@@ -334,7 +334,7 @@ impl<'a, 'w, W: Write> SerializeTupleStruct for &'a mut Serializer<'w, W> {
     }
 }
 
-impl<'a, 'w, W: Write> SerializeTupleVariant for &'a mut Serializer<'w, W> {
+impl<'a, W: Write> SerializeTupleVariant for &'a mut Serializer<W> {
     type Ok = Ok;
     type Error = Error;
 
@@ -348,7 +348,7 @@ impl<'a, 'w, W: Write> SerializeTupleVariant for &'a mut Serializer<'w, W> {
     }
 }
 
-impl<'a, 'w, W: Write> SerializeMap for &'a mut Serializer<'w, W> {
+impl<'a, W: Write> SerializeMap for &'a mut Serializer<W> {
     type Ok = Ok;
     type Error = Error;
 
@@ -367,7 +367,7 @@ impl<'a, 'w, W: Write> SerializeMap for &'a mut Serializer<'w, W> {
     }
 }
 
-impl<'a, 'w, W: Write> SerializeStruct for &'a mut Serializer<'w, W> {
+impl<'a, W: Write> SerializeStruct for &'a mut Serializer<W> {
     type Ok = Ok;
     type Error = Error;
 
@@ -385,7 +385,7 @@ impl<'a, 'w, W: Write> SerializeStruct for &'a mut Serializer<'w, W> {
     }
 }
 
-impl<'a, 'w, W: Write> SerializeStructVariant for &'a mut Serializer<'w, W> {
+impl<'a, W: Write> SerializeStructVariant for &'a mut Serializer<W> {
     type Ok = Ok;
     type Error = Error;