use btmsg::*;

use btlib::{
    crypto::{ConcreteCreds, Creds},
    Epoch, Principal, Principaled, Result,
};
use ctor::ctor;
use futures::stream::StreamExt;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use std::{
    net::{IpAddr, Ipv6Addr},
    sync::Arc,
    time::Duration,
};
use tokio::sync::mpsc;

/// Initializes `env_logger` for this test binary.
///
/// NOTE(review): the `#[ctor]` attribute presumably makes this run once when the binary is
/// loaded, before any test executes — confirm against the `ctor` crate documentation.
#[ctor]
fn setup_logging() {
    env_logger::init();
}

lazy_static! {
    /// Freshly generated root credentials shared by every test in this file.
    static ref ROOT_CREDS: ConcreteCreds = ConcreteCreds::generate().unwrap();
    /// The principal of [`ROOT_CREDS`].
    static ref ROOT_PRINCIPAL: Principal = ROOT_CREDS.principal();
    /// Freshly generated node credentials holding a writecap issued by [`ROOT_CREDS`].
    ///
    /// The writecap is issued with an empty component list and expires 3600 seconds after the
    /// moment this static is first accessed.
    static ref NODE_CREDS: ConcreteCreds = {
        let mut node_creds = ConcreteCreds::generate().unwrap();
        let writecap = ROOT_CREDS
            .issue_writecap(
                node_creds.principal(),
                Vec::new(),
                Epoch::now() + Duration::from_secs(3600),
            )
            .unwrap();
        node_creds.set_writecap(writecap);
        node_creds
    };
}

/// Error payload carried by the `Fail` variant of the message bodies defined in this file.
#[derive(Serialize, Deserialize)]
enum MsgError {
    /// The only error kind defined here; the tests reply with it when a message is not the
    /// variant they expected.
    Unknown,
}

/// Owned form of a message body, used as the type that receivers in this file deserialize.
///
/// Its variants mirror [`BodyRef`] one-for-one (same names, same order); the only difference is
/// that `Write` owns its bytes in a `Vec<u8>` instead of borrowing them.
#[derive(Deserialize)]
enum BodyOwned {
    /// A message with no payload, used by the tests as a trivial request.
    Ping,
    /// Reply indicating that a request was handled.
    Success,
    /// Reply indicating that a request could not be handled.
    Fail(MsgError),
    /// Request for `size` bytes starting at `offset`.
    Read { offset: u64, size: u64 },
    /// Bytes `buf` associated with position `offset`; used both as a write request and as the
    /// reply to a `Read`.
    Write { offset: u64, buf: Vec<u8> },
}

// Declares `BodyRef` as the reply type associated with a received `BodyOwned`.
// NOTE(review): the exact contract of `CallRx` is defined in `btmsg`, not in this file.
impl CallRx for BodyOwned {
    type Reply<'a> = BodyRef<'a>;
}

// Empty implementation of `SendRx` for `BodyOwned`.
// NOTE(review): presumably opts `BodyOwned` into being received from one-way sends — confirm
// against the `btmsg` trait definition.
impl SendRx for BodyOwned {}

/// Borrowed form of a message body, used as the type that transmitters in this file serialize.
///
/// Its variants mirror [`BodyOwned`] one-for-one (same names, same order); the only difference
/// is that `Write` borrows its bytes as `&'a [u8]` instead of owning them.
#[derive(Serialize)]
enum BodyRef<'a> {
    /// A message with no payload, used by the tests as a trivial request.
    Ping,
    /// Reply indicating that a request was handled.
    Success,
    /// Reply indicating that a request could not be handled.
    Fail(MsgError),
    /// Request for `size` bytes starting at `offset`.
    Read { offset: u64, size: u64 },
    /// Bytes `buf` associated with position `offset`; used both as a write request and as the
    /// reply to a `Read`.
    Write { offset: u64, buf: &'a [u8] },
}

// Declares `BodyOwned` as the reply type produced when a `BodyRef` is sent as a call.
// NOTE(review): the exact contract of `CallTx` is defined in `btmsg`, not in this file.
impl<'a> CallTx for BodyRef<'a> {
    type Reply = BodyOwned;
}

