Переглянути джерело

Implemented the btfproto client.

Matthew Carr 2 роки тому
батько
коміт
1fc18b39c0

+ 1 - 0
Cargo.lock

@@ -203,6 +203,7 @@ dependencies = [
 name = "btfproto"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "btlib",
  "btmsg",
  "log",

+ 1 - 0
crates/btfproto/Cargo.toml

@@ -17,3 +17,4 @@ serde = { version = "^1.0.136", features = ["derive"] }
 paste = "1.0.11"
 log = "0.4.17"
 tokio = { version = "1.24.2", features = ["rt"] }
+anyhow = { version = "1.0.66", features = ["std", "backtrace"] }

+ 267 - 0
crates/btfproto/src/client.rs

@@ -1 +1,268 @@
+use crate::msg::*;
 
+use btlib::{bterr, BlockMeta, Result};
+use btmsg::{DeserCallback, Transmitter};
+
+use core::future::{ready, Future, Ready};
+use paste::paste;
+
+macro_rules! extractor {
+    ($variant:ident) => {
+        paste! {
+            [<Extract $variant>]
+        }
+    };
+}
+
+macro_rules! reply {
+    ($variant:ident) => {
+        paste! {
+            [<$variant Reply>]
+        }
+    };
+}
+
+macro_rules! extractor_callback {
+    ($variant:ident) => {
+        paste! {
+            struct [<Extract $variant>];
+
+            impl DeserCallback for extractor!($variant) {
+                type Arg<'de> = FsReply<'de>;
+                type Return = Result<reply!($variant)>;
+                type CallFut<'de> = Ready<Self::Return>;
+                fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
+                    let result = if let FsReply::$variant(value) = arg {
+                        Ok(value)
+                    } else {
+                        Err(bterr!("wrong message type sent as reply"))
+                    };
+                    ready(result)
+                }
+            }
+        }
+    };
+}
+
+extractor_callback!(Lookup);
+extractor_callback!(Create);
+extractor_callback!(Open);
+extractor_callback!(Write);
+extractor_callback!(Link);
+
+struct AckCallback;
+
+impl DeserCallback for AckCallback {
+    type Arg<'de> = FsReply<'de>;
+    type Return = Result<()>;
+    type CallFut<'de> = Ready<Self::Return>;
+    fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
+        let result = if let FsReply::Ack(_) = arg {
+            Ok(())
+        } else {
+            Err(bterr!("wrong message type sent as reply"))
+        };
+        ready(result)
+    }
+}
+
+struct ExtractReadMeta;
+
+impl DeserCallback for ExtractReadMeta {
+    type Arg<'de> = FsReply<'de>;
+    type Return = Result<BlockMeta>;
+    type CallFut<'de> = Ready<Self::Return>;
+    fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
+        let result = if let FsReply::ReadMeta(value) = arg {
+            Ok(value)
+        } else {
+            Err(bterr!("wrong message type sent as reply"))
+        };
+        ready(result)
+    }
+}
+
+struct ExtractRead<F> {
+    callback: Option<F>,
+}
+
+impl<F> ExtractRead<F> {
+    fn new(callback: F) -> Self {
+        Self {
+            callback: Some(callback),
+        }
+    }
+}
+
+impl<R, Fut, F> DeserCallback for ExtractRead<F>
+where
+    F: 'static + Send + FnOnce(ReadReply<'_>) -> Fut,
+    Fut: Send + Future<Output = R>,
+{
+    type Arg<'de> = FsReply<'de> where F: 'de;
+    type Return = Result<R>;
+    type CallFut<'de> = impl 'de + Send + Future<Output = Self::Return>;
+    fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
+        let callback = self.callback.take().unwrap();
+        async move {
+            if let FsReply::Read(reply) = arg {
+                Ok(callback(reply).await)
+            } else {
+                Err(bterr!("wrong variant sent in reply to Read message"))
+            }
+        }
+    }
+}
+
+pub struct FsClient<T> {
+    tx: T,
+}
+
+impl<T> FsClient<T> {
+    pub fn new(tx: T) -> Self {
+        Self { tx }
+    }
+
+    pub fn into_inner(self) -> T {
+        self.tx
+    }
+
+    pub fn get_ref(&self) -> &T {
+        &self.tx
+    }
+
+    pub fn get_mut(&mut self) -> &mut T {
+        &mut self.tx
+    }
+}
+
+impl<T: Transmitter> FsClient<T> {
+    /// Lookup up the given name under the given inode.
+    pub async fn lookup(&self, parent: Inode, name: &str) -> Result<LookupReply> {
+        let msg = FsMsg::Lookup(Lookup { parent, name });
+        self.tx.call(msg, extractor!(Lookup)).await?
+    }
+
+    pub async fn create(
+        &self,
+        parent: Inode,
+        name: &str,
+        flags: u32,
+        mode: u32,
+        umask: u32,
+    ) -> Result<CreateReply> {
+        let msg = FsMsg::Create(Create {
+            parent,
+            name,
+            flags,
+            mode,
+            umask,
+        });
+        self.tx.call(msg, extractor!(Create)).await?
+    }
+
+    pub async fn open(&self, inode: Inode, flags: u32) -> Result<OpenReply> {
+        let msg = FsMsg::Open(Open { inode, flags });
+        self.tx.call(msg, extractor!(Open)).await?
+    }
+
+    pub async fn read<R, Fut, F>(&self, read: Read, callback: F) -> Result<R>
+    where
+        F: 'static + Send + FnOnce(ReadReply<'_>) -> Fut,
+        Fut: Send + Future<Output = R>,
+    {
+        let callback = ExtractRead::new(callback);
+        self.tx.call(FsMsg::Read(read), callback).await?
+    }
+
+    pub async fn write(
+        &self,
+        inode: Inode,
+        handle: Handle,
+        offset: u64,
+        data: &[u8],
+    ) -> Result<WriteReply> {
+        let msg = FsMsg::Write(Write {
+            inode,
+            handle,
+            offset,
+            data,
+        });
+        self.tx.call(msg, extractor!(Write)).await?
+    }
+
+    pub async fn flush(&self, inode: Inode, handle: Handle) -> Result<()> {
+        let msg = FsMsg::Flush(Flush { inode, handle });
+        self.tx.call(msg, AckCallback).await?
+    }
+
+    pub async fn link(&self, inode: Inode, new_parent: Inode, name: &str) -> Result<LinkReply> {
+        let msg = FsMsg::Link(Link {
+            inode,
+            new_parent,
+            name,
+        });
+        self.tx.call(msg, extractor!(Link)).await?
+    }
+
+    pub async fn unlink(&self, parent: Inode, name: &str) -> Result<()> {
+        let msg = FsMsg::Unlink(Unlink { parent, name });
+        self.tx.call(msg, AckCallback).await?
+    }
+
+    pub async fn read_meta(&self, inode: Inode) -> Result<BlockMeta> {
+        let msg = FsMsg::ReadMeta(ReadMeta { inode });
+        self.tx.call(msg, ExtractReadMeta).await?
+    }
+
+    pub async fn write_meta(&self, inode: Inode, handle: Handle, meta: BlockMeta) -> Result<()> {
+        let msg = FsMsg::WriteMeta(WriteMeta {
+            inode,
+            handle,
+            meta,
+        });
+        self.tx.call(msg, AckCallback).await?
+    }
+
+    pub async fn close(&self, inode: Inode, handle: Handle) -> Result<()> {
+        let msg = FsMsg::Close(Close { inode, handle });
+        self.tx.call(msg, AckCallback).await?
+    }
+
+    pub async fn forget(&self, inode: Inode, count: u64) -> Result<()> {
+        let msg = FsMsg::Forget(Forget { inode, count });
+        self.tx.call(msg, AckCallback).await?
+    }
+
+    pub async fn lock(&self, inode: Inode, handle: Handle, desc: LockDesc) -> Result<()> {
+        let msg = FsMsg::Lock(Lock {
+            inode,
+            handle,
+            desc,
+        });
+        self.tx.call(msg, AckCallback).await?
+    }
+
+    pub async fn unlock(&self, inode: Inode, handle: Handle) -> Result<()> {
+        let msg = FsMsg::Unlock(Unlock { inode, handle });
+        self.tx.call(msg, AckCallback).await?
+    }
+}
+
+impl<T> AsRef<T> for FsClient<T> {
+    fn as_ref(&self) -> &T {
+        self.get_ref()
+    }
+}
+
+impl<T> AsMut<T> for FsClient<T> {
+    fn as_mut(&mut self) -> &mut T {
+        self.get_mut()
+    }
+}
+
+impl<T> From<T> for FsClient<T> {
+    fn from(value: T) -> Self {
+        Self::new(value)
+    }
+}

