123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333 |
- // SPDX-License-Identifier: AGPL-3.0-or-later
- use crate::{
- msg::{Read as ReadMsg, *},
- Handle, Inode,
- };
- use btlib::{crypto::Creds, BlockPath, IssuedProcRec, Result};
- use btmsg::{receiver, MsgCallback, MsgReceived, Receiver};
- use core::future::Future;
- use std::{io::Read, net::IpAddr, sync::Arc};
- pub trait FsProvider: Send + Sync {
- type LookupFut<'c>: Send + Future<Output = Result<LookupReply>>
- where
- Self: 'c;
- fn lookup<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Lookup<'c>) -> Self::LookupFut<'c>;
- type CreateFut<'c>: Send + Future<Output = Result<CreateReply>>
- where
- Self: 'c;
- fn create<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Create<'c>) -> Self::CreateFut<'c>;
- type OpenFut<'c>: Send + Future<Output = Result<OpenReply>>
- where
- Self: 'c;
- fn open<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Open) -> Self::OpenFut<'c>;
- fn read<'c, R, F>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMsg, callback: F) -> Result<R>
- where
- F: 'c + FnOnce(&[u8]) -> R;
- type WriteFut<'c>: Send + Future<Output = Result<WriteReply>>
- where
- Self: 'c;
- fn write<'c, R>(
- &'c self,
- from: &'c Arc<BlockPath>,
- inode: Inode,
- handle: Handle,
- offset: u64,
- size: u64,
- reader: R,
- ) -> Self::WriteFut<'c>
- where
- R: 'c + Read;
- type FlushFut<'c>: Send + Future<Output = Result<()>>
- where
- Self: 'c;
- fn flush<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Flush) -> Self::FlushFut<'c>;
- type ReadDirFut<'c>: Send + Future<Output = Result<ReadDirReply>>
- where
- Self: 'c;
- fn read_dir<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadDir) -> Self::ReadDirFut<'c>;
- type LinkFut<'c>: Send + Future<Output = Result<LinkReply>>
- where
- Self: 'c;
- fn link<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Link) -> Self::LinkFut<'c>;
- type UnlinkFut<'c>: Send + Future<Output = Result<()>>
- where
- Self: 'c;
- fn unlink<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlink) -> Self::UnlinkFut<'c>;
- type ReadMetaFut<'c>: Send + Future<Output = Result<ReadMetaReply>>
- where
- Self: 'c;
- fn read_meta<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMeta) -> Self::ReadMetaFut<'c>;
- type WriteMetaFut<'c>: Send + Future<Output = Result<WriteMetaReply>>
- where
- Self: 'c;
- fn write_meta<'c>(&'c self, from: &'c Arc<BlockPath>, msg: WriteMeta)
- -> Self::WriteMetaFut<'c>;
- type AllocateFut<'c>: Send + Future<Output = Result<()>>
- where
- Self: 'c;
- fn allocate<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Allocate) -> Self::AllocateFut<'c>;
- type CloseFut<'c>: Send + Future<Output = Result<()>>
- where
- Self: 'c;
- fn close<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Close) -> Self::CloseFut<'c>;
- type ForgetFut<'c>: Send + Future<Output = Result<()>>
- where
- Self: 'c;
- fn forget<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Forget) -> Self::ForgetFut<'c>;
- type LockFut<'c>: Send + Future<Output = Result<()>>
- where
- Self: 'c;
- fn lock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Lock) -> Self::LockFut<'c>;
- type UnlockFut<'c>: Send + Future<Output = Result<()>>
- where
- Self: 'c;
- fn unlock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlock) -> Self::UnlockFut<'c>;
- type AddReacapFut<'c>: Send + Future<Output = Result<()>>
- where
- Self: 'c;
- fn add_readcap<'c>(
- &'c self,
- from: &'c Arc<BlockPath>,
- msg: AddReadcap,
- ) -> Self::AddReacapFut<'c>;
- type GrantAccessFut<'c>: Send + Future<Output = Result<()>>
- where
- Self: 'c;
- fn grant_access<'c>(
- &'c self,
- from: &'c Arc<BlockPath>,
- msg: IssuedProcRec,
- ) -> Self::GrantAccessFut<'c>;
- }
- impl<P: FsProvider> FsProvider for &P {
- type LookupFut<'c> = P::LookupFut<'c> where Self: 'c;
- fn lookup<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Lookup<'c>) -> Self::LookupFut<'c> {
- (*self).lookup(from, msg)
- }
- type CreateFut<'c> = P::CreateFut<'c> where Self: 'c;
- fn create<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Create<'c>) -> Self::CreateFut<'c> {
- (*self).create(from, msg)
- }
- type OpenFut<'c> = P::OpenFut<'c> where Self: 'c;
- fn open<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Open) -> Self::OpenFut<'c> {
- (*self).open(from, msg)
- }
- fn read<'c, R, F>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMsg, callback: F) -> Result<R>
- where
- F: 'c + FnOnce(&[u8]) -> R,
- {
- (*self).read(from, msg, callback)
- }
- type WriteFut<'c> = P::WriteFut<'c> where Self: 'c;
- fn write<'c, R>(
- &'c self,
- from: &'c Arc<BlockPath>,
- inode: Inode,
- handle: Handle,
- offset: u64,
- size: u64,
- reader: R,
- ) -> Self::WriteFut<'c>
- where
- R: 'c + Read,
- {
- (*self).write(from, inode, handle, offset, size, reader)
- }
- type FlushFut<'c> = P::FlushFut<'c> where Self: 'c;
- fn flush<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Flush) -> Self::FlushFut<'c> {
- (*self).flush(from, msg)
- }
- type ReadDirFut<'c> = P::ReadDirFut<'c> where Self: 'c;
- fn read_dir<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadDir) -> Self::ReadDirFut<'c> {
- (*self).read_dir(from, msg)
- }
- type LinkFut<'c> = P::LinkFut<'c> where Self: 'c;
- fn link<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Link) -> Self::LinkFut<'c> {
- (*self).link(from, msg)
- }
- type UnlinkFut<'c> = P::UnlinkFut<'c> where Self: 'c;
- fn unlink<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlink) -> Self::UnlinkFut<'c> {
- (*self).unlink(from, msg)
- }
- type ReadMetaFut<'c> = P::ReadMetaFut<'c> where Self: 'c;
- fn read_meta<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMeta) -> Self::ReadMetaFut<'c> {
- (*self).read_meta(from, msg)
- }
- type WriteMetaFut<'c> = P::WriteMetaFut<'c> where Self: 'c;
- fn write_meta<'c>(
- &'c self,
- from: &'c Arc<BlockPath>,
- msg: WriteMeta,
- ) -> Self::WriteMetaFut<'c> {
- (*self).write_meta(from, msg)
- }
- type AllocateFut<'c> = P::AllocateFut<'c> where Self: 'c;
- fn allocate<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Allocate) -> Self::AllocateFut<'c> {
- (*self).allocate(from, msg)
- }
- type CloseFut<'c> = P::CloseFut<'c> where Self: 'c;
- fn close<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Close) -> Self::CloseFut<'c> {
- (*self).close(from, msg)
- }
- type ForgetFut<'c> = P::ForgetFut<'c> where Self: 'c;
- fn forget<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Forget) -> Self::ForgetFut<'c> {
- (*self).forget(from, msg)
- }
- type LockFut<'c> = P::LockFut<'c> where Self: 'c;
- fn lock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Lock) -> Self::LockFut<'c> {
- (*self).lock(from, msg)
- }
- type UnlockFut<'c> = P::UnlockFut<'c> where Self: 'c;
- fn unlock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlock) -> Self::UnlockFut<'c> {
- (*self).unlock(from, msg)
- }
- type AddReacapFut<'c> = P::AddReacapFut<'c> where Self: 'c;
- fn add_readcap<'c>(
- &'c self,
- from: &'c Arc<BlockPath>,
- msg: AddReadcap,
- ) -> Self::AddReacapFut<'c> {
- (*self).add_readcap(from, msg)
- }
- type GrantAccessFut<'c> = P::GrantAccessFut<'c> where Self: 'c;
- fn grant_access<'c>(
- &'c self,
- from: &'c Arc<BlockPath>,
- msg: IssuedProcRec,
- ) -> Self::GrantAccessFut<'c> {
- (*self).grant_access(from, msg)
- }
- }
- struct ServerCallback<P> {
- provider: Arc<P>,
- }
- impl<P> ServerCallback<P> {
- fn new(provider: Arc<P>) -> Self {
- Self { provider }
- }
- }
- impl<P> Clone for ServerCallback<P> {
- fn clone(&self) -> Self {
- Self {
- provider: self.provider.clone(),
- }
- }
- }
- impl<P: 'static + Send + Sync + FsProvider> MsgCallback for ServerCallback<P> {
- type Arg<'de> = FsMsg<'de>;
- type CallFut<'de> = impl 'de + Future<Output = btlib::Result<()>>;
- fn call<'de>(&'de self, arg: MsgReceived<FsMsg<'de>>) -> Self::CallFut<'de> {
- async move {
- let (from, body, replier) = arg.into_parts();
- let provider = &self.provider;
- let reply = match body {
- FsMsg::Lookup(lookup) => FsReply::Lookup(provider.lookup(&from, lookup).await?),
- FsMsg::Create(create) => FsReply::Create(provider.create(&from, create).await?),
- FsMsg::Open(open) => FsReply::Open(provider.open(&from, open).await?),
- FsMsg::Read(read) => {
- let buf = provider.read(&from, read, move |data| {
- // TODO: Avoid allocating a buffer on every read. If possible, avoid coping
- // data altogether.
- let mut buf = Vec::with_capacity(data.len());
- buf.extend_from_slice(data);
- buf
- })?;
- let mut replier = replier.unwrap();
- replier
- .reply(FsReply::Read(ReadReply { data: &buf }))
- .await?;
- return Ok(());
- }
- FsMsg::Write(Write {
- inode,
- handle,
- offset,
- data,
- }) => FsReply::Write(
- provider
- .write(&from, inode, handle, offset, data.len() as u64, data)
- .await?,
- ),
- FsMsg::Flush(flush) => FsReply::Ack(provider.flush(&from, flush).await?),
- FsMsg::ReadDir(read_dir) => {
- FsReply::ReadDir(provider.read_dir(&from, read_dir).await?)
- }
- FsMsg::Link(link) => FsReply::Link(provider.link(&from, link).await?),
- FsMsg::Unlink(unlink) => FsReply::Ack(provider.unlink(&from, unlink).await?),
- FsMsg::ReadMeta(read_meta) => {
- FsReply::ReadMeta(provider.read_meta(&from, read_meta).await?)
- }
- FsMsg::WriteMeta(write_meta) => {
- FsReply::WriteMeta(provider.write_meta(&from, write_meta).await?)
- }
- FsMsg::Allocate(allocate) => {
- FsReply::Ack(provider.allocate(&from, allocate).await?)
- }
- FsMsg::Close(close) => FsReply::Ack(provider.close(&from, close).await?),
- FsMsg::Forget(forget) => FsReply::Ack(provider.forget(&from, forget).await?),
- FsMsg::Lock(lock) => FsReply::Ack(provider.lock(&from, lock).await?),
- FsMsg::Unlock(unlock) => FsReply::Ack(provider.unlock(&from, unlock).await?),
- FsMsg::AddReadcap(add_readcap) => {
- FsReply::Ack(provider.add_readcap(&from, add_readcap).await?)
- }
- FsMsg::GrantAccess(proc_rec) => {
- FsReply::Ack(provider.grant_access(&from, proc_rec).await?)
- }
- };
- replier.unwrap().reply(reply).await
- }
- }
- }
- pub fn new_fs_server<C, P>(
- ip_addr: IpAddr,
- creds: Arc<C>,
- provider: Arc<P>,
- ) -> Result<impl Receiver>
- where
- C: 'static + Send + Sync + Creds,
- P: 'static + Send + Sync + FsProvider,
- {
- receiver(ip_addr, creds, ServerCallback::new(provider))
- }
|