// Empty implementation of `SendTx` for `BodyRef`.
// NOTE(review): presumably opts `BodyRef` into being transmitted as a one-way send — confirm
// against the `btmsg` trait definition.
impl<'a> SendTx for BodyRef<'a> {}

/// Stateless fixture whose methods create routers, transmitters and receivers for the tests.
struct TestCase;

impl TestCase {
    fn new() -> TestCase {
        Self
    }

    fn new_process_router(&self) -> (impl Router, Arc<BlockAddr>) {
        let ip_addr = IpAddr::V6(Ipv6Addr::LOCALHOST);
        let mut creds = ConcreteCreds::generate().unwrap();
        let writecap = NODE_CREDS
            .issue_writecap(
                creds.principal(),
                vec![],
                Epoch::now() + Duration::from_secs(3600),
            )
            .unwrap();
        let addr = Arc::new(BlockAddr::new(ip_addr, Arc::new(writecap.bind_path())));
        creds.set_writecap(writecap);
        (router(ip_addr, Arc::new(creds)).unwrap(), addr)
    }

    /// Returns a ([Sender], [Receiver]) pair for a process identified by the given integer.
    async fn new_process(&self) -> (impl Transmitter, impl Receiver<BodyOwned>) {
        let (router, addr) = self.new_process_router();
        let receiver = router.receiver::<BodyOwned>().await.unwrap();
        let sender = router.transmitter(addr).await.unwrap();
        (sender, receiver)
    }
}

/// Sends a `Ping` through a process's transmitter and checks that the paired receiver yields a
/// message whose body is `Ping`.
#[tokio::test]
async fn message_received_is_message_sent() {
    let case = TestCase::new();
    let (mut sender, mut receiver) = case.new_process().await;

    sender.send(BodyRef::Ping).await.unwrap();
    let actual = receiver.next().await.unwrap().unwrap();

    // `matches!` expresses the boolean pattern test directly.
    assert!(matches!(actual.body(), BodyOwned::Ping));
}

/// Checks that the `from` path reported on a received message equals the path of the
/// receiver's own address when the message was sent through the paired transmitter.
#[tokio::test]
async fn message_received_from_path_is_correct() {
    let fixture = TestCase::new();
    let (mut tx, mut rx) = fixture.new_process().await;

    tx.send(BodyRef::Ping).await.unwrap();
    let msg = rx.next().await.unwrap().unwrap();

    assert_eq!(rx.addr().path(), msg.from().as_ref());
}

/// Exchanges a `Ping` and a `Success` between two processes.
///
/// A spawned task waits for a message on the first receiver and answers through the second
/// transmitter: `Success` if the message was a `Ping`, `Fail` otherwise. The test body sends the
/// `Ping`, then checks the body and the `from` path of the message that arrives on the second
/// receiver.
#[tokio::test]
async fn ping_pong() {
    let case = TestCase::new();
    let (mut sender_one, mut receiver_one) = case.new_process().await;
    let (mut sender_two, mut receiver_two) = case.new_process().await;

    let handle = tokio::spawn(async move {
        let received = receiver_one.next().await.unwrap().unwrap();
        let reply_body = if let BodyOwned::Ping = received.body() {
            BodyRef::Success
        } else {
            BodyRef::Fail(MsgError::Unknown)
        };
        let fut = assert_send::<'_, Result<()>>(sender_two.send(reply_body));
        fut.await.unwrap();
        sender_two.finish().await.unwrap();
    });

    sender_one.send(BodyRef::Ping).await.unwrap();
    let reply = receiver_two.next().await.unwrap().unwrap();
    assert!(matches!(reply.body(), BodyOwned::Success));
    assert_eq!(receiver_two.addr().path(), reply.from().as_ref());

    // Join the responder so that a panic inside it (including a failed `finish`) fails the test
    // instead of being silently discarded with the dropped handle.
    handle.await.unwrap();
}

