123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341 |
- use btmsg::*;
- use btlib::{
- crypto::{ConcreteCreds, Creds},
- Epoch, Principal, Principaled, Result,
- };
- use ctor::ctor;
- use futures::stream::StreamExt;
- use lazy_static::lazy_static;
- use serde::{Deserialize, Serialize};
- use std::{
- net::{IpAddr, Ipv6Addr},
- sync::Arc,
- time::Duration,
- };
- use tokio::sync::mpsc;
- #[ctor]
- fn setup_logging() {
- env_logger::init();
- }
- lazy_static! {
- static ref ROOT_CREDS: ConcreteCreds = ConcreteCreds::generate().unwrap();
- static ref NODE_CREDS: ConcreteCreds = {
- let mut creds = ConcreteCreds::generate().unwrap();
- let root_creds = &ROOT_CREDS;
- let writecap = root_creds
- .issue_writecap(
- creds.principal(),
- vec![],
- Epoch::now() + Duration::from_secs(3600),
- )
- .unwrap();
- creds.set_writecap(writecap);
- creds
- };
- static ref ROOT_PRINCIPAL: Principal = ROOT_CREDS.principal();
- }
- #[derive(Serialize, Deserialize)]
- enum MsgError {
- Unknown,
- }
- #[derive(Deserialize)]
- enum BodyOwned {
- Ping,
- Success,
- Fail(MsgError),
- Read { offset: u64, size: u64 },
- Write { offset: u64, buf: Vec<u8> },
- }
- impl CallRx for BodyOwned {
- type Reply<'a> = BodyRef<'a>;
- }
- impl SendRx for BodyOwned {}
- #[derive(Serialize)]
- enum BodyRef<'a> {
- Ping,
- Success,
- Fail(MsgError),
- Read { offset: u64, size: u64 },
- Write { offset: u64, buf: &'a [u8] },
- }
- impl<'a> CallTx for BodyRef<'a> {
- type Reply = BodyOwned;
- }
- impl<'a> SendTx for BodyRef<'a> {}
- struct TestCase;
- impl TestCase {
- fn new() -> TestCase {
- Self
- }
- fn new_process_router(&self) -> (impl Router, Arc<BlockAddr>) {
- let ip_addr = IpAddr::V6(Ipv6Addr::LOCALHOST);
- let mut creds = ConcreteCreds::generate().unwrap();
- let writecap = NODE_CREDS
- .issue_writecap(
- creds.principal(),
- vec![],
- Epoch::now() + Duration::from_secs(3600),
- )
- .unwrap();
- let addr = Arc::new(BlockAddr::new(ip_addr, Arc::new(writecap.bind_path())));
- creds.set_writecap(writecap);
- (router(ip_addr, Arc::new(creds)).unwrap(), addr)
- }
- /// Returns a ([Sender], [Receiver]) pair for a process identified by the given integer.
- async fn new_process(&self) -> (impl Transmitter, impl Receiver<BodyOwned>) {
- let (router, addr) = self.new_process_router();
- let receiver = router.receiver::<BodyOwned>().await.unwrap();
- let sender = router.transmitter(addr).await.unwrap();
- (sender, receiver)
- }
- }
- #[tokio::test]
- async fn message_received_is_message_sent() {
- let case = TestCase::new();
- let (mut sender, mut receiver) = case.new_process().await;
- sender.send(BodyRef::Ping).await.unwrap();
- let actual = receiver.next().await.unwrap().unwrap();
- let matched = if let BodyOwned::Ping = actual.body() {
- true
- } else {
- false
- };
- assert!(matched);
- }
- #[tokio::test]
- async fn message_received_from_path_is_correct() {
- let case = TestCase::new();
- let (mut sender, mut receiver) = case.new_process().await;
- sender.send(BodyRef::Ping).await.unwrap();
- let actual = receiver.next().await.unwrap().unwrap();
- assert_eq!(receiver.addr().path(), actual.from().as_ref());
- }
- #[tokio::test]
- async fn ping_pong() {
- let case = TestCase::new();
- let (mut sender_one, mut receiver_one) = case.new_process().await;
- let (mut sender_two, mut receiver_two) = case.new_process().await;
- tokio::spawn(async move {
- let received = receiver_one.next().await.unwrap().unwrap();
- let reply_body = if let BodyOwned::Ping = received.body() {
- BodyRef::Success
- } else {
- BodyRef::Fail(MsgError::Unknown)
- };
- let fut = assert_send::<'_, Result<()>>(sender_two.send(reply_body));
- fut.await.unwrap();
- sender_two.finish().await.unwrap();
- });
- sender_one.send(BodyRef::Ping).await.unwrap();
- let reply = receiver_two.next().await.unwrap().unwrap();
- let matched = if let BodyOwned::Success = reply.body() {
- true
- } else {
- false
- };
- assert!(matched);
- assert_eq!(receiver_two.addr().path(), reply.from().as_ref());
- }
- #[tokio::test]
- async fn read_write() {
- let case = TestCase::new();
- let (mut sender_one, mut receiver_one) = case.new_process().await;
- let (mut sender_two, mut receiver_two) = case.new_process().await;
- let handle = tokio::spawn(async move {
- let data: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
- let received = receiver_one.next().await.unwrap().unwrap();
- let reply_body = if let BodyOwned::Read { offset, size } = received.body() {
- let offset: usize = (*offset).try_into().unwrap();
- let size: usize = (*size).try_into().unwrap();
- let end: usize = offset + size;
- BodyRef::Write {
- offset: offset as u64,
- buf: &data[offset..end],
- }
- } else {
- BodyRef::Fail(MsgError::Unknown)
- };
- let fut = assert_send::<'_, Result<()>>(sender_two.send(reply_body));
- fut.await.unwrap();
- sender_two.finish().await.unwrap();
- });
- sender_one
- .send(BodyRef::Read { offset: 2, size: 2 })
- .await
- .unwrap();
- handle.await.unwrap();
- let reply = receiver_two.next().await.unwrap().unwrap();
- if let BodyOwned::Write { offset, buf } = reply.body() {
- assert_eq!(2, *offset);
- assert_eq!([2, 3].as_slice(), buf.as_slice());
- } else {
- panic!("reply was not the right type");
- };
- }
- async fn file_server<T: Receiver<BodyOwned> + Unpin>(
- mut receiver: T,
- mut stop_rx: mpsc::Receiver<()>,
- ) {
- let mut file: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
- loop {
- let mut received = tokio::select! {
- Some(..) = stop_rx.recv() => return,
- Some(received) = receiver.next() => received.unwrap(),
- };
- let reply_body = match received.body() {
- BodyOwned::Read { offset, size } => {
- let offset: usize = (*offset).try_into().unwrap();
- let size: usize = (*size).try_into().unwrap();
- let end: usize = offset + size;
- BodyRef::Write {
- offset: offset as u64,
- buf: &file[offset..end],
- }
- }
- BodyOwned::Write { offset, ref buf } => {
- let offset: usize = (*offset).try_into().unwrap();
- (&mut file[offset..buf.len()]).copy_from_slice(buf);
- BodyRef::Success
- }
- _ => BodyRef::Fail(MsgError::Unknown),
- };
- received.reply(reply_body).await.unwrap();
- }
- }
- #[tokio::test]
- async fn reply_to_read() {
- let case = TestCase::new();
- let (mut sender, receiver) = case.new_process().await;
- let (stop_tx, stop_rx) = mpsc::channel::<()>(1);
- let handle = tokio::spawn(file_server(receiver, stop_rx));
- let reply = sender
- .call(BodyRef::Read { offset: 2, size: 2 })
- .await
- .unwrap();
- if let BodyOwned::Write { offset, buf } = reply {
- assert_eq!(2, offset);
- assert_eq!([2, 3].as_slice(), buf.as_slice());
- } else {
- panic!("reply was not the right type");
- };
- stop_tx.send(()).await.unwrap();
- handle.await.unwrap();
- }
- #[tokio::test]
- async fn call_twice() {
- let case = TestCase::new();
- let (mut sender, receiver) = case.new_process().await;
- let (stop_tx, stop_rx) = mpsc::channel::<()>(1);
- let handle = tokio::spawn(file_server(receiver, stop_rx));
- let reply = sender
- .call(BodyRef::Read { offset: 2, size: 2 })
- .await
- .unwrap();
- if let BodyOwned::Write { offset, buf } = reply {
- assert_eq!(2, offset);
- assert_eq!([2, 3].as_slice(), buf.as_slice());
- } else {
- panic!("reply was not the right type");
- };
- let reply = sender
- .call(BodyRef::Read { offset: 3, size: 5 })
- .await
- .unwrap();
- if let BodyOwned::Write { offset, buf } = reply {
- assert_eq!(3, offset);
- assert_eq!([3, 4, 5, 6, 7].as_slice(), buf.as_slice());
- } else {
- panic!("second reply was not the right type");
- }
- stop_tx.send(()).await.unwrap();
- handle.await.unwrap();
- }
- #[tokio::test]
- async fn replies_sent_out_of_order() {
- let case = TestCase::new();
- let (sender_one, mut receiver_one) = case.new_process().await;
- let (router, ..) = case.new_process_router();
- let sender_two = router
- .transmitter(Arc::new(sender_one.addr().clone()))
- .await
- .unwrap();
- let handle = tokio::spawn(async move {
- const EMPTY_SLICE: &[u8] = &[];
- fn reply(body: &BodyOwned) -> BodyRef<'static> {
- match body {
- BodyOwned::Write { offset, .. } => BodyRef::Write {
- offset: *offset,
- buf: EMPTY_SLICE,
- },
- _ => panic!("message was the wrong variant"),
- }
- }
- let mut received_one = receiver_one.next().await.unwrap().unwrap();
- let mut received_two = receiver_one.next().await.unwrap().unwrap();
- received_two
- .reply(reply(received_two.body()))
- .await
- .unwrap();
- received_one
- .reply(reply(received_one.body()))
- .await
- .unwrap();
- });
- async fn client(num: u64, mut tx: impl Transmitter) {
- let fut = assert_send::<'_, Result<BodyOwned>>(tx.call(BodyRef::Write {
- offset: num,
- buf: [].as_slice(),
- }));
- let reply = fut.await.unwrap();
- if let BodyOwned::Write { offset, .. } = reply {
- assert_eq!(num, offset);
- } else {
- panic!("reply was the wrong variant");
- }
- }
- let handle_one = tokio::spawn(client(1, sender_one));
- let handle_two = tokio::spawn(client(2, sender_two));
- handle.await.unwrap();
- handle_one.await.unwrap();
- handle_two.await.unwrap();
- }
|