use btmsg::*;

use btlib::{
    crypto::{ConcreteCreds, Creds},
    Epoch, Principal, Principaled, Result,
};
use ctor::ctor;
use futures::stream::StreamExt;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::{
    net::{IpAddr, Ipv6Addr},
    sync::Arc,
    time::Duration,
};
use tokio::sync::mpsc;

/// How long every writecap issued by these tests remains valid.
const WRITECAP_LIFETIME: Duration = Duration::from_secs(3600);

/// Initializes `env_logger`. Registered with `#[ctor]` so that it does not have to be called
/// from each individual test.
#[ctor]
fn setup_logging() {
    env_logger::init();
}

lazy_static! {
    // Freshly generated credentials which act as the root of the test trust chain.
    static ref ROOT_CREDS: ConcreteCreds = ConcreteCreds::generate().unwrap();
    // Node credentials holding a writecap issued by `ROOT_CREDS`.
    static ref NODE_CREDS: ConcreteCreds = {
        let mut creds = ConcreteCreds::generate().unwrap();
        let root_creds = &ROOT_CREDS;
        let writecap = root_creds
            .issue_writecap(creds.principal(), vec![], Epoch::now() + WRITECAP_LIFETIME)
            .unwrap();
        creds.set_writecap(writecap);
        creds
    };
    // The principal of `ROOT_CREDS`.
    static ref ROOT_PRINCIPAL: Principal = ROOT_CREDS.principal();
}

/// Error payload carried by the `Fail` message variants.
#[derive(Serialize, Deserialize)]
enum MsgError {
    Unknown,
}

/// Owned form of the test protocol's message body; this is the type that gets deserialized.
///
/// NOTE(review): the variants mirror [BodyRef] one-to-one and in the same order, presumably so
/// that a serialized [BodyRef] deserializes as the matching [BodyOwned] variant -- keep the two
/// enums in sync.
#[derive(Deserialize)]
enum BodyOwned {
    Ping,
    Success,
    Fail(MsgError),
    Read { offset: u64, size: u64 },
    Write { offset: u64, buf: Vec<u8> },
}

impl CallRx for BodyOwned {
    type Reply<'a> = BodyRef<'a>;
}

impl SendRx for BodyOwned {}

/// Borrowed form of the test protocol's message body; this is the type that gets serialized.
/// `Write` borrows its payload instead of owning it.
#[derive(Serialize)]
enum BodyRef<'a> {
    Ping,
    Success,
    Fail(MsgError),
    Read { offset: u64, size: u64 },
    Write { offset: u64, buf: &'a [u8] },
}

impl<'a> CallTx for BodyRef<'a> {
    type Reply = BodyOwned;
}

impl<'a> SendTx for BodyRef<'a> {}

/// Factory for the routers, transmitters and receivers used by the tests in this file.
struct TestCase;

impl TestCase {
    /// Creates a new [TestCase].
    fn new() -> TestCase {
        Self
    }

    /// Creates a router bound to the IPv6 loopback address, using freshly generated credentials
    /// whose writecap is issued by `NODE_CREDS`.
    ///
    /// Returns the router together with the [BlockAddr] built from the loopback address and the
    /// writecap's bind path.
    fn new_process_router(&self) -> (impl Router, Arc<BlockAddr>) {
        let ip_addr = IpAddr::V6(Ipv6Addr::LOCALHOST);
        let mut creds = ConcreteCreds::generate().unwrap();
        let writecap = NODE_CREDS
            .issue_writecap(creds.principal(), vec![], Epoch::now() + WRITECAP_LIFETIME)
            .unwrap();
        // The address must be derived before the writecap is moved into the credentials.
        let addr = Arc::new(BlockAddr::new(ip_addr, Arc::new(writecap.bind_path())));
        creds.set_writecap(writecap);
        (router(ip_addr, Arc::new(creds)).unwrap(), addr)
    }

    /// Returns a ([Transmitter], [Receiver]) pair backed by a newly created router.
    ///
    /// The transmitter is opened to the address returned by [TestCase::new_process_router] for
    /// that same router, so the tests below use it to send messages to the returned receiver.
    async fn new_process(&self) -> (impl Transmitter, impl Receiver<BodyOwned>) {
        let (router, addr) = self.new_process_router();
        let receiver = router.receiver::<BodyOwned>().await.unwrap();
        let sender = router.transmitter(addr).await.unwrap();
        (sender, receiver)
    }
}

/// A `Ping` which is sent must be received as a `Ping`.
#[tokio::test]
async fn message_received_is_message_sent() {
    let case = TestCase::new();
    let (mut sender, mut receiver) = case.new_process().await;

    sender.send(BodyRef::Ping).await.unwrap();
    let actual = receiver.next().await.unwrap().unwrap();

    assert!(matches!(actual.body(), BodyOwned::Ping));
}

