Matthew Carr преди 2 години
родител
ревизия
aa3a89ee9d
променени са 3 файла, в които са добавени 79 реда и са изтрити 81 реда
  1. 26 19
      crates/btfproto/src/local_fs.rs
  2. 3 3
      crates/btfproto/src/msg.rs
  3. 50 59
      crates/btfproto/src/server.rs

+ 26 - 19
crates/btfproto/src/local_fs.rs

@@ -969,7 +969,7 @@ mod private {
     }
 
     impl<'a, C: 'a + Send + Sync + Principaled + Decrypter + Signer> ReadingGuard<'a> for TopGuard<'a, C> {
-        type ReadGuard<'b> = InodeGuard<'b, RwLockReadGuard<'b, InodeTableValue<C>>> where Self: 'b;
+        type ReadGuard<'b> = InodeGuard<'b, RwLockReadGuard<'b, InodeTableValue<C>>> where C: 'b, Self: 'b;
         type ReadFut<'b> = impl 'b + Send + Future<Output = Result<Self::ReadGuard<'b>>> where Self: 'b;
         fn read(&self) -> Self::ReadFut<'_> {
             async move {
@@ -984,9 +984,9 @@ mod private {
             }
         }
 
-        type WriteGuard<'b> = InodeGuard<'b, RwLockWriteGuard<'b, InodeTableValue<C>>> where Self: 'b;
-        type WriteFut<'b> = impl 'b + Send + Future<Output = Result<Self::WriteGuard<'b>>> where Self: 'b;
-        fn write(&self) -> Self::WriteFut<'_> {
+        type WriteGuard = InodeGuard<'a, RwLockWriteGuard<'a, InodeTableValue<C>>>;
+        type WriteFut = impl 'a + Send + Future<Output = Result<Self::WriteGuard>>;
+        fn write(&'a self) -> Self::WriteFut {
             async move {
                 let guard = self.guard.write(self.inode).await?;
                 Ok(InodeGuard {
@@ -1008,7 +1008,7 @@ mod private {
         size: u64,
     }
 
-    impl<'a, C: 'a + Sync + Principaled + Decrypter + Signer, T: Send + Sync + Deref<Target = InodeTableValue<C>>> ReadGuard<'a> for InodeGuard<'a, T> {
+    impl<'a, C: 'a + Sync + Principaled + Decrypter + Signer, T: 'a + Send + Sync + Deref<Target = InodeTableValue<C>>> ReadGuard<'a> for InodeGuard<'a, T> {
         type BufGuard<'b> = BufferGuard<'b, C> where Self: 'b;
         type BufFut<'b> = impl 'b + Send + Future<Output = Result<Option<Self::BufGuard<'b>>>> where Self: 'b;
         fn get_buf(&self) -> Self::BufFut<'_>
@@ -1254,7 +1254,21 @@ mod private {
         }
 
         type ReadingGuard<'a> = TopGuard<'a, C> where Self: 'a;
-        //type ReadFut<'c> = Ready<Result<Self::ReadingGuard<'c>>>;
+        type ReadFut<'c> = impl 'c + Send + Future<Output = Result<Self::ReadingGuard<'c>>>;
+        fn read2<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Read) -> Self::ReadFut<'c> {
+            async move {
+                let Read { inode, handle, offset, size } = msg;
+                let guard = self.table_guard().await;
+                Ok(TopGuard {
+                    guard,
+                    from,
+                    inode,
+                    handle,
+                    offset,
+                    size,
+                })
+            }
+        }
 
         fn read<'c, R, F>(&'c self, from: &'c Arc<BlockPath>, msg: Read, callback: F) -> Result<R>
         where
@@ -1308,21 +1322,15 @@ mod private {
             //Ok(output)
         }
 
-        type WriteFut<'r, R> = impl 'r + Send + Future<Output = Result<WriteReply>> where R: 'r + Send + IoRead;
-        fn write<'c, 'r, R>(
+        type WriteFut<'r> = impl 'r + Send + Future<Output = Result<WriteReply>>;
+        fn write<'c>(
             &'c self,
             from: &'c Arc<BlockPath>,
-            inode: Inode,
-            handle: Handle,
-            offset: u64,
-            size: u64,
-            reader: R,
-        ) -> Self::WriteFut<'r, R>
-        where
-            'c: 'r,
-            R: 'r + IoRead + Send,
+            write: Write<&'c [u8]>,
+        ) -> Self::WriteFut<'c>
         {
             async move {
+                let Write { inode, handle, offset, mut data } = write;
                 debug!("write: inode {inode}, handle {handle}, offset {offset}");
                 let table_guard = self.table_guard().await;
                 let mut value_guard = table_guard.write(inode).await?;
@@ -1332,8 +1340,7 @@ mod private {
                 if offset != pos {
                     block.seek(SeekFrom::Start(offset))?;
                 }
-                let size = size.try_into().display_err()?;
-                let written = block.write_from(reader, size)?;
+                let written = std::io::copy(&mut data, block.deref_mut())?;
                 Ok(WriteReply {
                     written: written as u64,
                 })

+ 3 - 3
crates/btfproto/src/msg.rs

@@ -25,7 +25,7 @@ pub enum FsMsg<'a> {
     Open(Open),
     Read(Read),
     #[serde(borrow)]
-    Write(Write<'a>),
+    Write(Write<&'a [u8]>),
     Flush(Flush),
     ReadDir(ReadDir),
     #[serde(borrow)]
@@ -459,11 +459,11 @@ pub struct ReadReply<'a> {
 }
 
 #[derive(Serialize, Deserialize)]
-pub struct Write<'a> {
+pub struct Write<R> {
     pub inode: Inode,
     pub handle: Handle,
     pub offset: u64,
-    pub data: &'a [u8],
+    pub data: R,
 }
 
 #[derive(Serialize, Deserialize)]

+ 50 - 59
crates/btfproto/src/server.rs

@@ -9,19 +9,19 @@ use btmsg::{receiver, MsgCallback, MsgReceived, Receiver};
 use core::future::Future;
 use std::{io::Read, net::IpAddr, sync::Arc, ops::Deref};
 
-pub trait ReadingGuard<'a> {
-    type ReadGuard<'b>: ReadGuard<'b> where Self: 'b;
-    type ReadFut<'b>: Send + Future<Output = Result<Self::ReadGuard<'b>>> where Self: 'b;
+pub trait ReadingGuard<'a>: Send + Sync {
+    type ReadGuard<'b>: 'b + ReadGuard<'b> where Self: 'b;
+    type ReadFut<'b>: 'b + Send + Future<Output = Result<Self::ReadGuard<'b>>> where Self: 'b;
     fn read(&self) -> Self::ReadFut<'_>;
 
-    type WriteGuard<'b>: WriteGuard<'b> where Self: 'b;
-    type WriteFut<'b>: Send + Future<Output = Result<Self::WriteGuard<'b>>> where Self: 'b;
-    fn write(&self) -> Self::WriteFut<'_>;
+    type WriteGuard: WriteGuard<'a>;
+    type WriteFut: Send + Future<Output = Result<Self::WriteGuard>>;
+    fn write(&'a self) -> Self::WriteFut;
 }
 
 pub trait ReadGuard<'a> {
-    type BufGuard<'b>: 'b + Deref<Target = [u8]> where Self: 'b;
-    type BufFut<'b>: Send + Future<Output = Result<Option<Self::BufGuard<'b>>>> where Self: 'b;
+    type BufGuard<'b>: 'b + Send + Deref<Target = [u8]> where Self: 'b;
+    type BufFut<'b>: 'b + Send + Future<Output = Result<Option<Self::BufGuard<'b>>>> where Self: 'b;
     fn get_buf(&self) -> Self::BufFut<'_>;
 }
 
@@ -47,29 +47,21 @@ pub trait FsProvider: Send + Sync {
     fn open<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Open) -> Self::OpenFut<'c>;
 
     type ReadingGuard<'a>: 'a + ReadingGuard<'a> where Self: 'a;
-    //type ReadFut<'c>: Send + Future<Output = Result<Self::ReadingGuard<'c>>> where Self: 'c;
-    //fn read2<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMsg) -> Self::ReadFut<'c>;
+    type ReadFut<'c>: Send + Future<Output = Result<()>> where Self: 'c;
+    fn read2<'c>(&'c self, from: &'c Arc<BlockPath>, guard: &mut Self::ReadingGuard<'c>, msg: ReadMsg) -> Self::ReadFut<'c>;
 
     fn read<'c, R, F>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMsg, callback: F) -> Result<R>
     where
         F: 'c + FnOnce(&[u8]) -> R;
 
-    type WriteFut<'r, R>: Send + Future<Output = Result<WriteReply>>
+    type WriteFut<'r>: Send + Future<Output = Result<WriteReply>>
     where
-        Self: 'r,
-        R: 'r + Send + Read;
-    fn write<'c, 'r, R>(
+        Self: 'r;
+    fn write<'c>(
         &'c self,
         from: &'c Arc<BlockPath>,
-        inode: Inode,
-        handle: Handle,
-        offset: u64,
-        size: u64,
-        reader: R,
-    ) -> Self::WriteFut<'r, R>
-    where
-        'c: 'r,
-        R: 'r + Read + Send;
+        write: Write<&'c [u8]>
+    ) -> Self::WriteFut<'c>;
 
     type FlushFut<'c>: Send + Future<Output = Result<()>>
     where
@@ -163,7 +155,10 @@ impl<P: FsProvider> FsProvider for &P {
     }
 
     type ReadingGuard<'c> = P::ReadingGuard<'c> where Self: 'c;
-    //type ReadFut<'c> = P::ReadFut<'c> where Self: 'c;
+    type ReadFut<'c> = P::ReadFut<'c> where Self: 'c;
+    fn read2<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMsg) -> Self::ReadFut<'c> {
+        (*self).read2(from, msg)
+    }
 
     fn read<'c, R, F>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMsg, callback: F) -> Result<R>
     where
@@ -172,21 +167,14 @@ impl<P: FsProvider> FsProvider for &P {
         (*self).read(from, msg, callback)
     }
 
-    type WriteFut<'r, R> = P::WriteFut<'r, R> where Self: 'r, R: 'r + Send + Read;
-    fn write<'c, 'r, R>(
+    type WriteFut<'r> = P::WriteFut<'r> where Self: 'r;
+    fn write<'c>(
         &'c self,
         from: &'c Arc<BlockPath>,
-        inode: Inode,
-        handle: Handle,
-        offset: u64,
-        size: u64,
-        reader: R,
-    ) -> Self::WriteFut<'r, R>
-    where
-        'c: 'r,
-        R: 'r + Read + Send,
+        write: Write<&'c [u8]>
+    ) -> Self::WriteFut<'c>
     {
-        (*self).write(from, inode, handle, offset, size, reader)
+        (*self).write(from, write)
     }
 
     type FlushFut<'c> = P::FlushFut<'c> where Self: 'c;
@@ -291,35 +279,38 @@ impl<P: 'static + Send + Sync + FsProvider> MsgCallback for ServerCallback<P> {
     fn call<'de>(&'de self, arg: MsgReceived<FsMsg<'de>>) -> Self::CallFut<'de> {
         async move {
             let (from, body, replier) = arg.into_parts();
-            let provider = &self.provider;
+            let provider = self.provider.as_ref();
             let reply = match body {
                 FsMsg::Lookup(lookup) => FsReply::Lookup(provider.lookup(&from, lookup).await?),
                 FsMsg::Create(create) => FsReply::Create(provider.create(&from, create).await?),
                 FsMsg::Open(open) => FsReply::Open(provider.open(&from, open).await?),
                 FsMsg::Read(read) => {
-                    let buf = provider.read(&from, read, move |data| {
-                        // TODO: Avoid allocating a buffer on every read. If possible, avoid coping
-                        // data altogether.
-                        let mut buf = Vec::with_capacity(data.len());
-                        buf.extend_from_slice(data);
-                        buf
-                    })?;
-                    let mut replier = replier.unwrap();
-                    replier
-                        .reply(FsReply::Read(ReadReply { data: &buf }))
-                        .await?;
-                    return Ok(());
-                }
-                FsMsg::Write(Write {
-                    inode,
-                    handle,
-                    offset,
-                    data,
-                }) => FsReply::Write(
+                    let top_guard = provider.read2(&from, read).await?;
+                    {
+                        //let read_guard = top_guard.read().await?;
+                        //let option = read_guard.get_buf().await?;
+                        //if let Some(buf) = option {
+                        //    todo!()
+                        //}
+                    }
                     todo!()
-                    //provider
-                    //    .write(&from, inode, handle, offset, data.len() as u64, data)
-                    //    .await?,
+                    //let buf = provider.read(&from, read, move |data| {
+                    //    // TODO: Avoid allocating a buffer on every read. If possible, avoid coping
+                    //    // data altogether.
+                    //    let mut buf = Vec::with_capacity(data.len());
+                    //    buf.extend_from_slice(data);
+                    //    buf
+                    //})?;
+                    //let mut replier = replier.unwrap();
+                    //replier
+                    //    .reply(FsReply::Read(ReadReply { data: &buf }))
+                    //    .await?;
+                    //return Ok(());
+                }
+                FsMsg::Write(write) => FsReply::Write(
+                    provider
+                        .write(&from, write)
+                        .await?,
                 ),
                 FsMsg::Flush(flush) => FsReply::Ack(provider.flush(&from, flush).await?),
                 FsMsg::ReadDir(read_dir) => {