// SPDX-License-Identifier: AGPL-3.0-or-later use crate::{ msg::{Read as ReadMsg, *}, Handle, Inode, }; use btlib::{crypto::Creds, BlockPath, IssuedProcRec, Result}; use btmsg::{receiver, MsgCallback, MsgReceived, Receiver}; use core::future::Future; use std::{io::Read, net::IpAddr, sync::Arc}; pub trait FsProvider: Send + Sync { type LookupFut<'c>: Send + Future> where Self: 'c; fn lookup<'c>(&'c self, from: &'c Arc, msg: Lookup<'c>) -> Self::LookupFut<'c>; type CreateFut<'c>: Send + Future> where Self: 'c; fn create<'c>(&'c self, from: &'c Arc, msg: Create<'c>) -> Self::CreateFut<'c>; type OpenFut<'c>: Send + Future> where Self: 'c; fn open<'c>(&'c self, from: &'c Arc, msg: Open) -> Self::OpenFut<'c>; fn read<'c, R, F>(&'c self, from: &'c Arc, msg: ReadMsg, callback: F) -> Result where F: 'c + FnOnce(&[u8]) -> R; type WriteFut<'c>: Send + Future> where Self: 'c; fn write<'c, R>( &'c self, from: &'c Arc, inode: Inode, handle: Handle, offset: u64, size: u64, reader: R, ) -> Self::WriteFut<'c> where R: 'c + Read; type FlushFut<'c>: Send + Future> where Self: 'c; fn flush<'c>(&'c self, from: &'c Arc, msg: Flush) -> Self::FlushFut<'c>; type ReadDirFut<'c>: Send + Future> where Self: 'c; fn read_dir<'c>(&'c self, from: &'c Arc, msg: ReadDir) -> Self::ReadDirFut<'c>; type LinkFut<'c>: Send + Future> where Self: 'c; fn link<'c>(&'c self, from: &'c Arc, msg: Link) -> Self::LinkFut<'c>; type UnlinkFut<'c>: Send + Future> where Self: 'c; fn unlink<'c>(&'c self, from: &'c Arc, msg: Unlink) -> Self::UnlinkFut<'c>; type ReadMetaFut<'c>: Send + Future> where Self: 'c; fn read_meta<'c>(&'c self, from: &'c Arc, msg: ReadMeta) -> Self::ReadMetaFut<'c>; type WriteMetaFut<'c>: Send + Future> where Self: 'c; fn write_meta<'c>(&'c self, from: &'c Arc, msg: WriteMeta) -> Self::WriteMetaFut<'c>; type AllocateFut<'c>: Send + Future> where Self: 'c; fn allocate<'c>(&'c self, from: &'c Arc, msg: Allocate) -> Self::AllocateFut<'c>; type CloseFut<'c>: Send + Future> where Self: 'c; fn close<'c>(&'c self, from: &'c Arc, msg: Close) -> Self::CloseFut<'c>; type ForgetFut<'c>: Send + Future> where Self: 'c; fn forget<'c>(&'c self, from: &'c Arc, msg: Forget) -> Self::ForgetFut<'c>; type LockFut<'c>: Send + Future> where Self: 'c; fn lock<'c>(&'c self, from: &'c Arc, msg: Lock) -> Self::LockFut<'c>; type UnlockFut<'c>: Send + Future> where Self: 'c; fn unlock<'c>(&'c self, from: &'c Arc, msg: Unlock) -> Self::UnlockFut<'c>; type AddReacapFut<'c>: Send + Future> where Self: 'c; fn add_readcap<'c>( &'c self, from: &'c Arc, msg: AddReadcap, ) -> Self::AddReacapFut<'c>; type GrantAccessFut<'c>: Send + Future> where Self: 'c; fn grant_access<'c>( &'c self, from: &'c Arc, msg: IssuedProcRec, ) -> Self::GrantAccessFut<'c>; } impl FsProvider for &P { type LookupFut<'c> = P::LookupFut<'c> where Self: 'c; fn lookup<'c>(&'c self, from: &'c Arc, msg: Lookup<'c>) -> Self::LookupFut<'c> { (*self).lookup(from, msg) } type CreateFut<'c> = P::CreateFut<'c> where Self: 'c; fn create<'c>(&'c self, from: &'c Arc, msg: Create<'c>) -> Self::CreateFut<'c> { (*self).create(from, msg) } type OpenFut<'c> = P::OpenFut<'c> where Self: 'c; fn open<'c>(&'c self, from: &'c Arc, msg: Open) -> Self::OpenFut<'c> { (*self).open(from, msg) } fn read<'c, R, F>(&'c self, from: &'c Arc, msg: ReadMsg, callback: F) -> Result where F: 'c + FnOnce(&[u8]) -> R, { (*self).read(from, msg, callback) } type WriteFut<'c> = P::WriteFut<'c> where Self: 'c; fn write<'c, R>( &'c self, from: &'c Arc, inode: Inode, handle: Handle, offset: u64, size: u64, reader: R, ) -> Self::WriteFut<'c> where R: 'c + Read, { (*self).write(from, inode, handle, offset, size, reader) } type FlushFut<'c> = P::FlushFut<'c> where Self: 'c; fn flush<'c>(&'c self, from: &'c Arc, msg: Flush) -> Self::FlushFut<'c> { (*self).flush(from, msg) } type ReadDirFut<'c> = P::ReadDirFut<'c> where Self: 'c; fn read_dir<'c>(&'c self, from: &'c Arc, msg: ReadDir) -> Self::ReadDirFut<'c> { (*self).read_dir(from, msg) } type LinkFut<'c> = P::LinkFut<'c> where Self: 'c; fn link<'c>(&'c self, from: &'c Arc, msg: Link) -> Self::LinkFut<'c> { (*self).link(from, msg) } type UnlinkFut<'c> = P::UnlinkFut<'c> where Self: 'c; fn unlink<'c>(&'c self, from: &'c Arc, msg: Unlink) -> Self::UnlinkFut<'c> { (*self).unlink(from, msg) } type ReadMetaFut<'c> = P::ReadMetaFut<'c> where Self: 'c; fn read_meta<'c>(&'c self, from: &'c Arc, msg: ReadMeta) -> Self::ReadMetaFut<'c> { (*self).read_meta(from, msg) } type WriteMetaFut<'c> = P::WriteMetaFut<'c> where Self: 'c; fn write_meta<'c>( &'c self, from: &'c Arc, msg: WriteMeta, ) -> Self::WriteMetaFut<'c> { (*self).write_meta(from, msg) } type AllocateFut<'c> = P::AllocateFut<'c> where Self: 'c; fn allocate<'c>(&'c self, from: &'c Arc, msg: Allocate) -> Self::AllocateFut<'c> { (*self).allocate(from, msg) } type CloseFut<'c> = P::CloseFut<'c> where Self: 'c; fn close<'c>(&'c self, from: &'c Arc, msg: Close) -> Self::CloseFut<'c> { (*self).close(from, msg) } type ForgetFut<'c> = P::ForgetFut<'c> where Self: 'c; fn forget<'c>(&'c self, from: &'c Arc, msg: Forget) -> Self::ForgetFut<'c> { (*self).forget(from, msg) } type LockFut<'c> = P::LockFut<'c> where Self: 'c; fn lock<'c>(&'c self, from: &'c Arc, msg: Lock) -> Self::LockFut<'c> { (*self).lock(from, msg) } type UnlockFut<'c> = P::UnlockFut<'c> where Self: 'c; fn unlock<'c>(&'c self, from: &'c Arc, msg: Unlock) -> Self::UnlockFut<'c> { (*self).unlock(from, msg) } type AddReacapFut<'c> = P::AddReacapFut<'c> where Self: 'c; fn add_readcap<'c>( &'c self, from: &'c Arc, msg: AddReadcap, ) -> Self::AddReacapFut<'c> { (*self).add_readcap(from, msg) } type GrantAccessFut<'c> = P::GrantAccessFut<'c> where Self: 'c; fn grant_access<'c>( &'c self, from: &'c Arc, msg: IssuedProcRec, ) -> Self::GrantAccessFut<'c> { (*self).grant_access(from, msg) } } struct ServerCallback

