فهرست منبع

* Removed the `Transmitter` and `Receiver` traits changed all usages to
`QuicTransmitter` and `QuicReceiver`. This was motivated by the fact that
`btmsg` is only responsible for delivering remote messages, and so the orignal
motivation using a trait is no longer present.
* Renamed `QuicTransmitter` to `Transmitter` and `QuicReceiver` to `Receiver`.
* Factored the types out of `lib.rs` in `btmsg` into spepates modules in different
files.

Matthew Carr 1 سال پیش
والد
کامیت
41715dbf47

+ 33 - 33
crates/btfproto/src/client.rs

@@ -119,50 +119,50 @@ where
     }
 }
 
-pub struct FsClient<T> {
-    tx: T,
+pub struct FsClient {
+    tx: Transmitter,
 }
 
-impl<T> FsClient<T> {
-    pub fn new(tx: T) -> Self {
+impl FsClient {
+    pub fn new(tx: Transmitter) -> Self {
         Self { tx }
     }
 
-    pub fn into_inner(self) -> T {
+    pub fn into_inner(self) -> Transmitter {
         self.tx
     }
 
-    pub fn get_ref(&self) -> &T {
+    pub fn get_ref(&self) -> &Transmitter {
         &self.tx
     }
 
-    pub fn get_mut(&mut self) -> &mut T {
+    pub fn get_mut(&mut self) -> &mut Transmitter {
         &mut self.tx
     }
 }
 
-impl<T: Send + Sync + Transmitter> FsProvider for FsClient<T> {
-    type LookupFut<'c> = impl 'c  + Send + Future<Output = Result<LookupReply>> where T: 'c;
+impl FsProvider for FsClient {
+    type LookupFut<'c> = impl 'c + Send + Future<Output = Result<LookupReply>>;
     fn lookup<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Lookup<'c>) -> Self::LookupFut<'c> {
         self.tx
             .call(FsMsg::Lookup(msg), extractor!(Lookup))
             .map(|e| e?)
     }
 
-    type CreateFut<'c> = impl 'c + Send + Future<Output = Result<CreateReply>> where T: 'c;
+    type CreateFut<'c> = impl 'c + Send + Future<Output = Result<CreateReply>>;
     fn create<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Create<'c>) -> Self::CreateFut<'c> {
         self.tx
             .call(FsMsg::Create(msg), extractor!(Create))
             .map(|e| e?)
     }
 
-    type OpenFut<'c> = impl 'c + Send + Future<Output = Result<OpenReply>> where T: 'c;
+    type OpenFut<'c> = impl 'c + Send + Future<Output = Result<OpenReply>>;
     fn open<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Open) -> Self::OpenFut<'c> {
         self.tx.call(FsMsg::Open(msg), extractor!(Open)).map(|e| e?)
     }
 
     type ReadGuard = Vec<u8>;
-    type ReadFut<'c> = impl 'c + Send + Future<Output = Result<Self::ReadGuard>> where T: 'c;
+    type ReadFut<'c> = impl 'c + Send + Future<Output = Result<Self::ReadGuard>>;
     fn read<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Read) -> Self::ReadFut<'c> {
         let callback = ExtractRead::new(|reply: ReadReply<'_>| {
             // TODO: Use a pool of buffers rather than allocating a new one each read.
@@ -171,43 +171,43 @@ impl<T: Send + Sync + Transmitter> FsProvider for FsClient<T> {
         self.tx.call(FsMsg::Read(msg), callback).map(|e| e?)
     }
 
-    type WriteFut<'r> = impl 'r + Send + Future<Output = Result<WriteReply>> where T: 'r;
+    type WriteFut<'r> = impl 'r + Send + Future<Output = Result<WriteReply>>;
     fn write<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Write<&'c [u8]>) -> Self::WriteFut<'c> {
         self.tx
             .call(FsMsg::Write(msg), extractor!(Write))
             .map(|e| e?)
     }
 
-    type FlushFut<'c> = impl 'c + Send + Future<Output = Result<()>> where T: 'c;
+    type FlushFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
     fn flush<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Flush) -> Self::FlushFut<'c> {
         self.tx.call(FsMsg::Flush(msg), AckCallback).map(|e| e?)
     }
 
-    type ReadDirFut<'c> = impl 'c + Send + Future<Output = Result<ReadDirReply>> where T: 'c;
+    type ReadDirFut<'c> = impl 'c + Send + Future<Output = Result<ReadDirReply>>;
     fn read_dir<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: ReadDir) -> Self::ReadDirFut<'c> {
         self.tx
             .call(FsMsg::ReadDir(msg), extractor!(ReadDir))
             .map(|e| e?)
     }
 
-    type LinkFut<'c> = impl 'c + Send + Future<Output = Result<LinkReply>> where T: 'c;
+    type LinkFut<'c> = impl 'c + Send + Future<Output = Result<LinkReply>>;
     fn link<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Link<'c>) -> Self::LinkFut<'c> {
         self.tx.call(FsMsg::Link(msg), extractor!(Link)).map(|e| e?)
     }
 
-    type UnlinkFut<'c> = impl 'c +  Send + Future<Output = Result<()>> where T: 'c;
+    type UnlinkFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
     fn unlink<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Unlink<'c>) -> Self::UnlinkFut<'c> {
         self.tx.call(FsMsg::Unlink(msg), AckCallback).map(|e| e?)
     }
 
-    type ReadMetaFut<'c> = impl 'c + Send + Future<Output = Result<ReadMetaReply>> where T: 'c;
+    type ReadMetaFut<'c> = impl 'c + Send + Future<Output = Result<ReadMetaReply>>;
     fn read_meta<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: ReadMeta) -> Self::ReadMetaFut<'c> {
         self.tx
             .call(FsMsg::ReadMeta(msg), extractor!(ReadMeta))
             .map(|e| e?)
     }
 
-    type WriteMetaFut<'c> = impl 'c + Send + Future<Output = Result<WriteMetaReply>> where T: 'c;
+    type WriteMetaFut<'c> = impl 'c + Send + Future<Output = Result<WriteMetaReply>>;
     fn write_meta<'c>(
         &'c self,
         _from: &'c Arc<BlockPath>,
@@ -218,32 +218,32 @@ impl<T: Send + Sync + Transmitter> FsProvider for FsClient<T> {
             .map(|e| e?)
     }
 
-    type AllocateFut<'c> = impl 'c + Send + Future<Output = Result<()>> where T: 'c;
+    type AllocateFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
     fn allocate<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Allocate) -> Self::AllocateFut<'c> {
         self.tx.call(FsMsg::Allocate(msg), AckCallback).map(|e| e?)
     }
 
-    type CloseFut<'c> = impl 'c + Send + Future<Output = Result<()>> where T: 'c;
+    type CloseFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
     fn close<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Close) -> Self::CloseFut<'c> {
         self.tx.call(FsMsg::Close(msg), AckCallback).map(|e| e?)
     }
 
-    type ForgetFut<'c> = impl 'c + Send + Future<Output = Result<()>> where T: 'c;
+    type ForgetFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
     fn forget<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Forget) -> Self::ForgetFut<'c> {
         self.tx.call(FsMsg::Forget(msg), AckCallback).map(|e| e?)
     }
 
-    type LockFut<'c> = impl 'c + Send + Future<Output = Result<()>> where T: 'c;
+    type LockFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
     fn lock<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Lock) -> Self::LockFut<'c> {
         self.tx.call(FsMsg::Lock(msg), AckCallback).map(|e| e?)
     }
 
-    type UnlockFut<'c> = impl 'c + Send + Future<Output = Result<()>> where T: 'c;
+    type UnlockFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
     fn unlock<'c>(&'c self, _from: &'c Arc<BlockPath>, msg: Unlock) -> Self::UnlockFut<'c> {
         self.tx.call(FsMsg::Unlock(msg), AckCallback).map(|e| e?)
     }
 
-    type AddReacapFut<'c> = impl 'c + Send + Future<Output = Result<()>> where T: 'c;
+    type AddReacapFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
     fn add_readcap<'c>(
         &'c self,
         _from: &'c Arc<BlockPath>,
@@ -254,7 +254,7 @@ impl<T: Send + Sync + Transmitter> FsProvider for FsClient<T> {
             .map(|e| e?)
     }
 
-    type GrantAccessFut<'c> = impl 'c + Send + Future<Output = Result<()>> where T: 'c;
+    type GrantAccessFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
     fn grant_access<'c>(
         &'c self,
         _from: &'c Arc<BlockPath>,
@@ -266,7 +266,7 @@ impl<T: Send + Sync + Transmitter> FsProvider for FsClient<T> {
     }
 }
 