/// Sends a `Read` request to one process and expects the matching `Write` on another.
///
/// A spawned task holds an 8-byte array, waits for a message on the first receiver and, if it
/// is a `Read`, sends the requested slice through the second transmitter as a `Write` (any other
/// message is answered with `Fail`). The test body sends `Read { offset: 2, size: 2 }`, waits
/// for the task to finish and then checks the message that arrived on the second receiver.
#[tokio::test]
async fn read_write() {
    let fixture = TestCase::new();
    let (mut tx_one, mut rx_one) = fixture.new_process().await;
    let (mut tx_two, mut rx_two) = fixture.new_process().await;

    let responder = tokio::spawn(async move {
        let data: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
        let request = rx_one.next().await.unwrap().unwrap();
        let reply_body = match request.body() {
            BodyOwned::Read { offset, size } => {
                let start: usize = (*offset).try_into().unwrap();
                let len: usize = (*size).try_into().unwrap();
                BodyRef::Write {
                    offset: start as u64,
                    buf: &data[start..start + len],
                }
            }
            _ => BodyRef::Fail(MsgError::Unknown),
        };
        let fut = assert_send::<'_, Result<()>>(tx_two.send(reply_body));
        fut.await.unwrap();
        tx_two.finish().await.unwrap();
    });

    let request = BodyRef::Read { offset: 2, size: 2 };
    tx_one.send(request).await.unwrap();
    responder.await.unwrap();

    let reply = rx_two.next().await.unwrap().unwrap();
    match reply.body() {
        BodyOwned::Write { offset, buf } => {
            assert_eq!(2, *offset);
            assert_eq!([2, 3].as_slice(), buf.as_slice());
        }
        _ => panic!("reply was not the right type"),
    }
}

/// Serves `Read` and `Write` requests against an in-memory 8-byte buffer.
///
/// The buffer initially holds the bytes `0..=7`. For every message taken from `receiver`:
/// * `Read { offset, size }` is answered with a `Write` carrying `size` bytes starting at
///   `offset`.
/// * `Write { offset, buf }` copies `buf` into the buffer starting at `offset` and is answered
///   with `Success`.
/// * Any other message is answered with `Fail(MsgError::Unknown)`.
///
/// The function returns when a value is received on `stop_rx`, or when both `stop_rx` and
/// `receiver` have been exhausted.
///
/// # Panics
///
/// Panics if receiving or replying fails, if an offset or size does not fit in `usize`, or if a
/// requested range lies outside the 8-byte buffer.
async fn file_server<T: Receiver<BodyOwned> + Unpin>(
    mut receiver: T,
    mut stop_rx: mpsc::Receiver<()>,
) {
    let mut file: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
    loop {
        let mut received = tokio::select! {
            Some(..) = stop_rx.recv() => return,
            Some(received) = receiver.next() => received.unwrap(),
            // Both branches yielded `None`: nothing can arrive any more. Without this branch
            // `select!` would panic because every branch is disabled.
            else => return,
        };
        let reply_body = match received.body() {
            BodyOwned::Read { offset, size } => {
                let start: usize = (*offset).try_into().unwrap();
                let len: usize = (*size).try_into().unwrap();
                BodyRef::Write {
                    offset: *offset,
                    buf: &file[start..start + len],
                }
            }
            BodyOwned::Write { offset, buf } => {
                let start: usize = (*offset).try_into().unwrap();
                // The destination must be exactly `buf.len()` bytes long, beginning at `start`;
                // `copy_from_slice` panics when the two lengths differ.
                file[start..start + buf.len()].copy_from_slice(buf);
                BodyRef::Success
            }
            _ => BodyRef::Fail(MsgError::Unknown),
        };
        received.reply(reply_body).await.unwrap();
    }
}