{ provider: Arc

, } impl

ServerCallback

{ fn new(provider: Arc

) -> Self { Self { provider } } } impl

Clone for ServerCallback

{ fn clone(&self) -> Self { Self { provider: self.provider.clone(), } } } impl MsgCallback for ServerCallback

{ type Arg<'de> = FsMsg<'de>; type CallFut<'de> = impl 'de + Future>; fn call<'de>(&'de self, arg: MsgReceived>) -> Self::CallFut<'de> { async move { let (from, body, replier) = arg.into_parts(); let provider = &self.provider; let reply = match body { FsMsg::Lookup(lookup) => FsReply::Lookup(provider.lookup(&from, lookup).await?), FsMsg::Create(create) => FsReply::Create(provider.create(&from, create).await?), FsMsg::Open(open) => FsReply::Open(provider.open(&from, open).await?), FsMsg::Read(read) => { let buf = provider.read(&from, read, move |data| { // TODO: Avoid allocating a buffer on every read. If possible, avoid coping // data altogether. let mut buf = Vec::with_capacity(data.len()); buf.extend_from_slice(data); buf })?; let mut replier = replier.unwrap(); replier .reply(FsReply::Read(ReadReply { data: &buf })) .await?; return Ok(()); } FsMsg::Write(Write { inode, handle, offset, data, }) => FsReply::Write( provider .write(&from, inode, handle, offset, data.len() as u64, data) .await?, ), FsMsg::Flush(flush) => FsReply::Ack(provider.flush(&from, flush).await?), FsMsg::ReadDir(read_dir) => { FsReply::ReadDir(provider.read_dir(&from, read_dir).await?) } FsMsg::Link(link) => FsReply::Link(provider.link(&from, link).await?), FsMsg::Unlink(unlink) => FsReply::Ack(provider.unlink(&from, unlink).await?), FsMsg::ReadMeta(read_meta) => { FsReply::ReadMeta(provider.read_meta(&from, read_meta).await?) } FsMsg::WriteMeta(write_meta) => { FsReply::WriteMeta(provider.write_meta(&from, write_meta).await?) } FsMsg::Allocate(allocate) => { FsReply::Ack(provider.allocate(&from, allocate).await?) } FsMsg::Close(close) => FsReply::Ack(provider.close(&from, close).await?), FsMsg::Forget(forget) => FsReply::Ack(provider.forget(&from, forget).await?), FsMsg::Lock(lock) => FsReply::Ack(provider.lock(&from, lock).await?), FsMsg::Unlock(unlock) => FsReply::Ack(provider.unlock(&from, unlock).await?), FsMsg::AddReadcap(add_readcap) => { FsReply::Ack(provider.add_readcap(&from, add_readcap).await?) } FsMsg::GrantAccess(proc_rec) => { FsReply::Ack(provider.grant_access(&from, proc_rec).await?) } }; replier.unwrap().reply(reply).await } } } pub fn new_fs_server( ip_addr: IpAddr, creds: Arc, provider: Arc

, ) -> Result where C: 'static + Send + Sync + Creds, P: 'static + Send + Sync + FsProvider, { receiver(ip_addr, creds, ServerCallback::new(provider)) }