-impl<T: Transmitter> FsClient<T> {
+impl FsClient {
     /// Lookup up the given name under the given inode.
     pub async fn lookup(&self, parent: Inode, name: &str) -> Result<LookupReply> {
         let msg = FsMsg::Lookup(Lookup { parent, name });
@@ -444,20 +444,20 @@ impl<T: Transmitter> FsClient<T> {
     }
 }
 
-impl<T> AsRef<T> for FsClient<T> {
-    fn as_ref(&self) -> &T {
+impl AsRef<Transmitter> for FsClient {
+    fn as_ref(&self) -> &Transmitter {
         self.get_ref()
     }
 }
 
-impl<T> AsMut<T> for FsClient<T> {
-    fn as_mut(&mut self) -> &mut T {
+impl AsMut<Transmitter> for FsClient {
+    fn as_mut(&mut self) -> &mut Transmitter {
         self.get_mut()
     }
 }
 
-impl<T> From<T> for FsClient<T> {
-    fn from(value: T) -> Self {
+impl From<Transmitter> for FsClient {
+    fn from(value: Transmitter) -> Self {
         Self::new(value)
     }
 }

+ 3 - 7
crates/btfproto/src/server.rs

@@ -2,7 +2,7 @@
 use crate::msg::{Read as ReadMsg, *};
 
 use btlib::{crypto::Creds, BlockPath, Result};
-use btmsg::{receiver, MsgCallback, MsgReceived, Receiver};
+use btmsg::{MsgCallback, MsgReceived, Receiver};
 use core::future::Future;
 use std::{net::IpAddr, ops::Deref, sync::Arc};
 
@@ -286,14 +286,10 @@ impl<P: 'static + Send + Sync + FsProvider> MsgCallback for ServerCallback<P> {
     }
 }
 
-pub fn new_fs_server<C, P>(
-    ip_addr: IpAddr,
-    creds: Arc<C>,
-    provider: Arc<P>,
-) -> Result<impl Receiver>
+pub fn new_fs_server<C, P>(ip_addr: IpAddr, creds: Arc<C>, provider: Arc<P>) -> Result<Receiver>
 where
     C: 'static + Creds,
     P: 'static + FsProvider,
 {
-    receiver(ip_addr, creds, ServerCallback::new(provider))
+    Receiver::new(ip_addr, creds, ServerCallback::new(provider))
 }

+ 16 - 25
crates/btfsd/src/main.rs

@@ -55,7 +55,7 @@ async fn provider(block_dir: PathBuf, creds: Arc<dyn Creds>) -> Result<impl FsPr
     }
 }
 
-async fn receiver(config: BtfsdConfig) -> Result<impl Receiver> {
+async fn receiver(config: BtfsdConfig) -> Result<Receiver> {
     let node_creds = config.cred_store.consume(NodeCredConsumer)??;
     let provider = Arc::new(provider(config.block_dir, node_creds.clone()).await?);
     new_fs_server(config.ip_addr, Arc::new(node_creds), provider)
@@ -95,7 +95,7 @@ mod tests {
         AuthzAttrs, BlockMetaSecrets, Epoch, IssuedProcRec, Principaled, ProcRec,
     };
     use btlib_tests::{CredStoreTestingExt, TpmCredStoreHarness};
-    use btmsg::{BlockAddr, Transmitter};
+    use btmsg::BlockAddr;
     use btserde::from_slice;
     use std::net::{IpAddr, Ipv4Addr};
     use std::{future::ready, net::Ipv6Addr, time::Duration};
@@ -110,16 +110,13 @@ mod tests {
         env_logger::Builder::from_default_env().btformat().init();
     }
 
-    struct FileTestCase<R, T> {
-        client: FsClient<T>,
-        _rx: R,
+    struct FileTestCase {
+        client: FsClient,
+        _rx: Receiver,
         _dir: TempDir,
     }
 
-    async fn file_test_case(
-        dir: TempDir,
-        ipaddr: IpAddr,
-    ) -> FileTestCase<impl Receiver, impl Transmitter> {
+    async fn file_test_case(dir: TempDir, ipaddr: IpAddr) -> FileTestCase {
         let file_store_path = dir.path().join("cred_store");
         let credstore = CredStoreConfig::File {
             path: file_store_path.clone(),
@@ -145,14 +142,14 @@ mod tests {
         }
     }
 
-    async fn new_file_test_case() -> FileTestCase<impl Receiver, impl Transmitter> {
+    async fn new_file_test_case() -> FileTestCase {
         let dir = TempDir::new("btfsd").unwrap();
         file_test_case(dir, LOCALHOST).await
     }
 
-    struct TestCase<R, T> {
-        client: FsClient<T>,
-        rx: R,
+    struct TestCase {
+        client: FsClient,
+        rx: Receiver,
         harness: TpmCredStoreHarness,
         _dir: TempDir,
     }
@@ -161,11 +158,7 @@ mod tests {
     const LOCALHOST: IpAddr = IpAddr::V6(Ipv6Addr::LOCALHOST);
     const BT_DIR: &str = "bt";
 
-    async fn tpm_test_case(
-        dir: TempDir,
-        harness: TpmCredStoreHarness,
-        ipaddr: IpAddr,
-    ) -> TestCase<impl Receiver, impl Transmitter> {
+    async fn tpm_test_case(dir: TempDir, harness: TpmCredStoreHarness, ipaddr: IpAddr) -> TestCase {
         let swtpm = harness.swtpm();
         let credstore = CredStoreConfig::Tpm {
             path: swtpm.state_path().to_owned(),
@@ -188,16 +181,14 @@ mod tests {
         }
     }
 
-    async fn new_tpm_test_case() -> TestCase<impl Receiver, impl Transmitter> {
+    async fn new_tpm_test_case() -> TestCase {
         let dir = TempDir::new("btfsd").unwrap();
         let harness = TpmCredStoreHarness::new(ROOT_PASSWD.to_owned()).unwrap();
         tpm_test_case(dir, harness, LOCALHOST).await
     }
 
-    async fn existing_case<R: Receiver, T: Transmitter>(
-        case: TestCase<R, T>,
-    ) -> TestCase<impl Receiver, impl Transmitter> {
-        case.rx.stop().await.unwrap();
+    async fn existing_case(case: TestCase) -> TestCase {
+        case.rx.stop().unwrap();
         case.rx.complete().unwrap().await.unwrap();
         let TestCase {
             _dir,
@@ -213,7 +204,7 @@ mod tests {
         case.rx.complete().unwrap().await.unwrap();
     }
 
-    async fn create_write_read(client: FsClient<impl Transmitter>) {
+    async fn create_write_read(client: FsClient) {
         const FILENAME: &str = "file.txt";
         const EXPECTED: &[u8] = b"potato";
 
@@ -905,7 +896,7 @@ mod tests {
         let node_creds = case.harness.cred_store().node_creds().unwrap();
         let bind_path = node_creds.writecap().unwrap().bind_path();
         let block_addr = Arc::new(BlockAddr::new(LOCALHOST, Arc::new(bind_path)));
-        let tx = btmsg::transmitter(block_addr, Arc::new(user_creds))
+        let tx = btmsg::Transmitter::new(block_addr, Arc::new(user_creds))
             .await
             .unwrap();
         let client = FsClient::new(tx);

+ 23 - 23
crates/btfuse/src/main.rs

@@ -11,7 +11,7 @@ use btlib::{
     crypto::{Creds, CredsPriv},
     Result,
 };
-use btmsg::{transmitter, BlockAddr};
+use btmsg::{BlockAddr, Transmitter};
 use serde::{Deserialize, Serialize};
 use std::{
     fs::{self},
@@ -111,7 +111,7 @@ async fn remote_provider<C: 'static + Creds + Send + Sync>(
     remote_addr: BlockAddr,
     node_creds: C,
 ) -> Result<impl FsProvider> {
-    let tx = transmitter(Arc::new(remote_addr), Arc::new(node_creds)).await?;
+    let tx = Transmitter::new(Arc::new(remote_addr), Arc::new(node_creds)).await?;
     let client = FsClient::new(tx);
     Ok(client)
 }
@@ -283,27 +283,27 @@ mod test {
 
     const ROOT_PASSWD: &str = "password";
 
-    struct TestCase<R: Receiver> {
+    struct TestCase {
         config: BtfuseConfig,
         handle: Option<JoinHandle<()>>,
         node_principal: OsString,
         stop_flag: Option<()>,
         // Note that the drop order of these fields is significant.
-        _receiver: Option<R>,
+        _receiver: Option<Receiver>,
         _cred_store: TpmCredStore,
         _swtpm: SwtpmHarness,
         _temp_dir: TempDir,
     }
 
-    async fn new_local() -> TestCase<impl Receiver> {
+    async fn new_local() -> TestCase {
         new(false).await
     }
 
-    async fn new_remote() -> TestCase<impl Receiver> {
+    async fn new_remote() -> TestCase {
         new(true).await
     }
 
-    async fn new(remote: bool) -> TestCase<impl Receiver> {
+    async fn new(remote: bool) -> TestCase {
         let tmp = TempDir::new("btfuse").unwrap();
         let (mounted_tx, mounted_rx) = oneshot::channel();
         let (swtpm, cred_store) = swtpm();
@@ -379,7 +379,7 @@ mod test {
         (swtpm, cred_store)
     }
 
-    impl<R: Receiver> TestCase<R> {
+    impl TestCase {
         fn mnt_dir(&self) -> &Path {
             &self.config.mnt_dir
         }
@@ -410,7 +410,7 @@ mod test {
         }
     }
 
-    impl<R: Receiver> Drop for TestCase<R> {
+    impl Drop for TestCase {
         fn drop(&mut self) {
             self.signal_stop()
         }
@@ -425,7 +425,7 @@ mod test {
         case.stopped().await
     }
 
-    async fn write_read(mut case: TestCase<impl Receiver>) -> Result<()> {
+    async fn write_read(mut case: TestCase) -> Result<()> {
         const EXPECTED: &[u8] =
             b"The paths to failure are uncountable, yet to success there are few.";
         let file_path = case.mnt_dir().join("file");
@@ -452,7 +452,7 @@ mod test {
         write_read(new_remote().await).await
     }
 
-    async fn create_file_then_readdir(mut case: TestCase<impl Receiver>) {
+    async fn create_file_then_readdir(mut case: TestCase) {
         const DATA: &[u8] = b"Au revoir Shoshanna!";
         let file_name = OsStr::new("landa_dialog.txt");
         let mut expected = case.initial_contents();
@@ -480,7 +480,7 @@ mod test {
         create_file_then_readdir(new_remote().await).await
     }
 
-    async fn create_then_delete_file(mut case: TestCase<impl Receiver>) {
+    async fn create_then_delete_file(mut case: TestCase) {
         const DATA: &[u8] = b"The universe is hostile, so impersonal. Devour to survive";
         let file_name = OsStr::new("tool_lyrics.txt");
         let mnt_path = case.mnt_dir();
@@ -506,7 +506,7 @@ mod test {
         create_then_delete_file(new_remote().await).await
     }
 
-    async fn hard_link_then_remove(mut case: TestCase<impl Receiver>) {
+    async fn hard_link_then_remove(mut case: TestCase) {
         const EXPECTED: &[u8] = b"And the lives we've reclaimed";
         let name1 = OsStr::new("refugee_lyrics.txt");
         let name2 = OsStr::new("rise_against_lyrics.txt");
@@ -534,7 +534,7 @@ mod test {
         hard_link_then_remove(new_remote().await).await
     }
 
-    async fn hard_link_then_remove_both(mut case: TestCase<impl Receiver>) {
+    async fn hard_link_then_remove_both(mut case: TestCase) {
         const EXPECTED: &[u8] = b"And the lives we've reclaimed";
         let name1 = OsStr::new("refugee_lyrics.txt");
         let name2 = OsStr::new("rise_against_lyrics.txt");
@@ -568,7 +568,7 @@ mod test {
         hard_link_then_remove_both(new_remote().await).await
     }
 
-    async fn set_mode_bits(mut case: TestCase<impl Receiver>) {
+    async fn set_mode_bits(mut case: TestCase) {
         const EXPECTED: u32 = libc::S_IFREG | 0o777;
         let file_path = case.mnt_dir().join("bagobits");
         write(&file_path, []).await.expect("write failed");
@@ -603,7 +603,7 @@ mod test {
         set_mode_bits(new_remote().await).await
     }
 
-    async fn create_directory(mut case: TestCase<impl Receiver>) {
+    async fn create_directory(mut case: TestCase) {
         const EXPECTED: &str = "etc";
         let mnt_path = case.mnt_dir();
         let dir_path = mnt_path.join(EXPECTED);
@@ -628,7 +628,7 @@ mod test {
         create_directory(new_remote().await).await
     }
 
-    async fn create_file_under_new_directory(mut case: TestCase<impl Receiver>) {
+    async fn create_file_under_new_directory(mut case: TestCase) {
         const DIR_NAME: &str = "etc";
         const FILE_NAME: &str = "file";
         let mnt_path = case.mnt_dir();
@@ -654,7 +654,7 @@ mod test {
         create_file_under_new_directory(new_remote().await).await
     }
 
-    async fn create_then_remove_directory(mut case: TestCase<impl Receiver>) {
+    async fn create_then_remove_directory(mut case: TestCase) {
         const DIR_NAME: &str = "etc";
         let mnt_path = case.mnt_dir();
         let dir_path = mnt_path.join(DIR_NAME);
@@ -678,7 +678,7 @@ mod test {
         create_then_remove_directory(new_remote().await).await
     }
 
-    async fn read_only_dir_cant_create_subdir(mut case: TestCase<impl Receiver>) {
+    async fn read_only_dir_cant_create_subdir(mut case: TestCase) {
         const DIR_NAME: &str = "etc";
         let dir_path = case.mnt_dir().join(DIR_NAME);
         create_dir(&dir_path).await.expect("create_dir failed");
@@ -705,7 +705,7 @@ mod test {
         read_only_dir_cant_create_subdir(new_remote().await).await
     }
 
-    async fn read_only_dir_cant_remove_subdir(mut case: TestCase<impl Receiver>) {
+    async fn read_only_dir_cant_remove_subdir(mut case: TestCase) {
         const DIR_NAME: &str = "etc";
         let dir_path = case.mnt_dir().join(DIR_NAME);
         let sub_path = dir_path.join("sub");
@@ -734,7 +734,7 @@ mod test {
         read_only_dir_cant_remove_subdir(new_remote().await).await
     }
 
-    async fn rename_file(mut case: TestCase<impl Receiver>) {
+    async fn rename_file(mut case: TestCase) {
         const FILE_NAME: &str = "parabola.txt";
         const EXPECTED: &[u8] = b"We are eternal all this pain is an illusion";
         let src_path = case.mnt_dir().join(FILE_NAME);
@@ -759,7 +759,7 @@ mod test {
         rename_file(new_remote().await).await
     }
 
-    async fn write_read_with_file_struct(mut case: TestCase<impl Receiver>) {
+    async fn write_read_with_file_struct(mut case: TestCase) {
         const FILE_NAME: &str = "big.dat";
         const LEN: usize = btlib::SECTOR_SZ_DEFAULT + 1;
         fn fill(buf: &mut Vec<u8>, value: u8) {
@@ -803,7 +803,7 @@ mod test {
 
     /// KMC: This test is currently not working, and I've not been able to figure out why, nor
     /// reproduce it at a lower layer of the stack.
-    async fn read_more_than_whats_buffered(mut case: TestCase<impl Receiver>) {
+    async fn read_more_than_whats_buffered(mut case: TestCase) {
         const FILE_NAME: &str = "big.dat";
         const SECT_SZ: usize = btlib::SECTOR_SZ_DEFAULT;
         const DIVISOR: usize = 8;

+ 62 - 0
crates/btmsg/src/common.rs

@@ -0,0 +1,62 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//! Common types that are used in multiple modules.
+
+use std::{
+    fmt::Display,
+    net::{IpAddr, SocketAddr},
+    sync::Arc,
+};
+
+use btlib::BlockPath;
+use btserde::field_helpers::smart_ptr;
+use serde::{Deserialize, Serialize};
+
+use crate::Result;
+
+/// Trait for messages which can be transmitted using the call method.
+pub trait CallMsg<'de>: Serialize + Deserialize<'de> + Send + Sync {
+    type Reply<'r>: Serialize + Deserialize<'r> + Send;
+}
+
+/// Trait for messages which can be transmitted using the send method.
+/// Types which implement this trait should specify `()` as their reply type.
+pub trait SendMsg<'de>: CallMsg<'de> {}
+
+/// An address which identifies a block on the network. An instance of this struct can be
+/// used to get a socket address for the block this address refers to.
+#[derive(PartialEq, Eq, Hash, Clone, Debug, Serialize, Deserialize)]
+pub struct BlockAddr {
+    #[serde(rename = "ipaddr")]
+    ip_addr: IpAddr,
+    #[serde(with = "smart_ptr")]
+    path: Arc<BlockPath>,
+}
+
+impl BlockAddr {
+    pub fn new(ip_addr: IpAddr, path: Arc<BlockPath>) -> Self {
+        Self { ip_addr, path }
+    }
+
+    pub fn ip_addr(&self) -> IpAddr {
+        self.ip_addr
+    }
+
+    pub fn path(&self) -> &Arc<BlockPath> {
+        &self.path
+    }
+
+    fn port(&self) -> Result<u16> {
+        self.path.port()
+    }
+
+    /// Returns the socket address of the block this instance refers to.
+    pub fn socket_addr(&self) -> Result<SocketAddr> {
+        Ok(SocketAddr::new(self.ip_addr, self.port()?))
+    }
+}
+
+impl Display for BlockAddr {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}@{}", self.path, self.ip_addr)
+    }
+}

+ 10 - 756
crates/btmsg/src/lib.rs

@@ -2,764 +2,18 @@
 //! Code which enables sending messages between processes in the blocktree system.
 #![feature(impl_trait_in_assoc_type)]
 
-mod tls;
-use tls::*;
-mod callback_framed;
-use callback_framed::CallbackFramed;
-pub use callback_framed::DeserCallback;
-
-use btlib::{bterr, crypto::Creds, error::DisplayErr, BlockPath, Result, Writecap};
-use btserde::{field_helpers::smart_ptr, write_to};
-use bytes::{BufMut, BytesMut};
-use core::{
-    future::{ready, Future, Ready},
-    marker::Send,
-    pin::Pin,
-};
-use futures::{FutureExt, SinkExt};
-use log::{debug, error};
-use quinn::{Connection, ConnectionError, Endpoint, RecvStream, SendStream};
-use serde::{de::DeserializeOwned, Deserialize, Serialize};
-use std::{
-    any::Any,
-    fmt::Display,
-    hash::Hash,
-    io,
-    marker::PhantomData,
-    net::{IpAddr, Ipv6Addr, SocketAddr},
-    result::Result as StdResult,
-    sync::{Arc, Mutex as StdMutex},
-};
-use tokio::{
-    select,
-    sync::{broadcast, Mutex},
-    task::{JoinError, JoinHandle},
-};
-use tokio_util::codec::{Encoder, Framed, FramedParts, FramedWrite};
-
-/// Returns a [Receiver] bound to the given [IpAddr] which receives messages at the bind path of
-/// the [Writecap] in the given credentials. The returned type can be used to make
-/// [Transmitter]s for any path.
-pub fn receiver<C: 'static + Creds + Send + Sync, F: 'static + MsgCallback>(
-    ip_addr: IpAddr,
-    creds: Arc<C>,
-    callback: F,
-) -> Result<impl Receiver> {
-    let writecap = creds.writecap().ok_or(btlib::BlockError::MissingWritecap)?;
-    let addr = Arc::new(BlockAddr::new(ip_addr, Arc::new(writecap.bind_path())));
-    QuicReceiver::new(addr, Arc::new(CertResolver::new(creds)?), callback)
-}
-
-pub async fn transmitter<C: 'static + Creds + Send + Sync>(
-    addr: Arc<BlockAddr>,
-    creds: Arc<C>,
-) -> Result<impl Transmitter> {
-    let resolver = Arc::new(CertResolver::new(creds)?);
-    let endpoint = Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0))?;
-    QuicTransmitter::from_endpoint(endpoint, addr, resolver).await
-}
-
-pub trait MsgCallback: Clone + Send + Sync + Unpin {
-    type Arg<'de>: CallMsg<'de>
-    where
-        Self: 'de;
-    type CallFut<'de>: Future<Output = Result<()>> + Send
-    where
-        Self: 'de;
-    fn call<'de>(&'de self, arg: MsgReceived<Self::Arg<'de>>) -> Self::CallFut<'de>;
-}
-
-impl<T: MsgCallback> MsgCallback for &T {
-    type Arg<'de> = T::Arg<'de> where Self: 'de;
-    type CallFut<'de> = T::CallFut<'de> where Self: 'de;
-    fn call<'de>(&'de self, arg: MsgReceived<Self::Arg<'de>>) -> Self::CallFut<'de> {
-        (*self).call(arg)
-    }
-}
-
-/// Trait for messages which can be transmitted using the call method.
-pub trait CallMsg<'de>: Serialize + Deserialize<'de> + Send + Sync {
-    type Reply<'r>: Serialize + Deserialize<'r> + Send;
-}
-
-/// Trait for messages which can be transmitted using the send method.
-/// Types which implement this trait should specify `()` as their reply type.
-pub trait SendMsg<'de>: CallMsg<'de> {}
-
-/// An address which identifies a block on the network. An instance of this struct can be
-/// used to get a socket address for the block this address refers to.
-#[derive(PartialEq, Eq, Hash, Clone, Debug, Serialize, Deserialize)]
-pub struct BlockAddr {
-    #[serde(rename = "ipaddr")]
-    ip_addr: IpAddr,
-    #[serde(with = "smart_ptr")]
-    path: Arc<BlockPath>,
-}
-
-impl BlockAddr {
-    pub fn new(ip_addr: IpAddr, path: Arc<BlockPath>) -> Self {
-        Self { ip_addr, path }
-    }
-
-    pub fn ip_addr(&self) -> IpAddr {
-        self.ip_addr
-    }
-
-    pub fn path(&self) -> &BlockPath {
-        self.path.as_ref()
-    }
-
-    fn port(&self) -> Result<u16> {
-        self.path.port()
-    }
-
-    /// Returns the socket address of the block this instance refers to.
-    pub fn socket_addr(&self) -> Result<SocketAddr> {
-        Ok(SocketAddr::new(self.ip_addr, self.port()?))
-    }
-}
-
-impl Display for BlockAddr {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{}@{}", self.path, self.ip_addr)
-    }
-}
-
-/// Indicates whether a message was sent using `call` or `send`.
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
-enum MsgKind {
-    /// This message expects exactly one reply.
-    Call,
-    /// This message expects exactly zero replies.
-    Send,
-}
-
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
-struct Envelope<T> {
-    kind: MsgKind,
-    msg: T,
-}
-
-impl<T> Envelope<T> {
-    fn send(msg: T) -> Self {
-        Self {
-            msg,
-            kind: MsgKind::Send,
-        }
-    }
-
-    fn call(msg: T) -> Self {
-        Self {
-            msg,
-            kind: MsgKind::Call,
-        }
-    }
-
-    fn msg(&self) -> &T {
-        &self.msg
-    }
-}
-
-#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
-enum ReplyEnvelope<T> {
-    Ok(T),
-    Err {
-        message: String,
-        os_code: Option<i32>,
-    },
-}
-
-impl<T> ReplyEnvelope<T> {
-    fn err(message: String, os_code: Option<i32>) -> Self {
-        Self::Err { message, os_code }
-    }
-}
-
-/// A message tagged with the block path that it was sent from.
-pub struct MsgReceived<T> {
-    from: Arc<BlockPath>,
-    msg: Envelope<T>,
-    replier: Option<Replier>,
-}
-
-impl<T> MsgReceived<T> {
-    fn new(from: Arc<BlockPath>, msg: Envelope<T>, replier: Option<Replier>) -> Self {
-        Self { from, msg, replier }
-    }
-
-    pub fn into_parts(self) -> (Arc<BlockPath>, T, Option<Replier>) {
-        (self.from, self.msg.msg, self.replier)
-    }
-
-    /// The path from which this message was received.
-    pub fn from(&self) -> &Arc<BlockPath> {
-        &self.from
-    }
-
-    /// Payload contained in this message.
-    pub fn body(&self) -> &T {
-        self.msg.msg()
-    }
-
-    /// Returns true if and only if this messages needs to be replied to.
-    pub fn needs_reply(&self) -> bool {
-        self.replier.is_some()
-    }
-
-    /// Takes the replier out of this struct and returns it, if it has not previously been returned.
-    pub fn take_replier(&mut self) -> Option<Replier> {
-        self.replier.take()
-    }
-}
-
-/// Trait for receiving messages and creating [Transmitter]s.
-pub trait Receiver {
-    /// The address at which messages will be received.
-    fn addr(&self) -> &Arc<BlockAddr>;
-
-    type Transmitter: Transmitter + Send;
-    type TransmitterFut<'a>: 'a + Future<Output = Result<Self::Transmitter>> + Send
-    where
-        Self: 'a;
-
-    /// Creates a [Transmitter] which is connected to the given address.
-    fn transmitter(&self, addr: Arc<BlockAddr>) -> Self::TransmitterFut<'_>;
-
-    type CompleteErr: std::error::Error + Send;
-    type CompleteFut<'a>: 'a + Future<Output = StdResult<(), Self::CompleteErr>> + Send
-    where
-        Self: 'a;
-    /// Returns a future which completes when this [Receiver] has completed (which may be never).
-    fn complete(&self) -> Result<Self::CompleteFut<'_>>;
-
-    type StopFut<'a>: 'a + Future<Output = Result<()>> + Send
-    where
-        Self: 'a;
-    fn stop(&self) -> Self::StopFut<'_>;
-}
-
-/// A type which can be used to transmit messages.
-pub trait Transmitter {
-    type SendFut<'call, T>: 'call + Future<Output = Result<()>> + Send
-    where
-        Self: 'call,
-        T: 'call + SendMsg<'call>;
-
-    /// Transmit a message to the connected [Receiver] without waiting for a reply.
-    fn send<'call, T: 'call + SendMsg<'call>>(&'call self, msg: T) -> Self::SendFut<'call, T>;
-
-    type CallFut<'call, T, F>: 'call + Future<Output = Result<F::Return>> + Send
-    where
-        Self: 'call,
-        T: 'call + CallMsg<'call>,
-        F: 'static + Send + DeserCallback;
-
-    /// Transmit a message to the connected [Receiver], waits for a reply, then calls the given
-    /// [DeserCallback] with the deserialized reply.
-    ///
-    /// ## WARNING
-    /// The callback must be such that `F::Arg<'a> = T::Reply<'a>` for any `'a`. If this
-    /// is violated, then a deserilization error will occur at runtime.
-    ///
-    /// ## TODO
-    /// This issue needs to be fixed. Due to the fact that
-    /// `F::Arg` is a Generic Associated Type (GAT) I have been unable to express this constraint in
-    /// the where clause of this method. I'm not sure if the errors I've encountered are due to a
-    /// lack of understanding on my part or due to the current limitations of the borrow checker in
-    /// its handling of GATs.
-    fn call<'call, T, F>(&'call self, msg: T, callback: F) -> Self::CallFut<'call, T, F>
-    where
-        T: 'call + CallMsg<'call>,
-        F: 'static + Send + DeserCallback;
-
-    /// Transmits a message to the connected [Receiver], waits for a reply, then passes back the
-    /// the reply to the caller.
-    fn call_through<'call, T>(
-        &'call self,
-        msg: T,
-    ) -> Self::CallFut<'call, T, Passthrough<T::Reply<'call>>>
-    where
-        T: 'call + CallMsg<'call>,
-        T::Reply<'call>: 'static + Send + Sync + DeserializeOwned,
-    {
-        self.call(msg, Passthrough::new())
-    }
-
-    /// Returns the address that this instance is transmitting to.
-    fn addr(&self) -> &Arc<BlockAddr>;
-}
-
-pub struct Passthrough<T> {
-    phantom: PhantomData<T>,
-}
+pub use btlib::Result;
 
-impl<T> Passthrough<T> {
-    pub fn new() -> Self {
-        Self {
-            phantom: PhantomData,
-        }
-    }
-}
+mod common;
+pub use common::{BlockAddr, CallMsg, SendMsg};
 
-impl<T> Default for Passthrough<T> {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl<T> Clone for Passthrough<T> {
-    fn clone(&self) -> Self {
-        Self::new()
-    }
-}
-
-impl<T: 'static + Send + DeserializeOwned> DeserCallback for Passthrough<T> {
-    type Arg<'de> = T;
-    type Return = T;
-    type CallFut<'de> = Ready<T>;
-    fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
-        ready(arg)
-    }
-}
-
-/// Encodes messages using [btserde].
-#[derive(Debug)]
-struct MsgEncoder;
-
-impl MsgEncoder {
-    fn new() -> Self {
-        Self
-    }
-}
-
-impl<T: Serialize> Encoder<T> for MsgEncoder {
-    type Error = btlib::Error;
-
-    fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<()> {
-        const U64_LEN: usize = std::mem::size_of::<u64>();
-        let payload = dst.split_off(U64_LEN);
-        let mut writer = payload.writer();
-        write_to(&item, &mut writer)?;
-        let payload = writer.into_inner();
-        let payload_len = payload.len() as u64;
-        let mut writer = dst.writer();
-        write_to(&payload_len, &mut writer)?;
-        let dst = writer.into_inner();
-        dst.unsplit(payload);
-        Ok(())
-    }
-}
-
-type FramedMsg = FramedWrite<SendStream, MsgEncoder>;
-type ArcMutex<T> = Arc<Mutex<T>>;
-
-#[derive(Clone)]
-pub struct Replier {
-    stream: ArcMutex<FramedMsg>,
-}
-
-impl Replier {
-    fn new(stream: ArcMutex<FramedMsg>) -> Self {
-        Self { stream }
-    }
-
-    pub async fn reply<T: Serialize + Send>(&mut self, reply: T) -> Result<()> {
-        let mut guard = self.stream.lock().await;
-        guard.send(ReplyEnvelope::Ok(reply)).await?;
-        Ok(())
-    }
-
-    pub async fn reply_err(&mut self, err: String, os_code: Option<i32>) -> Result<()> {
-        let mut guard = self.stream.lock().await;
-        guard.send(ReplyEnvelope::<()>::err(err, os_code)).await?;
-        Ok(())
-    }
-}
-
-struct MsgRecvdCallback<F> {
-    path: Arc<BlockPath>,
-    replier: Replier,
-    inner: F,
-}
-
-impl<F: MsgCallback> MsgRecvdCallback<F> {
-    fn new(path: Arc<BlockPath>, framed_msg: Arc<Mutex<FramedMsg>>, inner: F) -> Self {
-        Self {
-            path,
-            replier: Replier::new(framed_msg),
-            inner,
-        }
-    }
-}
-
-impl<F: 'static + MsgCallback> DeserCallback for MsgRecvdCallback<F> {
-    type Arg<'de> = Envelope<F::Arg<'de>> where Self: 'de;
-    type Return = Result<()>;
-    type CallFut<'de> = impl 'de + Future<Output = Self::Return> + Send where F: 'de, Self: 'de;
-    fn call<'de>(&'de mut self, arg: Envelope<F::Arg<'de>>) -> Self::CallFut<'de> {
-        let replier = match arg.kind {
-            MsgKind::Call => Some(self.replier.clone()),
-            MsgKind::Send => None,
-        };
-        async move {
-            let result = self
-                .inner
-                .call(MsgReceived::new(self.path.clone(), arg, replier))
-                .await;
-            match result {
-                Ok(value) => Ok(value),
-                Err(err) => match err.downcast::<io::Error>() {
-                    Ok(err) => {
-                        self.replier
-                            .reply_err(err.to_string(), err.raw_os_error())
-                            .await
-                    }
-                    Err(err) => self.replier.reply_err(err.to_string(), None).await,
-                },
-            }
-        }
-    }
-}
-
-macro_rules! handle_err {
-    ($result:expr, $on_err:expr, $control_flow:expr) => {
-        match $result {
-            Ok(inner) => inner,
-            Err(err) => {
-                $on_err(err);
-                $control_flow;
-            }
-        }
-    };
-}
-
-/// Unwraps the given result, or if the result is an error, returns from the enclosing function.
-macro_rules! unwrap_or_return {
-    ($result:expr, $on_err:expr) => {
-        handle_err!($result, $on_err, return)
-    };
-    ($result:expr) => {
-        unwrap_or_return!($result, |err| error!("{err}"))
-    };
-}
-
-/// Unwraps the given result, or if the result is an error, continues the enclosing loop.
-macro_rules! unwrap_or_continue {
-    ($result:expr, $on_err:expr) => {
-        handle_err!($result, $on_err, continue)
-    };
-    ($result:expr) => {
-        unwrap_or_continue!($result, |err| error!("{err}"))
-    };
-}
-
-/// Awaits its first argument, unless interrupted by its second argument, in which case the
-/// enclosing function returns. The second argument needs to be cancel safe, but the first
-/// need not be if it is discarded when the enclosing function returns (because losing messages
-/// from the first argument doesn't matter in this case).
-macro_rules! await_or_stop {
-    ($future:expr, $stop_fut:expr) => {
-        select! {
-            Some(connecting) = $future => connecting,
-            _ = $stop_fut => break,
-        }
-    };
-}
-
-struct QuicReceiver {
-    recv_addr: Arc<BlockAddr>,
-    stop_tx: broadcast::Sender<()>,
-    endpoint: Endpoint,
-    resolver: Arc<CertResolver>,
-    join_handle: StdMutex<Option<JoinHandle<()>>>,
-}
-
-impl QuicReceiver {
-    fn new<F: 'static + MsgCallback>(
-        recv_addr: Arc<BlockAddr>,
-        resolver: Arc<CertResolver>,
-        callback: F,
-    ) -> Result<Self> {
-        log::info!("starting QuicReceiver with address {}", recv_addr);
-        let socket_addr = recv_addr.socket_addr()?;
-        let endpoint = Endpoint::server(server_config(resolver.clone())?, socket_addr)?;
-        let (stop_tx, stop_rx) = broadcast::channel(1);
-        let join_handle = tokio::spawn(Self::server_loop(endpoint.clone(), callback, stop_rx));
-        Ok(Self {
-            recv_addr,
-            stop_tx,
-            endpoint,
-            resolver,
-            join_handle: StdMutex::new(Some(join_handle)),
-        })
-    }
-
-    async fn server_loop<F: 'static + MsgCallback>(
-        endpoint: Endpoint,
-        callback: F,
-        mut stop_rx: broadcast::Receiver<()>,
-    ) {
-        loop {
-            let connecting = await_or_stop!(endpoint.accept(), stop_rx.recv());
-            let connection = unwrap_or_continue!(connecting.await, |err| error!(
-                "error accepting QUIC connection: {err}"
-            ));
-            tokio::spawn(Self::handle_connection(
-                connection,
-                callback.clone(),
-                stop_rx.resubscribe(),
-            ));
-        }
-    }
-
-    async fn handle_connection<F: 'static + MsgCallback>(
-        connection: Connection,
-        callback: F,
-        mut stop_rx: broadcast::Receiver<()>,
-    ) {
-        let client_path = unwrap_or_return!(
-            Self::client_path(connection.peer_identity()),
-            |err| error!("failed to get client path from peer identity: {err}")
-        );
-        loop {
-            let result = await_or_stop!(connection.accept_bi().map(Some), stop_rx.recv());
-            let (send_stream, recv_stream) = match result {
-                Ok(pair) => pair,
-                Err(err) => match err {
-                    ConnectionError::ApplicationClosed(app) => {
-                        debug!("connection closed: {app}");
-                        return;
-                    }
-                    _ => {
-                        error!("error accepting stream: {err}");
-                        continue;
-                    }
-                },
-            };
-            let client_path = client_path.clone();
-            let callback = callback.clone();
-            tokio::task::spawn(Self::handle_message(
-                client_path,
-                send_stream,
-                recv_stream,
-                callback,
-            ));
-        }
-    }
-
-    async fn handle_message<F: 'static + MsgCallback>(
-        client_path: Arc<BlockPath>,
-        send_stream: SendStream,
-        recv_stream: RecvStream,
-        callback: F,
-    ) {
-        let framed_msg = Arc::new(Mutex::new(FramedWrite::new(send_stream, MsgEncoder::new())));
-        let callback =
-            MsgRecvdCallback::new(client_path.clone(), framed_msg.clone(), callback.clone());
-        let mut msg_stream = CallbackFramed::new(recv_stream);
-        let result = msg_stream
-            .next(callback)
-            .await
-            .ok_or_else(|| bterr!("client closed stream before sending a message"));
-        match unwrap_or_return!(result) {
-            Err(err) => error!("msg_stream produced an error: {err}"),
-            Ok(result) => {
-                if let Err(err) = result {
-                    error!("callback returned an error: {err}");
-                }
-            }
-        }
-    }
-
-    /// Returns the path the client is bound to.
-    fn client_path(peer_identity: Option<Box<dyn Any>>) -> Result<Arc<BlockPath>> {
-        let peer_identity =
-            peer_identity.ok_or_else(|| bterr!("connection did not contain a peer identity"))?;
-        let client_certs = peer_identity
-            .downcast::<Vec<rustls::Certificate>>()
-            .map_err(|_| bterr!("failed to downcast peer_identity to certificate chain"))?;
-        let first = client_certs
-            .first()
-            .ok_or_else(|| bterr!("no certificates were presented by the client"))?;
-        let (writecap, ..) = Writecap::from_cert_chain(first, &client_certs[1..])?;
-        Ok(Arc::new(writecap.bind_path()))
-    }
-}
-
-impl Drop for QuicReceiver {
-    fn drop(&mut self) {
-        // This result will be a failure if the tasks have already returned, which is not a
-        // problem.
-        let _ = self.stop_tx.send(());
-    }
-}
-
-impl Receiver for QuicReceiver {
-    fn addr(&self) -> &Arc<BlockAddr> {
-        &self.recv_addr
-    }
-
-    type Transmitter = QuicTransmitter;
-    type TransmitterFut<'a> = Pin<Box<dyn 'a + Future<Output = Result<QuicTransmitter>> + Send>>;
-
-    fn transmitter(&self, addr: Arc<BlockAddr>) -> Self::TransmitterFut<'_> {
-        Box::pin(async {
-            QuicTransmitter::from_endpoint(self.endpoint.clone(), addr, self.resolver.clone()).await
-        })
-    }
-
-    type CompleteErr = JoinError;
-    type CompleteFut<'a> = JoinHandle<()>;
-    fn complete(&self) -> Result<Self::CompleteFut<'_>> {
-        let mut guard = self.join_handle.lock().display_err()?;
-        let handle = guard
-            .take()
-            .ok_or_else(|| bterr!("join handle has already been taken"))?;
-        Ok(handle)
-    }
-
-    type StopFut<'a> = Ready<Result<()>>;
-    fn stop(&self) -> Self::StopFut<'_> {
-        ready(self.stop_tx.send(()).map(|_| ()).map_err(|err| err.into()))
-    }
-}
-
-macro_rules! cleanup_on_err {
-    ($result:expr, $guard:ident, $parts:ident) => {
-        match $result {
-            Ok(value) => value,
-            Err(err) => {
-                *$guard = Some($parts);
-                return Err(err.into());
-            }
-        }
-    };
-}
-
-struct QuicTransmitter {
-    addr: Arc<BlockAddr>,
-    connection: Connection,
-    send_parts: Mutex<Option<FramedParts<SendStream, MsgEncoder>>>,
-    recv_buf: Mutex<Option<BytesMut>>,
-}
-
-impl QuicTransmitter {
-    async fn from_endpoint(
-        endpoint: Endpoint,
-        addr: Arc<BlockAddr>,
-        resolver: Arc<CertResolver>,
-    ) -> Result<Self> {
-        let socket_addr = addr.socket_addr()?;
-        let connecting = endpoint.connect_with(
-            client_config(addr.path.clone(), resolver)?,
-            socket_addr,
-            // The ServerCertVerifier ensures we connect to the correct path.
-            "UNIMPORTANT",
-        )?;
-        let connection = connecting.await?;
-        let send_parts = Mutex::new(None);
-        let recv_buf = Mutex::new(Some(BytesMut::new()));
-        Ok(Self {
-            addr,
-            connection,
-            send_parts,
-            recv_buf,
-        })
-    }
-
-    async fn transmit<T: Serialize>(&self, envelope: Envelope<T>) -> Result<RecvStream> {
-        let mut guard = self.send_parts.lock().await;
-        let (send_stream, recv_stream) = self.connection.open_bi().await?;
-        let parts = match guard.take() {
-            Some(mut parts) => {
-                parts.io = send_stream;
-                parts
-            }
-            None => FramedParts::new::<Envelope<T>>(send_stream, MsgEncoder::new()),
-        };
-        let mut sink = Framed::from_parts(parts);
-        let result = sink.send(envelope).await;
-        let parts = sink.into_parts();
-        cleanup_on_err!(result, guard, parts);
-        *guard = Some(parts);
-        Ok(recv_stream)
-    }
-
-    async fn call<'ser, T, F>(&'ser self, msg: T, callback: F) -> Result<F::Return>
-    where
-        T: 'ser + CallMsg<'ser>,
-        F: 'static + Send + DeserCallback,
-    {
-        let recv_stream = self.transmit(Envelope::call(msg)).await?;
-        let mut guard = self.recv_buf.lock().await;
-        let buffer = guard.take().unwrap();
-        let mut callback_framed = CallbackFramed::from_parts(recv_stream, buffer);
-        let result = callback_framed
-            .next(ReplyCallback::new(callback))
-            .await
-            .ok_or_else(|| bterr!("server hung up before sending reply"));
-        let (_, buffer) = callback_framed.into_parts();
-        let output = cleanup_on_err!(result, guard, buffer);
-        *guard = Some(buffer);
-        output?
-    }
-}
-
-impl Transmitter for QuicTransmitter {
-    fn addr(&self) -> &Arc<BlockAddr> {
-        &self.addr
-    }
-
-    type SendFut<'ser, T> = impl 'ser + Future<Output = Result<()>> + Send
-        where T: 'ser + SendMsg<'ser>;
-
-    fn send<'ser, 'r, T: 'ser + SendMsg<'ser>>(&'ser self, msg: T) -> Self::SendFut<'ser, T> {
-        self.transmit(Envelope::send(msg))
-            .map(|result| result.map(|_| ()))
-    }
-
-    type CallFut<'ser, T, F> = impl 'ser + Future<Output = Result<F::Return>> + Send
-    where
-        Self: 'ser,
-        T: 'ser + CallMsg<'ser>,
-        F: 'static + Send + DeserCallback;
-
-    fn call<'ser, T, F>(&'ser self, msg: T, callback: F) -> Self::CallFut<'ser, T, F>
-    where
-        T: 'ser + CallMsg<'ser>,
-        F: 'static + Send + DeserCallback,
-    {
-        self.call(msg, callback)
-    }
-}
+mod tls;
 
-struct ReplyCallback<F> {
-    inner: F,
-}
+mod serialization;
+pub use serialization::DeserCallback;
 
-impl<F> ReplyCallback<F> {
-    fn new(inner: F) -> Self {
-        Self { inner }
-    }
-}
+mod transmitter;
+pub use transmitter::Transmitter;
 
-impl<F: 'static + Send + DeserCallback> DeserCallback for ReplyCallback<F> {
-    type Arg<'de> = ReplyEnvelope<F::Arg<'de>>;
-    type Return = Result<F::Return>;
-    type CallFut<'de> = impl 'de + Future<Output = Self::Return> + Send;
-    fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
-        async move {
-            match arg {
-                ReplyEnvelope::Ok(msg) => Ok(self.inner.call(msg).await),
-                ReplyEnvelope::Err { message, os_code } => {
-                    if let Some(os_code) = os_code {
-                        let err = bterr!(io::Error::from_raw_os_error(os_code)).context(message);
-                        Err(err)
-                    } else {
-                        Err(bterr!(message))
-                    }
-                }
-            }
-        }
-    }
-}
+mod receiver;
+pub use receiver::{MsgCallback, MsgReceived, Receiver, Replier};

+ 414 - 0
crates/btmsg/src/receiver.rs

@@ -0,0 +1,414 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//! Types used for receiving messages over the network. Chiefly, the [Receiver] type.
+
+use std::{
+    any::Any,
+    future::Future,
+    io,
+    net::IpAddr,
+    sync::{Arc, Mutex as StdMutex},
+};
+
+use btlib::{bterr, crypto::Creds, error::DisplayErr, BlockPath, Writecap};
+use futures::{FutureExt, SinkExt};
+use log::{debug, error};
+use quinn::{Connection, ConnectionError, Endpoint, RecvStream, SendStream};
+use serde::{Deserialize, Serialize};
+use tokio::{
+    select,
+    sync::{broadcast, Mutex},
+    task::JoinHandle,
+};
+use tokio_util::codec::FramedWrite;
+
+use crate::{
+    serialization::{CallbackFramed, MsgEncoder},
+    tls::{server_config, CertResolver},
+    BlockAddr, CallMsg, DeserCallback, Result, Transmitter,
+};
+
+macro_rules! handle_err {
+    ($result:expr, $on_err:expr, $control_flow:expr) => {
+        match $result {
+            Ok(inner) => inner,
+            Err(err) => {
+                $on_err(err);
+                $control_flow;
+            }
+        }
+    };
+}
+
+/// Unwraps the given result, or if the result is an error, returns from the enclosing function.
+macro_rules! unwrap_or_return {
+    ($result:expr, $on_err:expr) => {
+        handle_err!($result, $on_err, return)
+    };
+    ($result:expr) => {
+        unwrap_or_return!($result, |err| error!("{err}"))
+    };
+}
+
+/// Unwraps the given result, or if the result is an error, continues the enclosing loop.
+macro_rules! unwrap_or_continue {
+    ($result:expr, $on_err:expr) => {
+        handle_err!($result, $on_err, continue)
+    };
+    ($result:expr) => {
+        unwrap_or_continue!($result, |err| error!("{err}"))
+    };
+}
+
+/// Awaits its first argument, unless interrupted by its second argument, in which case the
+/// enclosing function returns. The second argument needs to be cancel safe, but the first
+/// need not be if it is discarded when the enclosing function returns (because losing messages
+/// from the first argument doesn't matter in this case).
+macro_rules! await_or_stop {
+    ($future:expr, $stop_fut:expr) => {
+        select! {
+            Some(connecting) = $future => connecting,
+            _ = $stop_fut => break,
+        }
+    };
+}
+
+/// Type which receives messages sent over the network sent by a [Transmitter].
+pub struct Receiver {
+    recv_addr: Arc<BlockAddr>,
+    stop_tx: broadcast::Sender<()>,
+    endpoint: Endpoint,
+    resolver: Arc<CertResolver>,
+    join_handle: StdMutex<Option<JoinHandle<()>>>,
+}
+
+impl Receiver {
+    /// Returns a [Receiver] bound to the given [IpAddr] which receives messages at the bind path of
+    /// the [Writecap] in the given credentials. The returned type can be used to make
+    /// [Transmitter]s for any path.
+    pub fn new<C: 'static + Creds + Send + Sync, F: 'static + MsgCallback>(
+        ip_addr: IpAddr,
+        creds: Arc<C>,
+        callback: F,
+    ) -> Result<Receiver> {
+        let writecap = creds.writecap().ok_or(btlib::BlockError::MissingWritecap)?;
+        let recv_addr = Arc::new(BlockAddr::new(ip_addr, Arc::new(writecap.bind_path())));
+        log::info!("starting Receiver with address {}", recv_addr);
+        let socket_addr = recv_addr.socket_addr()?;
+        let resolver = Arc::new(CertResolver::new(creds)?);
+        let endpoint = Endpoint::server(server_config(resolver.clone())?, socket_addr)?;
+        let (stop_tx, stop_rx) = broadcast::channel(1);
+        let join_handle = tokio::spawn(Self::server_loop(endpoint.clone(), callback, stop_rx));
+        Ok(Self {
+            recv_addr,
+            stop_tx,
+            endpoint,
+            resolver,
+            join_handle: StdMutex::new(Some(join_handle)),
+        })
+    }
+
+    async fn server_loop<F: 'static + MsgCallback>(
+        endpoint: Endpoint,
+        callback: F,
+        mut stop_rx: broadcast::Receiver<()>,
+    ) {
+        loop {
+            let connecting = await_or_stop!(endpoint.accept(), stop_rx.recv());
+            let connection = unwrap_or_continue!(connecting.await, |err| error!(
+                "error accepting QUIC connection: {err}"
+            ));
+            tokio::spawn(Self::handle_connection(
+                connection,
+                callback.clone(),
+                stop_rx.resubscribe(),
+            ));
+        }
+    }
+
+    async fn handle_connection<F: 'static + MsgCallback>(
+        connection: Connection,
+        callback: F,
+        mut stop_rx: broadcast::Receiver<()>,
+    ) {
+        let client_path = unwrap_or_return!(
+            Self::client_path(connection.peer_identity()),
+            |err| error!("failed to get client path from peer identity: {err}")
+        );
+        loop {
+            let result = await_or_stop!(connection.accept_bi().map(Some), stop_rx.recv());
+            let (send_stream, recv_stream) = match result {
+                Ok(pair) => pair,
+                Err(err) => match err {
+                    ConnectionError::ApplicationClosed(app) => {
+                        debug!("connection closed: {app}");
+                        return;
+                    }
+                    _ => {
+                        error!("error accepting stream: {err}");
+                        continue;
+                    }
+                },
+            };
+            let client_path = client_path.clone();
+            let callback = callback.clone();
+            tokio::task::spawn(Self::handle_message(
+                client_path,
+                send_stream,
+                recv_stream,
+                callback,
+            ));
+        }
+    }
+
+    async fn handle_message<F: 'static + MsgCallback>(
+        client_path: Arc<BlockPath>,
+        send_stream: SendStream,
+        recv_stream: RecvStream,
+        callback: F,
+    ) {
+        let framed_msg = Arc::new(Mutex::new(FramedWrite::new(send_stream, MsgEncoder::new())));
+        let callback =
+            MsgRecvdCallback::new(client_path.clone(), framed_msg.clone(), callback.clone());
+        let mut msg_stream = CallbackFramed::new(recv_stream);
+        let result = msg_stream
+            .next(callback)
+            .await
+            .ok_or_else(|| bterr!("client closed stream before sending a message"));
+        match unwrap_or_return!(result) {
+            Err(err) => error!("msg_stream produced an error: {err}"),
+            Ok(result) => {
+                if let Err(err) = result {
+                    error!("callback returned an error: {err}");
+                }
+            }
+        }
+    }
+
+    /// Returns the path the client is bound to.
+    fn client_path(peer_identity: Option<Box<dyn Any>>) -> Result<Arc<BlockPath>> {
+        let peer_identity =
+            peer_identity.ok_or_else(|| bterr!("connection did not contain a peer identity"))?;
+        let client_certs = peer_identity
+            .downcast::<Vec<rustls::Certificate>>()
+            .map_err(|_| bterr!("failed to downcast peer_identity to certificate chain"))?;
+        let first = client_certs
+            .first()
+            .ok_or_else(|| bterr!("no certificates were presented by the client"))?;
+        let (writecap, ..) = Writecap::from_cert_chain(first, &client_certs[1..])?;
+        Ok(Arc::new(writecap.bind_path()))
+    }
+
+    /// The address at which messages will be received.
+    pub fn addr(&self) -> &Arc<BlockAddr> {
+        &self.recv_addr
+    }
+
+    /// Creates a [QuicTransmitter] which is connected to the given address.
+    pub async fn transmitter(&self, addr: Arc<BlockAddr>) -> Result<Transmitter> {
+        Transmitter::from_endpoint(self.endpoint.clone(), addr, self.resolver.clone()).await
+    }
+
+    /// Returns a future which completes when this [QuicReceiver] has completed
+    /// (which may be never).
+    pub fn complete(&self) -> Result<JoinHandle<()>> {
+        let mut guard = self.join_handle.lock().display_err()?;
+        let handle = guard
+            .take()
+            .ok_or_else(|| bterr!("join handle has already been taken"))?;
+        Ok(handle)
+    }
+
+    /// Sends a signal indicating that the task running the server loop should return.
+    pub fn stop(&self) -> Result<()> {
+        self.stop_tx.send(()).map(|_| ()).map_err(|err| err.into())
+    }
+}
+
+impl Drop for Receiver {
+    fn drop(&mut self) {
+        // This result will be a failure if the tasks have already returned, which is not a
+        // problem.
+        let _ = self.stop_tx.send(());
+    }
+}
+
+/// Trait for types which can be called to handle messages received over the network. The
+/// server loop in [Receiver] uses a type that implements this trait to react to messages it
+/// receives.
+pub trait MsgCallback: Clone + Send + Sync + Unpin {
+    type Arg<'de>: CallMsg<'de>
+    where
+        Self: 'de;
+    type CallFut<'de>: Future<Output = Result<()>> + Send
+    where
+        Self: 'de;
+    fn call<'de>(&'de self, arg: MsgReceived<Self::Arg<'de>>) -> Self::CallFut<'de>;
+}
+
+impl<T: MsgCallback> MsgCallback for &T {
+    type Arg<'de> = T::Arg<'de> where Self: 'de;
+    type CallFut<'de> = T::CallFut<'de> where Self: 'de;
+    fn call<'de>(&'de self, arg: MsgReceived<Self::Arg<'de>>) -> Self::CallFut<'de> {
+        (*self).call(arg)
+    }
+}
+
+struct MsgRecvdCallback<F> {
+    path: Arc<BlockPath>,
+    replier: Replier,
+    inner: F,
+}
+
+impl<F: MsgCallback> MsgRecvdCallback<F> {
+    fn new(path: Arc<BlockPath>, framed_msg: Arc<Mutex<FramedMsg>>, inner: F) -> Self {
+        Self {
+            path,
+            replier: Replier::new(framed_msg),
+            inner,
+        }
+    }
+}
+
+impl<F: 'static + MsgCallback> DeserCallback for MsgRecvdCallback<F> {
+    type Arg<'de> = Envelope<F::Arg<'de>> where Self: 'de;
+    type Return = Result<()>;
+    type CallFut<'de> = impl 'de + Future<Output = Self::Return> + Send where F: 'de, Self: 'de;
+    fn call<'de>(&'de mut self, arg: Envelope<F::Arg<'de>>) -> Self::CallFut<'de> {
+        let replier = match arg.kind {
+            MsgKind::Call => Some(self.replier.clone()),
+            MsgKind::Send => None,
+        };
+        async move {
+            let result = self
+                .inner
+                .call(MsgReceived::new(self.path.clone(), arg, replier))
+                .await;
+            match result {
+                Ok(value) => Ok(value),
+                Err(err) => match err.downcast::<io::Error>() {
+                    Ok(err) => {
+                        self.replier
+                            .reply_err(err.to_string(), err.raw_os_error())
+                            .await
+                    }
+                    Err(err) => self.replier.reply_err(err.to_string(), None).await,
+                },
+            }
+        }
+    }
+}
+
+/// Indicates whether a message was sent using `call` or `send`.
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
+enum MsgKind {
+    /// This message expects exactly one reply.
+    Call,
+    /// This message expects exactly zero replies.
+    Send,
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
+pub(crate) struct Envelope<T> {
+    kind: MsgKind,
+    msg: T,
+}
+
+impl<T> Envelope<T> {
+    pub(crate) fn send(msg: T) -> Self {
+        Self {
+            msg,
+            kind: MsgKind::Send,
+        }
+    }
+
+    pub(crate) fn call(msg: T) -> Self {
+        Self {
+            msg,
+            kind: MsgKind::Call,
+        }
+    }
+
+    fn msg(&self) -> &T {
+        &self.msg
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
+pub(crate) enum ReplyEnvelope<T> {
+    Ok(T),
+    Err {
+        message: String,
+        os_code: Option<i32>,
+    },
+}
+
+impl<T> ReplyEnvelope<T> {
+    fn err(message: String, os_code: Option<i32>) -> Self {
+        Self::Err { message, os_code }
+    }
+}
+
+/// A message tagged with the block path that it was sent from.
+pub struct MsgReceived<T> {
+    from: Arc<BlockPath>,
+    msg: Envelope<T>,
+    replier: Option<Replier>,
+}
+
+impl<T> MsgReceived<T> {
+    fn new(from: Arc<BlockPath>, msg: Envelope<T>, replier: Option<Replier>) -> Self {
+        Self { from, msg, replier }
+    }
+
+    pub fn into_parts(self) -> (Arc<BlockPath>, T, Option<Replier>) {
+        (self.from, self.msg.msg, self.replier)
+    }
+
+    /// The path from which this message was received.
+    pub fn from(&self) -> &Arc<BlockPath> {
+        &self.from
+    }
+
+    /// Payload contained in this message.
+    pub fn body(&self) -> &T {
+        self.msg.msg()
+    }
+
+    /// Returns true if and only if this messages needs to be replied to.
+    pub fn needs_reply(&self) -> bool {
+        self.replier.is_some()
+    }
+
+    /// Takes the replier out of this struct and returns it, if it has not previously been returned.
+    pub fn take_replier(&mut self) -> Option<Replier> {
+        self.replier.take()
+    }
+}
+
+type FramedMsg = FramedWrite<SendStream, MsgEncoder>;
+type ArcMutex<T> = Arc<Mutex<T>>;
+
+/// A type for sending a reply to a message. Replies are sent over their own streams, so no two
+/// messages can interfere with one another.
+#[derive(Clone)]
+pub struct Replier {
+    stream: ArcMutex<FramedMsg>,
+}
+
+impl Replier {
+    fn new(stream: ArcMutex<FramedMsg>) -> Self {
+        Self { stream }
+    }
+
+    pub async fn reply<T: Serialize + Send>(&mut self, reply: T) -> Result<()> {
+        let mut guard = self.stream.lock().await;
+        guard.send(ReplyEnvelope::Ok(reply)).await?;
+        Ok(())
+    }
+
+    pub async fn reply_err(&mut self, err: String, os_code: Option<i32>) -> Result<()> {
+        let mut guard = self.stream.lock().await;
+        guard.send(ReplyEnvelope::<()>::err(err, os_code)).await?;
+        Ok(())
+    }
+}

+ 44 - 8
crates/btmsg/src/callback_framed.rs → crates/btmsg/src/serialization.rs

@@ -1,12 +1,17 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
-use btlib::{error::BoxInIoErr, Result};
-use btserde::{from_slice, read_from};
-use bytes::BytesMut;
+//! Types for serializing and deserializing messages.
+
+use btlib::error::BoxInIoErr;
+use btserde::{from_slice, read_from, write_to};
+use bytes::{BufMut, BytesMut};
 use futures::Future;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
 use tokio::io::{AsyncRead, AsyncReadExt};
+use tokio_util::codec::Encoder;
+
+use crate::Result;
 
-pub struct CallbackFramed<I> {
+pub(crate) struct CallbackFramed<I> {
     io: I,
     buffer: BytesMut,
 }
@@ -66,7 +71,10 @@ macro_rules! attempt {
 }
 
 impl<S: AsyncRead + Unpin> CallbackFramed<S> {
-    pub async fn next<F: DeserCallback>(&mut self, mut callback: F) -> Option<Result<F::Return>> {
+    pub(crate) async fn next<F: DeserCallback>(
+        &mut self,
+        mut callback: F,
+    ) -> Option<Result<F::Return>> {
         let mut total_read = 0;
         loop {
             if self.buffer.capacity() - self.buffer.len() == 0 {
@@ -102,6 +110,8 @@ enum DecodeStatus {
     Consume(usize),
 }
 
+/// A trait for types which can be called to asynchronously handle deserialization. This trait is
+/// what enables zero-copy handling of messages which support borrowing data during deserialization.
 pub trait DeserCallback {
     type Arg<'de>: 'de + Deserialize<'de> + Send
     where
@@ -122,12 +132,38 @@ impl<'a, T: DeserCallback> DeserCallback for &'a mut T {
     }
 }
 
+/// Encodes messages using [btserde].
+#[derive(Debug)]
+pub(crate) struct MsgEncoder;
+
+impl MsgEncoder {
+    pub(crate) fn new() -> Self {
+        Self
+    }
+}
+
+impl<T: Serialize> Encoder<T> for MsgEncoder {
+    type Error = btlib::Error;
+
+    fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<()> {
+        const U64_LEN: usize = std::mem::size_of::<u64>();
+        let payload = dst.split_off(U64_LEN);
+        let mut writer = payload.writer();
+        write_to(&item, &mut writer)?;
+        let payload = writer.into_inner();
+        let payload_len = payload.len() as u64;
+        let mut writer = dst.writer();
+        write_to(&payload_len, &mut writer)?;
+        let dst = writer.into_inner();
+        dst.unsplit(payload);
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
 
-    use crate::MsgEncoder;
-
     use futures::{future::Ready, SinkExt};
     use serde::Serialize;
     use std::{

+ 5 - 3
crates/btmsg/src/tls.rs

@@ -1,7 +1,8 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
-//! This module is for types which contribute to the configuration of TLS.
+//! This module contains types which contribute to the configuration of TLS.
+
+use std::sync::Arc;
 
-use crate::Result;
 use btlib::{
     crypto::{Creds, CredsPriv, HashKind, Scheme, Sign, Verifier},
     BlockPath, Principal, Writecap,
@@ -17,7 +18,8 @@ use rustls::{
     Certificate, ConfigBuilder, ConfigSide, SignatureAlgorithm, SignatureScheme, WantsCipherSuites,
     WantsVerifier,
 };
-use std::sync::Arc;
+
+use crate::Result;
 
 pub(crate) fn server_config(resolver: Arc<CertResolver>) -> Result<ServerConfig> {
     let server_config = common_config(rustls::ServerConfig::builder())?

+ 220 - 0
crates/btmsg/src/transmitter.rs

@@ -0,0 +1,220 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+//! Contains the [Transmitter] type.
+
+use std::{
+    future::{ready, Future, Ready},
+    io,
+    marker::PhantomData,
+    net::{IpAddr, Ipv6Addr, SocketAddr},
+    sync::Arc,
+};
+
+use btlib::{bterr, crypto::Creds};
+use bytes::BytesMut;
+use futures::SinkExt;
+use quinn::{Connection, Endpoint, RecvStream, SendStream};
+use serde::{de::DeserializeOwned, Serialize};
+use tokio::sync::Mutex;
+use tokio_util::codec::{Framed, FramedParts};
+
+use crate::{
+    receiver::{Envelope, ReplyEnvelope},
+    serialization::{CallbackFramed, MsgEncoder},
+    tls::{client_config, CertResolver},
+    BlockAddr, CallMsg, DeserCallback, Result, SendMsg,
+};
+
+/// A type which can be used to transmit messages over the network to a [crate::Receiver].
+pub struct Transmitter {
+    addr: Arc<BlockAddr>,
+    connection: Connection,
+    send_parts: Mutex<Option<FramedParts<SendStream, MsgEncoder>>>,
+    recv_buf: Mutex<Option<BytesMut>>,
+}
+
+macro_rules! cleanup_on_err {
+    ($result:expr, $guard:ident, $parts:ident) => {
+        match $result {
+            Ok(value) => value,
+            Err(err) => {
+                *$guard = Some($parts);
+                return Err(err.into());
+            }
+        }
+    };
+}
+
+impl Transmitter {
+    pub async fn new<C: 'static + Creds + Send + Sync>(
+        addr: Arc<BlockAddr>,
+        creds: Arc<C>,
+    ) -> Result<Transmitter> {
+        let resolver = Arc::new(CertResolver::new(creds)?);
+        let endpoint = Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0))?;
+        Transmitter::from_endpoint(endpoint, addr, resolver).await
+    }
+
+    pub(crate) async fn from_endpoint(
+        endpoint: Endpoint,
+        addr: Arc<BlockAddr>,
+        resolver: Arc<CertResolver>,
+    ) -> Result<Self> {
+        let socket_addr = addr.socket_addr()?;
+        let connecting = endpoint.connect_with(
+            client_config(addr.path().clone(), resolver)?,
+            socket_addr,
+            // The ServerCertVerifier ensures we connect to the correct path.
+            "UNIMPORTANT",
+        )?;
+        let connection = connecting.await?;
+        let send_parts = Mutex::new(None);
+        let recv_buf = Mutex::new(Some(BytesMut::new()));
+        Ok(Self {
+            addr,
+            connection,
+            send_parts,
+            recv_buf,
+        })
+    }
+
+    async fn transmit<T: Serialize>(&self, envelope: Envelope<T>) -> Result<RecvStream> {
+        let mut guard = self.send_parts.lock().await;
+        let (send_stream, recv_stream) = self.connection.open_bi().await?;
+        let parts = match guard.take() {
+            Some(mut parts) => {
+                parts.io = send_stream;
+                parts
+            }
+            None => FramedParts::new::<Envelope<T>>(send_stream, MsgEncoder::new()),
+        };
+        let mut sink = Framed::from_parts(parts);
+        let result = sink.send(envelope).await;
+        let parts = sink.into_parts();
+        cleanup_on_err!(result, guard, parts);
+        *guard = Some(parts);
+        Ok(recv_stream)
+    }
+
+    /// Returns the address that this instance is transmitting to.
+    pub fn addr(&self) -> &Arc<BlockAddr> {
+        &self.addr
+    }
+
+    /// Transmit a message to the connected [QuicReceiver] without waiting for a reply.
+    pub async fn send<'ser, 'de, T>(&self, msg: T) -> Result<()>
+    where
+        T: 'ser + SendMsg<'de>,
+    {
+        self.transmit(Envelope::send(msg)).await?;
+        Ok(())
+    }
+
+    /// Transmit a message to the connected [QuicReceiver], waits for a reply, then calls the given
+    /// [DeserCallback] with the deserialized reply.
+    ///
+    /// ## WARNING
+    /// The callback must be such that `F::Arg<'a> = T::Reply<'a>` for any `'a`. If this
+    /// is violated, then a deserilization error will occur at runtime.
+    ///
+    /// ## TODO
+    /// This issue needs to be fixed. Due to the fact that
+    /// `F::Arg` is a Generic Associated Type (GAT) I have been unable to express this constraint in
+    /// the where clause of this method. I'm not sure if the errors I've encountered are due to a
+    /// lack of understanding on my part or due to the current limitations of the borrow checker in
+    /// its handling of GATs.
+    pub async fn call<'ser, 'de, T, F>(&self, msg: T, callback: F) -> Result<F::Return>
+    where
+        T: 'ser + CallMsg<'de>,
+        F: 'static + Send + DeserCallback,
+    {
+        let recv_stream = self.transmit(Envelope::call(msg)).await?;
+        let mut guard = self.recv_buf.lock().await;
+        let buffer = guard.take().unwrap();
+        let mut callback_framed = CallbackFramed::from_parts(recv_stream, buffer);
+        let result = callback_framed
+            .next(ReplyCallback::new(callback))
+            .await
+            .ok_or_else(|| bterr!("server hung up before sending reply"));
+        let (_, buffer) = callback_framed.into_parts();
+        let output = cleanup_on_err!(result, guard, buffer);
+        *guard = Some(buffer);
+        output?
+    }
+
+    /// Transmits a message to the connected [QuicReceiver], waits for a reply, then passes back the
+    /// the reply to the caller. This only works for messages whose reply doesn't borrow any data,
+    /// otherwise the `call` method must be used.
+    pub async fn call_through<'ser, T>(&self, msg: T) -> Result<T::Reply<'static>>
+    where
+        // TODO: CallMsg must take a static lifetime until this issue is resolved:
+        //     https://github.com/rust-lang/rust/issues/103532
+        T: 'ser + CallMsg<'static>,
+        T::Reply<'static>: 'static + Send + Sync + DeserializeOwned,
+    {
+        self.call(msg, Passthrough::new()).await
+    }
+}
+
+struct ReplyCallback<F> {
+    inner: F,
+}
+
+impl<F> ReplyCallback<F> {
+    fn new(inner: F) -> Self {
+        Self { inner }
+    }
+}
+
+impl<F: 'static + Send + DeserCallback> DeserCallback for ReplyCallback<F> {
+    type Arg<'de> = ReplyEnvelope<F::Arg<'de>>;
+    type Return = Result<F::Return>;
+    type CallFut<'de> = impl 'de + Future<Output = Self::Return> + Send;
+    fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
+        async move {
+            match arg {
+                ReplyEnvelope::Ok(msg) => Ok(self.inner.call(msg).await),
+                ReplyEnvelope::Err { message, os_code } => {
+                    if let Some(os_code) = os_code {
+                        let err = bterr!(io::Error::from_raw_os_error(os_code)).context(message);
+                        Err(err)
+                    } else {
+                        Err(bterr!(message))
+                    }
+                }
+            }
+        }
+    }
+}
+
+pub struct Passthrough<T> {
+    phantom: PhantomData<T>,
+}
+
+impl<T> Passthrough<T> {
+    pub fn new() -> Self {
+        Self {
+            phantom: PhantomData,
+        }
+    }
+}
+
+impl<T> Default for Passthrough<T> {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<T> Clone for Passthrough<T> {
+    fn clone(&self) -> Self {
+        Self::new()
+    }
+}
+
+impl<T: 'static + Send + DeserializeOwned> DeserCallback for Passthrough<T> {
+    type Arg<'de> = T;
+    type Return = T;
+    type CallFut<'de> = Ready<T>;
+    fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
+        ready(arg)
+    }
+}

+ 10 - 7
crates/btmsg/tests/tests.rs

@@ -139,21 +139,24 @@ fn proc_creds() -> impl Creds {
     creds
 }
 
-fn proc_rx<F: 'static + MsgCallback>(callback: F) -> (impl Receiver, Arc<BlockAddr>) {
+fn proc_rx<F: 'static + MsgCallback>(callback: F) -> (Receiver, Arc<BlockAddr>) {
     let ip_addr = IpAddr::V6(Ipv6Addr::LOCALHOST);
     let creds = proc_creds();
     let writecap = creds.writecap().unwrap();
     let addr = Arc::new(BlockAddr::new(ip_addr, Arc::new(writecap.bind_path())));
-    (receiver(ip_addr, Arc::new(creds), callback).unwrap(), addr)
+    (
+        Receiver::new(ip_addr, Arc::new(creds), callback).unwrap(),
+        addr,
+    )
 }
 
-async fn proc_tx_rx<F: 'static + MsgCallback>(func: F) -> (impl Transmitter, impl Receiver) {
+async fn proc_tx_rx<F: 'static + MsgCallback>(func: F) -> (Transmitter, Receiver) {
     let (receiver, addr) = proc_rx(func);
     let sender = receiver.transmitter(addr).await.unwrap();
     (sender, receiver)
 }
 
-async fn file_server() -> (impl Transmitter, impl Receiver) {
+async fn file_server() -> (Transmitter, Receiver) {
     let (sender, _) = mpsc::channel::<()>(1);
     let file = Arc::new(SyncMutex::new([0, 1, 2, 3, 4, 5, 6, 7]));
     proc_tx_rx(Delegate::new(
@@ -242,7 +245,7 @@ async fn message_received_from_path_is_correct() {
 
     sender.send(Msg::Ping).await.unwrap();
 
-    assert_eq!(receiver.addr().path(), recv!(path).as_ref());
+    assert_eq!(receiver.addr().path().as_ref(), recv!(path).as_ref());
 }
 
 #[tokio::test]
@@ -292,7 +295,7 @@ async fn call_twice() {
 async fn separate_transmitter() {
     let (_sender, receiver) = file_server().await;
     let creds = proc_creds();
-    let transmitter = transmitter(receiver.addr().clone(), Arc::new(creds))
+    let transmitter = Transmitter::new(receiver.addr().clone(), Arc::new(creds))
         .await
         .unwrap();
 
@@ -383,7 +386,7 @@ impl<Arg: for<'a> CallMsg<'a>, Fut: Send + Future<Output = btlib::Result<()>>> M
     }
 }
 
-async fn read_server() -> (impl Transmitter, impl Receiver) {
+async fn read_server() -> (Transmitter, Receiver) {
     let file = [0, 1, 2, 3, 4, 5, 6, 7];
     proc_tx_rx(Action::new(move |mut msg: MsgReceived<Read>| async move {
         let body = msg.body();

+ 7 - 7
crates/btrun/src/lib.rs

@@ -26,11 +26,11 @@ use uuid::Uuid;
 pub fn new_runtime<C: 'static + Send + Sync + Creds>(
     ip_addr: IpAddr,
     creds: Arc<C>,
-) -> Result<Runtime<impl Receiver>> {
+) -> Result<Runtime> {
     let path = Arc::new(creds.bind_path()?);
     let handles = Arc::new(RwLock::new(HashMap::new()));
     let callback = RuntimeCallback::new(handles.clone());
-    let rx = btmsg::receiver(ip_addr, creds, callback)?;
+    let rx = Receiver::new(ip_addr, creds, callback)?;
     Ok(Runtime {
         _rx: rx,
         path,
@@ -45,14 +45,14 @@ pub fn new_runtime<C: 'static + Send + Sync + Creds>(
 /// can be sent messages using the `send` method, which does not wait for a response from the
 /// recipient. If a reply is needed, then `call` can be used, which returns a future that will not
 /// be ready until the reply has been received.
-pub struct Runtime<Rx: Receiver> {
-    _rx: Rx,
+pub struct Runtime {
+    _rx: Receiver,
     path: Arc<BlockPath>,
     handles: Arc<RwLock<HashMap<Uuid, ActorHandle>>>,
-    peers: RwLock<HashMap<Arc<BlockPath>, Rx::Transmitter>>,
+    peers: RwLock<HashMap<Arc<BlockPath>, Transmitter>>,
 }
 
-impl<Rx: Receiver> Runtime<Rx> {
+impl Runtime {
     pub fn path(&self) -> &Arc<BlockPath> {
         &self.path
     }
@@ -494,7 +494,7 @@ mod tests {
         let actor_name = runtime.activate(echo).await;
         let bind_path = Arc::new(creds.bind_path().unwrap());
         let block_addr = Arc::new(BlockAddr::new(ip_addr, bind_path));
-        let transmitter = btmsg::transmitter(block_addr, creds).await.unwrap();
+        let transmitter = Transmitter::new(block_addr, creds).await.unwrap();
         let buf = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap();
         let wire_msg = WireMsg {
             to: actor_name.act_id,