+ 2 - 2
crates/btfproto/src/lib.rs

@@ -1,7 +1,7 @@
 #![feature(type_alias_impl_trait)]
 
-#[cfg(feature = "client")]
-mod client;
 mod msg;
+#[cfg(feature = "client")]
+pub mod client;
 #[cfg(feature = "server")]
 pub mod server;

+ 0 - 1
crates/btfproto/src/msg.rs

@@ -162,7 +162,6 @@ pub struct Unlink<'a> {
 #[derive(Serialize, Deserialize)]
 pub struct ReadMeta {
     pub inode: Inode,
-    pub handle: Handle,
 }
 
 #[derive(Serialize, Deserialize)]

+ 7 - 7
crates/btmsg/src/lib.rs

@@ -221,19 +221,19 @@ pub trait Transmitter {
     where
         Self: 'call,
         T: 'call + CallMsg<'call>,
-        F: 'static + Send + Sync + DeserCallback;
+        F: 'static + Send + DeserCallback;
 
     /// Transmit a message to the connected [Receiver], waits for a reply, then calls the given
     /// [DeserCallback] with the deserialized reply.
     fn call<'call, T, F>(&'call self, msg: T, callback: F) -> Self::CallFut<'call, T, F>
     where
         T: 'call + CallMsg<'call>,