/// The `from` path of a received message must equal the path of the receiver's own address.
#[tokio::test]
async fn message_received_from_path_is_correct() {
    let case = TestCase::new();
    let (mut sender, mut receiver) = case.new_process().await;

    sender.send(BodyRef::Ping).await.unwrap();
    let actual = receiver.next().await.unwrap().unwrap();

    assert_eq!(receiver.addr().path(), actual.from().as_ref());
}

/// A spawned task answers a `Ping` arriving at the first receiver by sending `Success` through
/// the second transmitter; the second receiver must observe that `Success`.
#[tokio::test]
async fn ping_pong() {
    let case = TestCase::new();
    let (mut sender_one, mut receiver_one) = case.new_process().await;
    let (mut sender_two, mut receiver_two) = case.new_process().await;

    tokio::spawn(async move {
        let received = receiver_one.next().await.unwrap().unwrap();
        let reply_body = if matches!(received.body(), BodyOwned::Ping) {
            BodyRef::Success
        } else {
            BodyRef::Fail(MsgError::Unknown)
        };
        // Compile-time check that the future returned by `send` is `Send`.
        let fut = assert_send::<'_, Result<()>>(sender_two.send(reply_body));
        fut.await.unwrap();
        sender_two.finish().await.unwrap();
    });

    sender_one.send(BodyRef::Ping).await.unwrap();
    let reply = receiver_two.next().await.unwrap().unwrap();

    assert!(matches!(reply.body(), BodyOwned::Success));
    assert_eq!(receiver_two.addr().path(), reply.from().as_ref());
}

/// A spawned task answers a `Read` arriving at the first receiver with a `Write` containing the
/// requested bytes, sent through the second transmitter.
#[tokio::test]
async fn read_write() {
    let case = TestCase::new();
    let (mut sender_one, mut receiver_one) = case.new_process().await;
    let (mut sender_two, mut receiver_two) = case.new_process().await;

    let handle = tokio::spawn(async move {
        let data: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
        let received = receiver_one.next().await.unwrap().unwrap();
        let reply_body = if let BodyOwned::Read { offset, size } = received.body() {
            let offset: usize = (*offset).try_into().unwrap();
            let size: usize = (*size).try_into().unwrap();
            let end: usize = offset + size;
            BodyRef::Write {
                offset: offset as u64,
                buf: &data[offset..end],
            }
        } else {
            BodyRef::Fail(MsgError::Unknown)
        };
        // Compile-time check that the future returned by `send` is `Send`.
        let fut = assert_send::<'_, Result<()>>(sender_two.send(reply_body));
        fut.await.unwrap();
        sender_two.finish().await.unwrap();
    });

    sender_one
        .send(BodyRef::Read { offset: 2, size: 2 })
        .await
        .unwrap();
    // Surface any panic from the responder task before inspecting the reply.
    handle.await.unwrap();
    let reply = receiver_two.next().await.unwrap().unwrap();

    if let BodyOwned::Write { offset, buf } = reply.body() {
        assert_eq!(2, *offset);
        assert_eq!([2, 3].as_slice(), buf.as_slice());
    } else {
        panic!("reply was not the right type");
    };
}

/// Serves requests against an in-memory 8 byte "file" until a message arrives on `stop_rx`.
///
/// * `Read { offset, size }` is answered with a `Write` holding `file[offset..offset + size]`.
/// * `Write { offset, buf }` copies `buf` into the file starting at `offset` and is answered
///   with `Success`.
/// * Every other message is answered with `Fail(MsgError::Unknown)`.
///
/// The function also returns if both the stop channel and the receiver stream are exhausted.
///
/// # Panics
///
/// Panics if a received item is an error, if a request addresses bytes outside of the file, or
/// if sending a reply fails.
async fn file_server<T: Receiver<BodyOwned> + Unpin>(
    mut receiver: T,
    mut stop_rx: mpsc::Receiver<()>,
) {
    let mut file: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
    loop {
        let mut received = tokio::select! {
            Some(..) = stop_rx.recv() => return,
            Some(received) = receiver.next() => received.unwrap(),
            // Both sources yielded `None`: no more requests can arrive and nobody is left to
            // stop us. Without this branch `select!` would panic in that situation.
            else => return,
        };
        let reply_body = match received.body() {
            BodyOwned::Read { offset, size } => {
                let offset: usize = (*offset).try_into().unwrap();
                let size: usize = (*size).try_into().unwrap();
                let end: usize = offset + size;
                BodyRef::Write {
                    offset: offset as u64,
                    buf: &file[offset..end],
                }
            }
            BodyOwned::Write { offset, ref buf } => {
                let offset: usize = (*offset).try_into().unwrap();
                // The destination must be exactly `buf.len()` bytes long, starting at `offset`;
                // `copy_from_slice` panics when the two lengths differ.
                file[offset..offset + buf.len()].copy_from_slice(buf);
                BodyRef::Success
            }
            _ => BodyRef::Fail(MsgError::Unknown),
        };
        received.reply(reply_body).await.unwrap();
    }
}

