|
@@ -1,5 +1,10 @@
|
|
|
//! Code which enables sending messages between processes in the blocktree system.
|
|
|
-use btlib::{crypto::rand_array, error::BoxInIoErr, Principal, Result};
|
|
|
+use btlib::{
|
|
|
+ bterr,
|
|
|
+ crypto::{rand_array, ConcreteCreds, CredsPriv, CredsPub},
|
|
|
+ error::BoxInIoErr,
|
|
|
+ Principal, Result,
|
|
|
+};
|
|
|
use btserde::{read_from, write_to};
|
|
|
use bytes::{BufMut, BytesMut};
|
|
|
use core::{
|
|
@@ -8,24 +13,33 @@ use core::{
|
|
|
task::{Context, Poll},
|
|
|
};
|
|
|
use futures::{
|
|
|
- sink::{Send, Sink},
|
|
|
+ sink::{Close, Send, Sink},
|
|
|
stream::Stream,
|
|
|
SinkExt, StreamExt,
|
|
|
};
|
|
|
use lazy_static::lazy_static;
|
|
|
+use log::error;
|
|
|
+use quinn::{ClientConfig, Endpoint, SendStream, ServerConfig};
|
|
|
+use rustls::{Certificate, PrivateKey, ConfigBuilder, ConfigSide, WantsCipherSuites, WantsVerifier};
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
use std::{
|
|
|
+ collections::hash_map::DefaultHasher,
|
|
|
+ hash::{Hash, Hasher},
|
|
|
io,
|
|
|
marker::PhantomData,
|
|
|
- net::Shutdown,
|
|
|
+ net::{IpAddr, Ipv6Addr, Shutdown, SocketAddr},
|
|
|
path::PathBuf,
|
|
|
- collections::hash_map::DefaultHasher,
|
|
|
- hash::{Hash, Hasher},
|
|
|
+ sync::Arc,
|
|
|
};
|
|
|
use tokio::{
|
|
|
io::{AsyncRead, AsyncWrite, ReadBuf},
|
|
|
net::UnixDatagram,
|
|
|
+ sync::{
|
|
|
+ broadcast::{self, error::TryRecvError},
|
|
|
+ mpsc,
|
|
|
+ },
|
|
|
};
|
|
|
+use tokio_stream::wrappers::ReceiverStream;
|
|
|
use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite};
|
|
|
use zerocopy::FromBytes;
|
|
|
|
|
@@ -35,6 +49,24 @@ mod private {
|
|
|
|
|
|
use super::*;
|
|
|
|
|
|
+ /// Returns a [Receiver] which can be used to receive messages addressed to the given path.
|
|
|
+ /// The `fs_path` argument specifies the filesystem directory under which the receiver's socket
|
|
|
+ /// will be stored.
|
|
|
+ pub fn local_receiver<T: for<'de> Deserialize<'de> + core::marker::Send + 'static>(
|
|
|
+ addr: BlockAddr,
|
|
|
+ creds: &ConcreteCreds,
|
|
|
+ ) -> Result<impl Receiver<T>> {
|
|
|
+ QuicReceiver::new(addr, creds)
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Returns a [Sender] which can be used to send messages to the given Blocktree path.
|
|
|
+ /// The `fs_path` argument specifies the filesystem directory in which to locate the
|
|
|
+ /// socket of the recipient.
|
|
|
+ pub async fn local_sender(addr: BlockAddr) -> Result<impl Sender> {
|
|
|
+ let result = QuicSender::new(addr).await;
|
|
|
+ result
|
|
|
+ }
|
|
|
+
|
|
|
lazy_static! {
|
|
|
/// The default directory in which to place blocktree sockets.
|
|
|
static ref SOCK_DIR: PathBuf = {
|
|
@@ -45,10 +77,42 @@ mod private {
|
|
|
}
|
|
|
|
|
|
/// Appends the given Blocktree path to the path of the given directory.
|
|
|
+ #[allow(dead_code)]
|
|
|
fn socket_path(fs_path: &mut PathBuf, addr: &BlockAddr) {
|
|
|
fs_path.push(addr.num.value().to_string());
|
|
|
}
|
|
|
|
|
|
+ fn common_config<Side: ConfigSide>(
|
|
|
+ builder: ConfigBuilder<Side, WantsCipherSuites>,
|
|
|
+ ) -> Result<ConfigBuilder<Side, WantsVerifier>> {
|
|
|
+ builder
|
|
|
+ .with_cipher_suites(&[rustls::cipher_suite::TLS13_AES_128_GCM_SHA256])
|
|
|
+ .with_kx_groups(&[&rustls::kx_group::SECP256R1])
|
|
|
+ .with_protocol_versions(&[&rustls::version::TLS13])
|
|
|
+ .map_err(|err| err.into())
|
|
|
+ }
|
|
|
+
|
|
|
+ fn server_config(creds: &ConcreteCreds) -> Result<ServerConfig> {
|
|
|
+ let writecap = creds.writecap().ok_or(btlib::BlockError::MissingWritecap)?;
|
|
|
+ let chain = writecap.to_cert_chain(creds.public_sign())?;
|
|
|
+ let mut cert_chain = Vec::with_capacity(chain.len());
|
|
|
+ for cert in chain {
|
|
|
+ cert_chain.push(Certificate(cert.encode_der()?))
|
|
|
+ }
|
|
|
+ let key = PrivateKey(creds.private_sign().to_der()?);
|
|
|
+ let server_config = common_config(rustls::ServerConfig::builder())?
|
|
|
+ .with_no_client_auth()
|
|
|
+ .with_single_cert(cert_chain, key)?;
|
|
|
+ Ok(ServerConfig::with_crypto(Arc::new(server_config)))
|
|
|
+ }
|
|
|
+
|
|
|
+ fn client_config() -> Result<ClientConfig> {
|
|
|
+ let client_config = common_config(rustls::ClientConfig::builder())?
|
|
|
+ .with_custom_certificate_verifier(CertVerifier::new())
|
|
|
+ .with_no_client_auth();
|
|
|
+ Ok(ClientConfig::new(Arc::new(client_config)))
|
|
|
+ }
|
|
|
+
|
|
|
/// An identifier for a block. Persistent blocks (files, directories, and servers) are
|
|
|
/// identified by the `Inode` variant and transient blocks (processes) are identified by the
|
|
|
/// PID variant.
|
|
@@ -132,6 +196,8 @@ mod private {
|
|
|
}
|
|
|
|
|
|
/// A type which can be used to send messages.
|
|
|
+ /// Once the "Permit impl Trait in type aliases" https://github.com/rust-lang/rust/issues/63063
|
|
|
+ /// feature lands the future types in this trait should be rewritten to use it.
|
|
|
pub trait Sender {
|
|
|
type SendFut<'a, T>: 'a + Future<Output = Result<()>> + core::marker::Send
|
|
|
where
|
|
@@ -143,14 +209,18 @@ mod private {
|
|
|
msg: Msg<T>,
|
|
|
) -> Self::SendFut<'a, T>;
|
|
|
|
|
|
+ type FinishFut: Future<Output = Result<()>> + core::marker::Send;
|
|
|
+
|
|
|
+ fn finish(self) -> Self::FinishFut;
|
|
|
+
|
|
|
fn addr(&self) -> &BlockAddr;
|
|
|
|
|
|
fn send_msg<'a, T: 'a + Serialize + core::marker::Send>(
|
|
|
&'a mut self,
|
|
|
- to: BlockAddr,
|
|
|
+ from: BlockAddr,
|
|
|
body: T,
|
|
|
) -> Self::SendFut<'a, T> {
|
|
|
- let msg = Msg::with_rand_id(to, self.addr().clone(), body).unwrap();
|
|
|
+ let msg = Msg::with_rand_id(self.addr().clone(), from, body).unwrap();
|
|
|
self.send(msg)
|
|
|
}
|
|
|
}
|
|
@@ -163,6 +233,12 @@ mod private {
|
|
|
/// Encodes and decodes messages using [btserde].
|
|
|
struct MsgEncoder;
|
|
|
|
|
|
+ impl MsgEncoder {
|
|
|
+ fn new() -> Self {
|
|
|
+ Self
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
impl<T: Serialize> Encoder<Msg<T>> for MsgEncoder {
|
|
|
type Error = btlib::Error;
|
|
|
|
|
@@ -183,6 +259,12 @@ mod private {
|
|
|
|
|
|
struct MsgDecoder<T>(PhantomData<T>);
|
|
|
|
|
|
+ impl<T> MsgDecoder<T> {
|
|
|
+ fn new() -> Self {
|
|
|
+ Self(PhantomData)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
impl<T: for<'de> Deserialize<'de>> Decoder for MsgDecoder<T> {
|
|
|
type Item = Msg<T>;
|
|
|
type Error = btlib::Error;
|
|
@@ -218,6 +300,7 @@ mod private {
|
|
|
}
|
|
|
|
|
|
impl DatagramAdapter {
|
|
|
+ #[allow(dead_code)]
|
|
|
fn new(socket: UnixDatagram) -> Self {
|
|
|
Self { socket }
|
|
|
}
|
|
@@ -278,6 +361,7 @@ mod private {
|
|
|
}
|
|
|
|
|
|
impl<T: for<'de> Deserialize<'de>> UnixReceiver<T> {
|
|
|
+ #[allow(dead_code)]
|
|
|
fn new(mut fs_path: PathBuf, addr: BlockAddr) -> Result<Self> {
|
|
|
socket_path(&mut fs_path, &addr);
|
|
|
let socket = DatagramAdapter::new(UnixDatagram::bind(fs_path)?);
|
|
@@ -300,16 +384,6 @@ mod private {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /// Returns a [Receiver] which can be used to receive messages addressed to the given path.
|
|
|
- /// The `fs_path` argument specifies the filesystem directory under which the receiver's socket
|
|
|
- /// will be stored.
|
|
|
- pub fn local_receiver<T: for<'de> Deserialize<'de>>(
|
|
|
- fs_path: PathBuf,
|
|
|
- addr: BlockAddr,
|
|
|
- ) -> Result<impl Receiver<T>> {
|
|
|
- UnixReceiver::new(fs_path, addr)
|
|
|
- }
|
|
|
-
|
|
|
/// An implementation of [Sender] which uses a Unix datagram socket to send messages.
|
|
|
struct UnixSender {
|
|
|
addr: BlockAddr,
|
|
@@ -317,6 +391,7 @@ mod private {
|
|
|
}
|
|
|
|
|
|
impl UnixSender {
|
|
|
+ #[allow(dead_code)]
|
|
|
fn new(mut fs_path: PathBuf, addr: BlockAddr) -> Result<Self> {
|
|
|
let socket = UnixDatagram::unbound()?;
|
|
|
socket_path(&mut fs_path, &addr);
|
|
@@ -367,13 +442,182 @@ mod private {
|
|
|
) -> Self::SendFut<'a, T> {
|
|
|
self.socket.send(msg)
|
|
|
}
|
|
|
+
|
|
|
+ type FinishFut = Pin<Box<dyn Future<Output = Result<()>> + core::marker::Send>>;
|
|
|
+
|
|
|
+ fn finish(mut self) -> Self::FinishFut {
|
|
|
+ Box::pin(async move {
|
|
|
+ let fut: Close<'_, _, Msg<()>> = self.socket.close();
|
|
|
+ fut.await
|
|
|
+ })
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- /// Returns a [Sender] which can be used to send messages to the given Blocktree path.
|
|
|
- /// The `fs_path` argument specifies the filesystem directory in which to locate the
|
|
|
- /// socket of the recipient.
|
|
|
- pub fn local_sender(fs_path: PathBuf, addr: BlockAddr) -> Result<impl Sender> {
|
|
|
- UnixSender::new(fs_path, addr)
|
|
|
+ /// Causes the current function to return if the given `rx` has received a stop signal.
|
|
|
+ macro_rules! check_stop {
|
|
|
+ ($rx:expr) => {
|
|
|
+ match $rx.try_recv() {
|
|
|
+ Ok(_) => return,
|
|
|
+ Err(err) => {
|
|
|
+ if let TryRecvError::Closed = err {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ struct QuicReceiver<T> {
|
|
|
+ addr: BlockAddr,
|
|
|
+ stop_tx: broadcast::Sender<()>,
|
|
|
+ stream: ReceiverStream<Result<Msg<T>>>,
|
|
|
+ }
|
|
|
+
|
|
|
+ impl<T: for<'de> Deserialize<'de> + core::marker::Send + 'static> QuicReceiver<T> {
|
|
|
+ /// The size of the buffer to store received messages in.
|
|
|
+ const MSG_BUF_SZ: usize = 64;
|
|
|
+
|
|
|
+ fn new(addr: BlockAddr, creds: &ConcreteCreds) -> Result<Self> {
|
|
|
+ let config = server_config(creds)?;
|
|
|
+ let port = addr.port();
|
|
|
+ let socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port);
|
|
|
+ let endpoint = Endpoint::server(config, socket_addr)?;
|
|
|
+ let (stop_tx, mut stop_rx) = broadcast::channel(1);
|
|
|
+ let (msg_tx, msg_rx) = mpsc::channel(Self::MSG_BUF_SZ);
|
|
|
+ tokio::spawn(async move {
|
|
|
+ loop {
|
|
|
+ check_stop!(stop_rx);
|
|
|
+ let connecting = match endpoint.accept().await {
|
|
|
+ Some(connection) => connection,
|
|
|
+ None => break,
|
|
|
+ };
|
|
|
+ let connection = match connecting.await {
|
|
|
+ Ok(connection) => connection,
|
|
|
+ Err(err) => {
|
|
|
+ error!("error accepting QUIC connection: {err}");
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ let conn_msg_tx = msg_tx.clone();
|
|
|
+ let mut conn_stop_rx = stop_rx.resubscribe();
|
|
|
+ tokio::spawn(async move {
|
|
|
+ let recv_stream = match connection.accept_uni().await {
|
|
|
+ Ok(recv_stream) => recv_stream,
|
|
|
+ Err(err) => {
|
|
|
+ error!("error accepting receive stream: {err}");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ let mut msg_stream = FramedRead::new(recv_stream, MsgDecoder::new());
|
|
|
+ loop {
|
|
|
+ check_stop!(conn_stop_rx);
|
|
|
+ let result = match msg_stream.next().await {
|
|
|
+ Some(result) => result,
|
|
|
+ None => return,
|
|
|
+ };
|
|
|
+ if let Err(err) = conn_msg_tx.send(result).await {
|
|
|
+ error!("error sending message to mpsc queue: {err}");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ });
|
|
|
+ Ok(Self {
|
|
|
+ addr,
|
|
|
+ stop_tx,
|
|
|
+ stream: ReceiverStream::new(msg_rx),
|
|
|
+ })
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ impl<T> Drop for QuicReceiver<T> {
|
|
|
+ fn drop(&mut self) {
|
|
|
+ // This result will be a failure if the tasks have already returned, which is not a
|
|
|
+ // problem.
|
|
|
+ let _ = self.stop_tx.send(());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ impl<T: for<'de> Deserialize<'de> + core::marker::Send + 'static> Stream for QuicReceiver<T> {
|
|
|
+ type Item = Result<Msg<T>>;
|
|
|
+
|
|
|
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
|
+ self.stream.poll_next_unpin(cx)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ impl<T: for<'de> Deserialize<'de> + core::marker::Send + 'static> Receiver<T> for QuicReceiver<T> {
|
|
|
+ fn addr(&self) -> &BlockAddr {
|
|
|
+ &self.addr
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ struct CertVerifier;
|
|
|
+
|
|
|
+ impl CertVerifier {
|
|
|
+ fn new() -> Arc<Self> {
|
|
|
+ Arc::new(Self)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ impl rustls::client::ServerCertVerifier for CertVerifier {
|
|
|
+ fn verify_server_cert(
|
|
|
+ &self,
|
|
|
+ _end_entity: &Certificate,
|
|
|
+ _intermediates: &[Certificate],
|
|
|
+ _server_name: &rustls::ServerName,
|
|
|
+ _scts: &mut dyn Iterator<Item = &[u8]>,
|
|
|
+ _ocsp_response: &[u8],
|
|
|
+ _now: std::time::SystemTime,
|
|
|
+ ) -> std::result::Result<rustls::client::ServerCertVerified, rustls::Error> {
|
|
|
+ // TODO: Implement certificate verification.
|
|
|
+ Ok(rustls::client::ServerCertVerified::assertion())
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ struct QuicSender {
|
|
|
+ addr: BlockAddr,
|
|
|
+ sink: FramedWrite<SendStream, MsgEncoder>,
|
|
|
+ }
|
|
|
+
|
|
|
+ impl QuicSender {
|
|
|
+ async fn new(addr: BlockAddr) -> Result<Self> {
|
|
|
+ let mut endpoint =
|
|
|
+ Endpoint::client(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0))?;
|
|
|
+ endpoint.set_default_client_config(client_config()?);
|
|
|
+ let port = addr.port();
|
|
|
+ let socket_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port);
|
|
|
+ let connecting = endpoint.connect(socket_addr, "localhost")?;
|
|
|
+ let connection = connecting.await?;
|
|
|
+ let send_stream = connection.open_uni().await?;
|
|
|
+ let sink = FramedWrite::new(send_stream, MsgEncoder::new());
|
|
|
+ Ok(Self { addr, sink })
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ impl Sender for QuicSender {
|
|
|
+ fn addr(&self) -> &BlockAddr {
|
|
|
+ &self.addr
|
|
|
+ }
|
|
|
+
|
|
|
+ type SendFut<'a, T> = Send<'a, FramedWrite<SendStream, MsgEncoder>, Msg<T>>
|
|
|
+ where T: 'a + Serialize + core::marker::Send;
|
|
|
+
|
|
|
+ fn send<'a, T: 'a + Serialize + core::marker::Send>(
|
|
|
+ &'a mut self,
|
|
|
+ msg: Msg<T>,
|
|
|
+ ) -> Self::SendFut<'a, T> {
|
|
|
+ self.sink.send(msg)
|
|
|
+ }
|
|
|
+
|
|
|
+ type FinishFut = Pin<Box<dyn Future<Output = Result<()>> + core::marker::Send>>;
|
|
|
+
|
|
|
+ fn finish(mut self) -> Self::FinishFut {
|
|
|
+ Box::pin(async move {
|
|
|
+ let steam: &mut SendStream = self.sink.get_mut();
|
|
|
+ steam.finish().await.map_err(|err| bterr!(err))
|
|
|
+ })
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/// This is an identify function which allows you to specify a type parameter for the output
|
|
@@ -392,11 +636,34 @@ mod private {
|
|
|
mod tests {
|
|
|
use super::*;
|
|
|
|
|
|
- use tempdir::TempDir;
|
|
|
+ use btlib::{crypto::Creds, Epoch, Principaled};
|
|
|
+ use ctor::ctor;
|
|
|
+ use std::{
|
|
|
+ sync::atomic::{AtomicU64, Ordering},
|
|
|
+ time::Duration,
|
|
|
+ };
|
|
|
+
|
|
|
+ #[ctor]
|
|
|
+ fn setup_logging() {
|
|
|
+ env_logger::init();
|
|
|
+ }
|
|
|
|
|
|
lazy_static! {
|
|
|
- static ref ROOT_PRINCIPAL: Principal =
|
|
|
- Principal::try_from("0!dSip4J0kurN5VhVo_aTipM-ywOOWrqJuRRVQ7aa-bew").unwrap();
|
|
|
+ static ref ROOT_CREDS: ConcreteCreds = ConcreteCreds::generate().unwrap();
|
|
|
+ static ref NODE_CREDS: ConcreteCreds = {
|
|
|
+ let mut creds = ConcreteCreds::generate().unwrap();
|
|
|
+ let root_creds = &ROOT_CREDS;
|
|
|
+ let writecap = root_creds
|
|
|
+ .issue_writecap(
|
|
|
+ creds.principal(),
|
|
|
+ vec![],
|
|
|
+ Epoch::now() + Duration::from_secs(3600),
|
|
|
+ )
|
|
|
+ .unwrap();
|
|
|
+ creds.set_writecap(writecap);
|
|
|
+ creds
|
|
|
+ };
|
|
|
+ static ref ROOT_PRINCIPAL: Principal = ROOT_CREDS.principal();
|
|
|
}
|
|
|
|
|
|
fn block_addr(generation: u64, inode: u64) -> BlockAddr {
|
|
@@ -427,38 +694,28 @@ mod tests {
|
|
|
}
|
|
|
|
|
|
struct TestCase {
|
|
|
- dir: TempDir,
|
|
|
+ instance_num: u64,
|
|
|
}
|
|
|
|
|
|
impl TestCase {
|
|
|
fn new() -> TestCase {
|
|
|
- Self {
|
|
|
- dir: TempDir::new("btmsg").unwrap(),
|
|
|
- }
|
|
|
+ static INSTANCE_NUM: AtomicU64 = AtomicU64::new(0);
|
|
|
+ let instance_num = INSTANCE_NUM.fetch_add(1, Ordering::SeqCst);
|
|
|
+ Self { instance_num }
|
|
|
}
|
|
|
|
|
|
- fn endpoint(&self, inode: u64) -> (BlockAddr, impl Sender, impl Receiver<BodyOwned>) {
|
|
|
- let addr = block_addr(1, inode);
|
|
|
- let fs_path = self.dir.path().to_owned();
|
|
|
- let receiver = local_receiver(fs_path.clone(), addr.clone()).unwrap();
|
|
|
- let sender = local_sender(fs_path, addr.clone()).unwrap();
|
|
|
+ async fn endpoint(&self, inode: u64) -> (BlockAddr, impl Sender, impl Receiver<BodyOwned>) {
|
|
|
+ let addr = block_addr(self.instance_num, inode);
|
|
|
+ let receiver = local_receiver(addr.clone(), &NODE_CREDS).unwrap();
|
|
|
+ let sender = local_sender(addr.clone()).await.unwrap();
|
|
|
(addr, sender, receiver)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /// This tests just server to ensure that changes to the port hashing algorithm are made
|
|
|
- /// deliberately.
|
|
|
- #[test]
|
|
|
- fn hash_block_addr() {
|
|
|
- let block_addr = BlockAddr::new(ROOT_PRINCIPAL.clone(), 1, BlockNum::Inode(1));
|
|
|
- let port = block_addr.port();
|
|
|
- assert_eq!(64984, port);
|
|
|
- }
|
|
|
-
|
|
|
#[tokio::test]
|
|
|
async fn message_received_is_message_sent() {
|
|
|
let case = TestCase::new();
|
|
|
- let (addr, mut sender, mut receiver) = case.endpoint(1);
|
|
|
+ let (addr, mut sender, mut receiver) = case.endpoint(1).await;
|
|
|
|
|
|
sender.send_msg(addr.clone(), BodyRef::Ping).await.unwrap();
|
|
|
let actual = receiver.next().await.unwrap().unwrap();
|
|
@@ -476,10 +733,10 @@ mod tests {
|
|
|
#[tokio::test]
|
|
|
async fn ping_pong() {
|
|
|
let case = TestCase::new();
|
|
|
- let (addr_one, mut sender_one, mut receiver_one) = case.endpoint(1);
|
|
|
- let (addr_two, mut sender_two, mut receiver_two) = case.endpoint(2);
|
|
|
+ let (addr_one, mut sender_one, mut receiver_one) = case.endpoint(1).await;
|
|
|
+ let (addr_two, mut sender_two, mut receiver_two) = case.endpoint(2).await;
|
|
|
|
|
|
- let handle = tokio::spawn(async move {
|
|
|
+ tokio::spawn(async move {
|
|
|
let msg = receiver_one.next().await.unwrap().unwrap();
|
|
|
let reply_body = if let BodyOwned::Ping = msg.body {
|
|
|
BodyRef::Success
|
|
@@ -488,10 +745,10 @@ mod tests {
|
|
|
};
|
|
|
let fut = assert_send::<'_, Result<()>>(sender_two.send_msg(addr_one, reply_body));
|
|
|
fut.await.unwrap();
|
|
|
+ sender_two.finish().await.unwrap();
|
|
|
});
|
|
|
|
|
|
sender_one.send_msg(addr_two, BodyRef::Ping).await.unwrap();
|
|
|
- handle.await.unwrap();
|
|
|
let reply = receiver_two.next().await.unwrap().unwrap();
|
|
|
let matched = if let BodyOwned::Success = reply.body {
|
|
|
true
|
|
@@ -504,8 +761,8 @@ mod tests {
|
|
|
#[tokio::test]
|
|
|
async fn read_write() {
|
|
|
let case = TestCase::new();
|
|
|
- let (addr_one, mut sender_one, mut receiver_one) = case.endpoint(1);
|
|
|
- let (addr_two, mut sender_two, mut receiver_two) = case.endpoint(2);
|
|
|
+ let (addr_one, mut sender_one, mut receiver_one) = case.endpoint(1).await;
|
|
|
+ let (addr_two, mut sender_two, mut receiver_two) = case.endpoint(2).await;
|
|
|
|
|
|
let handle = tokio::spawn(async move {
|
|
|
let data: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
|
|
@@ -524,6 +781,7 @@ mod tests {
|
|
|
let msg = Msg::new(msg.from, addr_one, msg.id, reply_body);
|
|
|
let fut = assert_send::<'_, Result<()>>(sender_two.send(msg));
|
|
|
fut.await.unwrap();
|
|
|
+ sender_two.finish().await.unwrap();
|
|
|
});
|
|
|
|
|
|
sender_one
|