123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- use btmsg::*;
- use btlib::{
- crypto::{ConcreteCreds, Creds, CredsPriv},
- BlockPath, Epoch, Principal, Principaled,
- };
- use core::future::Future;
- use ctor::ctor;
- use lazy_static::lazy_static;
- use serde::{Deserialize, Serialize};
- use std::{
- net::{IpAddr, Ipv6Addr},
- sync::{Arc, Mutex},
- time::Duration,
- };
- use tokio::sync::mpsc::{self, Sender};
- #[ctor]
- fn setup_logging() {
- use env_logger::Env;
- let env = Env::default().default_filter_or("ERROR");
- env_logger::init_from_env(env);
- }
- lazy_static! {
- static ref ROOT_CREDS: ConcreteCreds = ConcreteCreds::generate().unwrap();
- static ref NODE_CREDS: ConcreteCreds = {
- let mut creds = ConcreteCreds::generate().unwrap();
- let root_creds = &ROOT_CREDS;
- let writecap = root_creds
- .issue_writecap(
- creds.principal(),
- vec![],
- Epoch::now() + Duration::from_secs(3600),
- )
- .unwrap();
- creds.set_writecap(writecap);
- creds
- };
- static ref ROOT_PRINCIPAL: Principal = ROOT_CREDS.principal();
- }
- #[derive(Debug, Serialize, Deserialize)]
- enum Reply {
- Success,
- Fail,
- ReadReply { offset: u64, buf: Vec<u8> },
- }
- #[derive(Serialize, Deserialize)]
- enum Msg<'a> {
- Ping,
- Success,
- Fail,
- Read { offset: u64, size: u64 },
- Write { offset: u64, buf: &'a [u8] },
- }
- impl<'a> CallMsg<'a> for Msg<'a> {
- type Reply = Reply;
- }
- impl<'a> SendMsg<'a> for Msg<'a> {}
- trait TestFunc<S: 'static + Send, Fut: Send + Future>:
- Send + Sync + Fn(MsgReceived<Msg<'_>>, Sender<S>) -> Fut
- {
- }
- impl<
- S: 'static + Send,
- Fut: Send + Future,
- T: Send + Sync + Fn(MsgReceived<Msg<'_>>, Sender<S>) -> Fut,
- > TestFunc<S, Fut> for T
- {
- }
- struct Delegate<S, Fut> {
- func: Arc<dyn TestFunc<S, Fut>>,
- sender: Sender<S>,
- }
- impl<S, Fut> Clone for Delegate<S, Fut> {
- fn clone(&self) -> Self {
- Self {
- func: self.func.clone(),
- sender: self.sender.clone(),
- }
- }
- }
- impl<S: 'static + Send, Fut: Send + Future> Delegate<S, Fut> {
- fn new<F: 'static + TestFunc<S, Fut>>(sender: Sender<S>, func: F) -> Self {
- Self {
- func: Arc::new(func),
- sender,
- }
- }
- }
- impl<S: 'static + Send, Fut: Send + Future> MsgCallback for Delegate<S, Fut> {
- type Arg<'de> = Msg<'de> where Self: 'de;
- type Return = Fut::Output;
- type CallFut<'s> = Fut where Fut: 's;
- fn call<'de>(&'de self, arg: MsgReceived<Self::Arg<'de>>) -> Self::CallFut<'de> {
- (self.func)(arg, self.sender.clone())
- }
- }
- fn proc_creds() -> impl Creds {
- let mut creds = ConcreteCreds::generate().unwrap();
- let writecap = NODE_CREDS
- .issue_writecap(
- creds.principal(),
- vec![],
- Epoch::now() + Duration::from_secs(3600),
- )
- .unwrap();
- creds.set_writecap(writecap);
- creds
- }
- fn proc_rx<F: 'static + MsgCallback>(callback: F) -> (impl Receiver, Arc<BlockAddr>) {
- let ip_addr = IpAddr::V6(Ipv6Addr::LOCALHOST);
- let creds = proc_creds();
- let writecap = creds.writecap().unwrap();
- let addr = Arc::new(BlockAddr::new(ip_addr, Arc::new(writecap.bind_path())));
- (receiver(ip_addr, Arc::new(creds), callback).unwrap(), addr)
- }
- async fn proc_tx_rx<F: 'static + MsgCallback>(func: F) -> (impl Transmitter, impl Receiver) {
- let (receiver, addr) = proc_rx(func);
- let sender = receiver.transmitter(addr).await.unwrap();
- (sender, receiver)
- }
- async fn file_server() -> (impl Transmitter, impl Receiver) {
- let (sender, _) = mpsc::channel::<()>(1);
- let file = Arc::new(Mutex::new([0, 1, 2, 3, 4, 5, 6, 7]));
- proc_tx_rx(Delegate::new(
- sender,
- move |mut received: MsgReceived<Msg<'_>>, _| {
- let mut guard = file.lock().unwrap();
- let reply_body = match received.body() {
- Msg::Read { offset, size } => {
- let offset: usize = (*offset).try_into().unwrap();
- let size: usize = (*size).try_into().unwrap();
- let end: usize = offset + size;
- let mut buf = Vec::with_capacity(end - offset);
- buf.extend_from_slice(&guard[offset..end]);
- Reply::ReadReply {
- offset: offset as u64,
- buf,
- }
- }
- Msg::Write { offset, ref buf } => {
- let offset: usize = (*offset).try_into().unwrap();
- let end: usize = offset + buf.len();
- (&mut guard[offset..end]).copy_from_slice(buf);
- Reply::Success
- }
- _ => Reply::Fail,
- };
- let replier = received.take_replier().unwrap();
- async move { replier.reply(reply_body).await }
- },
- ))
- .await
- }
- async fn timeout<F: Future>(future: F) -> F::Output {
- tokio::time::timeout(Duration::from_millis(1000), future)
- .await
- .unwrap()
- }
// Awaits `recv()` on the given expression under the `timeout` helper and unwraps the received
// value. Must be used inside an async context because it expands to an `.await`.
macro_rules! recv {
    ($channel:expr) => {
        timeout($channel.recv()).await.unwrap()
    };
}
- #[tokio::test]
- async fn message_received_is_message_sent() {
- let (sender, mut passed) = mpsc::channel(1);
- let (mut sender, _receiver) = proc_tx_rx(Delegate::new(
- sender,
- |msg: MsgReceived<Msg<'_>>, sender: Sender<bool>| {
- let passed = if let Msg::Ping = msg.body() {
- true
- } else {
- false
- };
- let sender = sender.clone();
- async move {
- sender.send(passed).await.unwrap();
- }
- },
- ))
- .await;
- sender.send(Msg::Ping).await.unwrap();
- assert!(recv!(passed));
- }
- #[tokio::test]
- async fn message_received_from_path_is_correct() {
- let (sender, mut path) = mpsc::channel(1);
- let (mut sender, receiver) = proc_tx_rx(Delegate::new(
- sender,
- |msg: MsgReceived<Msg<'_>>, sender: Sender<Arc<BlockPath>>| {
- let path = msg.from().clone();
- let sender = sender.clone();
- async move {
- sender.send(path).await.unwrap();
- }
- },
- ))
- .await;
- sender.send(Msg::Ping).await.unwrap();
- assert_eq!(receiver.addr().path(), recv!(path).as_ref());
- }
- #[tokio::test]
- async fn reply_to_read() {
- let (mut sender, _receiver) = file_server().await;
- let reply = sender
- .call(Msg::Read { offset: 2, size: 2 })
- .await
- .unwrap();
- if let Reply::ReadReply { offset, buf } = reply {
- assert_eq!(2, offset);
- assert_eq!([2, 3].as_slice(), buf.as_slice());
- } else {
- panic!("reply was not the right type");
- };
- }
- #[tokio::test]
- async fn call_twice() {
- let (mut sender, _receiver) = file_server().await;
- let reply = sender
- .call(Msg::Write {
- offset: 1,
- buf: &[1, 1],
- })
- .await
- .unwrap();
- if let Reply::Success = reply {
- ()
- } else {
- panic!("reply was not the right type");
- };
- let reply = sender
- .call(Msg::Read { offset: 1, size: 2 })
- .await
- .unwrap();
- if let Reply::ReadReply { offset, buf } = reply {
- assert_eq!(1, offset);
- assert_eq!([1, 1].as_slice(), buf.as_slice());
- } else {
- panic!("second reply was not the right type");
- }
- }
- #[tokio::test]
- async fn separate_transmitter() {
- let (_senderx, receiver) = file_server().await;
- let creds = proc_creds();
- let mut transmitter = transmitter(receiver.addr().clone(), Arc::new(creds))
- .await
- .unwrap();
- let reply = transmitter
- .call(Msg::Write {
- offset: 5,
- buf: &[7, 7, 7],
- })
- .await
- .unwrap();
- let matched = if let Reply::Success = reply {
- true
- } else {
- false
- };
- assert!(matched);
- }
|