-        F: 'static + Send + Sync + DeserCallback;
+        F: 'static + Send + DeserCallback;
 
-    /// Transmits a message to the connected [Reciever], waits for a reply, then passes back the
+    /// Transmits a message to the connected [Receiver], waits for a reply, then passes back the
     /// the reply to the caller.
     fn call_through<'call, T>(
-        &'call mut self,
+        &'call self,
         msg: T,
     ) -> Self::CallFut<'call, T, Passthrough<T::Reply<'call>>>
     where
@@ -609,7 +609,7 @@ impl QuicTransmitter {
     async fn call<'ser, T, F>(&'ser self, msg: T, callback: F) -> Result<F::Return>
     where
         T: 'ser + CallMsg<'ser>,
-        F: 'static + Send + Sync + DeserCallback,
+        F: 'static + Send + DeserCallback,
     {
         let recv_stream = self.transmit(Envelope::call(msg)).await?;
         let mut guard = self.recv_buf.lock().await;
@@ -643,12 +643,12 @@ impl Transmitter for QuicTransmitter {
     where
         Self: 'ser,
         T: 'ser + CallMsg<'ser>,
-        F: 'static + Send + Sync + DeserCallback;
+        F: 'static + Send + DeserCallback;
 
     fn call<'ser, T, F>(&'ser self, msg: T, callback: F) -> Self::CallFut<'ser, T, F>
     where
         T: 'ser + CallMsg<'ser>,
-        F: 'static + Send + Sync + DeserCallback,
+        F: 'static + Send + DeserCallback,
     {
         self.call(msg, callback)
     }

+ 3 - 3
crates/btmsg/tests/tests.rs

@@ -246,7 +246,7 @@ async fn message_received_from_path_is_correct() {
 
 #[tokio::test]
 async fn reply_to_read() {
-    let (mut sender, _receiver) = file_server().await;
+    let (sender, _receiver) = file_server().await;
     let reply = sender
         .call_through::<Msg>(Msg::Read { offset: 2, size: 2 })
         .await
@@ -261,7 +261,7 @@ async fn reply_to_read() {
 
 #[tokio::test]
 async fn call_twice() {
-    let (mut sender, _receiver) = file_server().await;
+    let (sender, _receiver) = file_server().await;
 
     let reply = sender
         .call_through::<Msg>(Msg::Write {
@@ -291,7 +291,7 @@ async fn call_twice() {
 async fn separate_transmitter() {
     let (_sender, receiver) = file_server().await;
     let creds = proc_creds();
-    let mut transmitter = transmitter(receiver.addr().clone(), Arc::new(creds))
+    let transmitter = transmitter(receiver.addr().clone(), Arc::new(creds))
         .await
         .unwrap();