/// A `Read` call to the file server must be answered with a `Write` holding the requested bytes.
#[tokio::test]
async fn reply_to_read() {
    let case = TestCase::new();
    let (mut sender, receiver) = case.new_process().await;
    let (stop_tx, stop_rx) = mpsc::channel::<()>(1);
    let handle = tokio::spawn(file_server(receiver, stop_rx));

    let reply = sender
        .call(BodyRef::Read { offset: 2, size: 2 })
        .await
        .unwrap();
    if let BodyOwned::Write { offset, buf } = reply {
        assert_eq!(2, offset);
        assert_eq!([2, 3].as_slice(), buf.as_slice());
    } else {
        panic!("reply was not the right type");
    };

    stop_tx.send(()).await.unwrap();
    handle.await.unwrap();
}

/// Two consecutive calls over the same transmitter must each receive their own reply.
#[tokio::test]
async fn call_twice() {
    let case = TestCase::new();
    let (mut sender, receiver) = case.new_process().await;
    let (stop_tx, stop_rx) = mpsc::channel::<()>(1);
    let handle = tokio::spawn(file_server(receiver, stop_rx));

    let reply = sender
        .call(BodyRef::Read { offset: 2, size: 2 })
        .await
        .unwrap();
    if let BodyOwned::Write { offset, buf } = reply {
        assert_eq!(2, offset);
        assert_eq!([2, 3].as_slice(), buf.as_slice());
    } else {
        panic!("reply was not the right type");
    };

    let reply = sender
        .call(BodyRef::Read { offset: 3, size: 5 })
        .await
        .unwrap();
    if let BodyOwned::Write { offset, buf } = reply {
        assert_eq!(3, offset);
        assert_eq!([3, 4, 5, 6, 7].as_slice(), buf.as_slice());
    } else {
        panic!("second reply was not the right type");
    }

    stop_tx.send(()).await.unwrap();
    handle.await.unwrap();
}

/// Two clients call the same receiver and the server replies to the second received message
/// before the first; each client must still get the reply which echoes its own `offset`.
#[tokio::test]
async fn replies_sent_out_of_order() {
    let case = TestCase::new();
    let (sender_one, mut receiver_one) = case.new_process().await;
    // A second, independent router whose transmitter targets the same address as `sender_one`.
    let (router, ..) = case.new_process_router();
    let sender_two = router
        .transmitter(Arc::new(sender_one.addr().clone()))
        .await
        .unwrap();

    let handle = tokio::spawn(async move {
        const EMPTY_SLICE: &[u8] = &[];

        /// Builds a `Write` reply which echoes the `offset` of the received `Write`.
        fn reply(body: &BodyOwned) -> BodyRef<'static> {
            match body {
                BodyOwned::Write { offset, .. } => BodyRef::Write {
                    offset: *offset,
                    buf: EMPTY_SLICE,
                },
                _ => panic!("message was the wrong variant"),
            }
        }

        let mut received_one = receiver_one.next().await.unwrap().unwrap();
        let mut received_two = receiver_one.next().await.unwrap().unwrap();
        // Deliberately answer in the reverse of the order in which the messages arrived.
        received_two
            .reply(reply(received_two.body()))
            .await
            .unwrap();
        received_one
            .reply(reply(received_one.body()))
            .await
            .unwrap();
    });

    /// Calls with an empty `Write` at offset `num` and asserts that the reply echoes `num`.
    async fn client(num: u64, mut tx: impl Transmitter) {
        // Compile-time check that the future returned by `call` is `Send`.
        let fut = assert_send::<'_, Result<BodyOwned>>(tx.call(BodyRef::Write {
            offset: num,
            buf: [].as_slice(),
        }));
        let reply = fut.await.unwrap();
        if let BodyOwned::Write { offset, .. } = reply {
            assert_eq!(num, offset);
        } else {
            panic!("reply was the wrong variant");
        }
    }

    let handle_one = tokio::spawn(client(1, sender_one));
    let handle_two = tokio::spawn(client(2, sender_two));
    handle.await.unwrap();
    handle_one.await.unwrap();
    handle_two.await.unwrap();
}