123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- use crate::msg::*;
- use btlib::{crypto::Creds, BlockMeta, BlockPath, Result};
- use btmsg::{receiver, MsgCallback, MsgReceived, Receiver};
- use core::future::Future;
- use std::{net::IpAddr, sync::Arc};
- use tokio::runtime::Handle as RuntimeHandle;
- pub trait FsProvider {
- type LookupFut<'c>: Send + Future<Output = Result<LookupReply>>
- where
- Self: 'c;
- fn lookup<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Lookup<'c>) -> Self::LookupFut<'c>;
- type CreateFut<'c>: Send + Future<Output = Result<CreateReply>>
- where
- Self: 'c;
- fn create<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Create<'c>) -> Self::CreateFut<'c>;
- type OpenFut<'c>: Send + Future<Output = Result<OpenReply>>
- where
- Self: 'c;
- fn open<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Open) -> Self::OpenFut<'c>;
- fn read<'c, R, F>(&'c self, from: &'c Arc<BlockPath>, msg: Read, callback: F) -> Result<R>
- where
- F: 'c + Send + FnOnce(&[u8]) -> R;
- type WriteFut<'c>: Send + Future<Output = Result<WriteReply>>
- where
- Self: 'c;
- fn write<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Write) -> Self::WriteFut<'c>;
- type FlushFut<'c>: Send + Future<Output = Result<()>>
- where
- Self: 'c;
- fn flush<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Flush) -> Self::FlushFut<'c>;
- type ReadDirFut<'c>: Send + Future<Output = Result<ReadDirReply>>
- where
- Self: 'c;
- fn read_dir<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadDir) -> Self::ReadDirFut<'c>;
- type LinkFut<'c>: Send + Future<Output = Result<()>>
- where
- Self: 'c;
- fn link<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Link) -> Self::LinkFut<'c>;
- type UnlinkFut<'c>: Send + Future<Output = Result<()>>
- where
- Self: 'c;
- fn unlink<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlink) -> Self::UnlinkFut<'c>;
- type ReadMetaFut<'c>: Send + Future<Output = Result<BlockMeta>>
- where
- Self: 'c;
- fn read_meta<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMeta) -> Self::ReadMetaFut<'c>;
- type WriteMetaFut<'c>: Send + Future<Output = Result<()>>
- where
- Self: 'c;
- fn write_meta<'c>(&'c self, from: &'c Arc<BlockPath>, msg: WriteMeta)
- -> Self::WriteMetaFut<'c>;
- type CloseFut<'c>: Send + Future<Output = Result<()>>
- where
- Self: 'c;
- fn close<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Close) -> Self::CloseFut<'c>;
- type ForgetFut<'c>: Send + Future<Output = Result<()>>
- where
- Self: 'c;
- fn forget<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Forget) -> Self::ForgetFut<'c>;
- type LockFut<'c>: Send + Future<Output = Result<()>>
- where
- Self: 'c;
- fn lock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Lock) -> Self::LockFut<'c>;
- type UnlockFut<'c>: Send + Future<Output = Result<()>>
- where
- Self: 'c;
- fn unlock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlock) -> Self::UnlockFut<'c>;
- }
- struct ServerCallback<P> {
- provider: Arc<P>,
- }
- impl<P> ServerCallback<P> {
- fn new(provider: Arc<P>) -> Self {
- Self { provider }
- }
- }
- impl<P> Clone for ServerCallback<P> {
- fn clone(&self) -> Self {
- Self {
- provider: self.provider.clone(),
- }
- }
- }
- impl<P: 'static + Send + Sync + FsProvider> MsgCallback for ServerCallback<P> {
- type Arg<'de> = FsMsg<'de>;
- type CallFut<'de> = impl 'de + Future<Output = btlib::Result<()>>;
- fn call<'de>(&'de self, arg: MsgReceived<FsMsg<'de>>) -> Self::CallFut<'de> {
- async move {
- let (from, body, replier) = arg.into_parts();
- let provider = &self.provider;
- let reply = match body {
- FsMsg::Lookup(lookup) => FsReply::Lookup(provider.lookup(&from, lookup).await?),
- FsMsg::Create(create) => FsReply::Create(provider.create(&from, create).await?),
- FsMsg::Open(open) => FsReply::Open(provider.open(&from, open).await?),
- FsMsg::Read(read) => {
- return provider.read(&from, read, move |data| {
- let mut replier = replier.unwrap();
- RuntimeHandle::current()
- .block_on(replier.reply(FsReply::Read(ReadReply { data })))
- })?;
- }
- FsMsg::Write(write) => FsReply::Write(provider.write(&from, write).await?),
- FsMsg::Flush(flush) => FsReply::Ack(provider.flush(&from, flush).await?),
- FsMsg::ReadDir(read_dir) => {
- FsReply::ReadDir(provider.read_dir(&from, read_dir).await?)
- }
- FsMsg::Link(link) => FsReply::Ack(provider.link(&from, link).await?),
- FsMsg::Unlink(unlink) => FsReply::Ack(provider.unlink(&from, unlink).await?),
- FsMsg::ReadMeta(read_meta) => {
- FsReply::ReadMeta(provider.read_meta(&from, read_meta).await?)
- }
- FsMsg::WriteMeta(write_meta) => {
- FsReply::Ack(provider.write_meta(&from, write_meta).await?)
- }
- FsMsg::Close(close) => FsReply::Ack(provider.close(&from, close).await?),
- FsMsg::Forget(forget) => FsReply::Ack(provider.forget(&from, forget).await?),
- FsMsg::Lock(lock) => FsReply::Ack(provider.lock(&from, lock).await?),
- FsMsg::Unlock(unlock) => FsReply::Ack(provider.unlock(&from, unlock).await?),
- };
- replier.unwrap().reply(reply).await
- }
- }
- }
- pub fn new_fs_server<C, P>(
- ip_addr: IpAddr,
- creds: Arc<C>,
- provider: Arc<P>,
- ) -> Result<impl Receiver>
- where
- C: 'static + Send + Sync + Creds,
- P: 'static + Send + Sync + FsProvider,
- {
- receiver(ip_addr, creds, ServerCallback::new(provider))
- }
|