/// Calls `file_server` with `Read { offset: 2, size: 2 }` and checks that the reply is a
/// `Write` at offset 2 carrying the bytes `[2, 3]`, then stops the server and joins its task.
#[tokio::test]
async fn reply_to_read() {
    let fixture = TestCase::new();
    let (mut tx, rx) = fixture.new_process().await;
    let (stop_tx, stop_rx) = mpsc::channel::<()>(1);

    let server = tokio::spawn(file_server(rx, stop_rx));

    let request = BodyRef::Read { offset: 2, size: 2 };
    let reply = tx.call(request).await.unwrap();
    match reply {
        BodyOwned::Write { offset, buf } => {
            assert_eq!(2, offset);
            assert_eq!([2, 3].as_slice(), buf.as_slice());
        }
        _ => panic!("reply was not the right type"),
    }

    stop_tx.send(()).await.unwrap();
    server.await.unwrap();
}

/// Makes two consecutive `Read` calls on the same transmitter against `file_server` and checks
/// that each reply is a `Write` with the requested offset and bytes.
#[tokio::test]
async fn call_twice() {
    let fixture = TestCase::new();
    let (mut tx, rx) = fixture.new_process().await;
    let (stop_tx, stop_rx) = mpsc::channel::<()>(1);

    let server = tokio::spawn(file_server(rx, stop_rx));

    // First call: two bytes starting at offset 2.
    let first = tx
        .call(BodyRef::Read { offset: 2, size: 2 })
        .await
        .unwrap();
    match first {
        BodyOwned::Write { offset, buf } => {
            assert_eq!(2, offset);
            assert_eq!([2, 3].as_slice(), buf.as_slice());
        }
        _ => panic!("reply was not the right type"),
    }

    // Second call: the remaining five bytes starting at offset 3.
    let second = tx
        .call(BodyRef::Read { offset: 3, size: 5 })
        .await
        .unwrap();
    match second {
        BodyOwned::Write { offset, buf } => {
            assert_eq!(3, offset);
            assert_eq!([3, 4, 5, 6, 7].as_slice(), buf.as_slice());
        }
        _ => panic!("second reply was not the right type"),
    }

    stop_tx.send(()).await.unwrap();
    server.await.unwrap();
}

/// Checks that each caller gets the reply to its own call even when the replies are sent in
/// the opposite order from the one in which the calls were received.
///
/// Two transmitters (the second created from a separate router) target the same address. The
/// server task receives two messages, replies to the second one first and to the first one
/// second, echoing each message's `offset`. Each client asserts that the offset in its reply
/// equals the number it sent.
#[tokio::test]
async fn replies_sent_out_of_order() {
    let fixture = TestCase::new();
    let (tx_one, mut rx) = fixture.new_process().await;
    let (second_router, ..) = fixture.new_process_router();
    let shared_addr = Arc::new(tx_one.addr().clone());
    let tx_two = second_router.transmitter(shared_addr).await.unwrap();

    let server = tokio::spawn(async move {
        const NO_BYTES: &[u8] = &[];
        /// Builds a `Write` reply that echoes the offset of a received `Write`.
        fn echo_offset(body: &BodyOwned) -> BodyRef<'static> {
            if let BodyOwned::Write { offset, .. } = body {
                BodyRef::Write {
                    offset: *offset,
                    buf: NO_BYTES,
                }
            } else {
                panic!("message was the wrong variant")
            }
        }
        let mut first = rx.next().await.unwrap().unwrap();
        let mut second = rx.next().await.unwrap().unwrap();
        // Deliberately answer the later message before the earlier one.
        let second_reply = echo_offset(second.body());
        second.reply(second_reply).await.unwrap();
        let first_reply = echo_offset(first.body());
        first.reply(first_reply).await.unwrap();
    });

    /// Calls with a `Write` at offset `num` and asserts that the reply echoes `num`.
    async fn client(num: u64, mut tx: impl Transmitter) {
        let fut = assert_send::<'_, Result<BodyOwned>>(tx.call(BodyRef::Write {
            offset: num,
            buf: [].as_slice(),
        }));
        let reply = fut.await.unwrap();
        match reply {
            BodyOwned::Write { offset, .. } => assert_eq!(num, offset),
            _ => panic!("reply was the wrong variant"),
        }
    }

    let client_one = tokio::spawn(client(1, tx_one));
    let client_two = tokio::spawn(client(2, tx_two));

    server.await.unwrap();
    client_one.await.unwrap();
    client_two.await.unwrap();
}