server.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. // SPDX-License-Identifier: AGPL-3.0-or-later
  2. use crate::{
  3. msg::{Read as ReadMsg, *},
  4. Handle, Inode,
  5. };
  6. use btlib::{crypto::Creds, BlockPath, IssuedProcRec, Result};
  7. use btmsg::{receiver, MsgCallback, MsgReceived, Receiver};
  8. use core::future::Future;
  9. use std::{io::Read, net::IpAddr, sync::Arc};
  10. pub trait FsProvider: Send + Sync {
  11. type LookupFut<'c>: Send + Future<Output = Result<LookupReply>>
  12. where
  13. Self: 'c;
  14. fn lookup<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Lookup<'c>) -> Self::LookupFut<'c>;
  15. type CreateFut<'c>: Send + Future<Output = Result<CreateReply>>
  16. where
  17. Self: 'c;
  18. fn create<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Create<'c>) -> Self::CreateFut<'c>;
  19. type OpenFut<'c>: Send + Future<Output = Result<OpenReply>>
  20. where
  21. Self: 'c;
  22. fn open<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Open) -> Self::OpenFut<'c>;
  23. fn read<'c, R, F>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMsg, callback: F) -> Result<R>
  24. where
  25. F: 'c + FnOnce(&[u8]) -> R;
  26. type WriteFut<'c>: Send + Future<Output = Result<WriteReply>>
  27. where
  28. Self: 'c;
  29. fn write<'c, R>(
  30. &'c self,
  31. from: &'c Arc<BlockPath>,
  32. inode: Inode,
  33. handle: Handle,
  34. offset: u64,
  35. size: u64,
  36. reader: R,
  37. ) -> Self::WriteFut<'c>
  38. where
  39. R: 'c + Read;
  40. type FlushFut<'c>: Send + Future<Output = Result<()>>
  41. where
  42. Self: 'c;
  43. fn flush<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Flush) -> Self::FlushFut<'c>;
  44. type ReadDirFut<'c>: Send + Future<Output = Result<ReadDirReply>>
  45. where
  46. Self: 'c;
  47. fn read_dir<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadDir) -> Self::ReadDirFut<'c>;
  48. type LinkFut<'c>: Send + Future<Output = Result<LinkReply>>
  49. where
  50. Self: 'c;
  51. fn link<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Link) -> Self::LinkFut<'c>;
  52. type UnlinkFut<'c>: Send + Future<Output = Result<()>>
  53. where
  54. Self: 'c;
  55. fn unlink<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlink) -> Self::UnlinkFut<'c>;
  56. type ReadMetaFut<'c>: Send + Future<Output = Result<ReadMetaReply>>
  57. where
  58. Self: 'c;
  59. fn read_meta<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMeta) -> Self::ReadMetaFut<'c>;
  60. type WriteMetaFut<'c>: Send + Future<Output = Result<WriteMetaReply>>
  61. where
  62. Self: 'c;
  63. fn write_meta<'c>(&'c self, from: &'c Arc<BlockPath>, msg: WriteMeta)
  64. -> Self::WriteMetaFut<'c>;
  65. type AllocateFut<'c>: Send + Future<Output = Result<()>>
  66. where
  67. Self: 'c;
  68. fn allocate<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Allocate) -> Self::AllocateFut<'c>;
  69. type CloseFut<'c>: Send + Future<Output = Result<()>>
  70. where
  71. Self: 'c;
  72. fn close<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Close) -> Self::CloseFut<'c>;
  73. type ForgetFut<'c>: Send + Future<Output = Result<()>>
  74. where
  75. Self: 'c;
  76. fn forget<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Forget) -> Self::ForgetFut<'c>;
  77. type LockFut<'c>: Send + Future<Output = Result<()>>
  78. where
  79. Self: 'c;
  80. fn lock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Lock) -> Self::LockFut<'c>;
  81. type UnlockFut<'c>: Send + Future<Output = Result<()>>
  82. where
  83. Self: 'c;
  84. fn unlock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlock) -> Self::UnlockFut<'c>;
  85. type AddReacapFut<'c>: Send + Future<Output = Result<()>>
  86. where
  87. Self: 'c;
  88. fn add_readcap<'c>(
  89. &'c self,
  90. from: &'c Arc<BlockPath>,
  91. msg: AddReadcap,
  92. ) -> Self::AddReacapFut<'c>;
  93. type GrantAccessFut<'c>: Send + Future<Output = Result<()>>
  94. where
  95. Self: 'c;
  96. fn grant_access<'c>(
  97. &'c self,
  98. from: &'c Arc<BlockPath>,
  99. msg: IssuedProcRec,
  100. ) -> Self::GrantAccessFut<'c>;
  101. }
  102. impl<P: FsProvider> FsProvider for &P {
  103. type LookupFut<'c> = P::LookupFut<'c> where Self: 'c;
  104. fn lookup<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Lookup<'c>) -> Self::LookupFut<'c> {
  105. (*self).lookup(from, msg)
  106. }
  107. type CreateFut<'c> = P::CreateFut<'c> where Self: 'c;
  108. fn create<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Create<'c>) -> Self::CreateFut<'c> {
  109. (*self).create(from, msg)
  110. }
  111. type OpenFut<'c> = P::OpenFut<'c> where Self: 'c;
  112. fn open<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Open) -> Self::OpenFut<'c> {
  113. (*self).open(from, msg)
  114. }
  115. fn read<'c, R, F>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMsg, callback: F) -> Result<R>
  116. where
  117. F: 'c + FnOnce(&[u8]) -> R,
  118. {
  119. (*self).read(from, msg, callback)
  120. }
  121. type WriteFut<'c> = P::WriteFut<'c> where Self: 'c;
  122. fn write<'c, R>(
  123. &'c self,
  124. from: &'c Arc<BlockPath>,
  125. inode: Inode,
  126. handle: Handle,
  127. offset: u64,
  128. size: u64,
  129. reader: R,
  130. ) -> Self::WriteFut<'c>
  131. where
  132. R: 'c + Read,
  133. {
  134. (*self).write(from, inode, handle, offset, size, reader)
  135. }
  136. type FlushFut<'c> = P::FlushFut<'c> where Self: 'c;
  137. fn flush<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Flush) -> Self::FlushFut<'c> {
  138. (*self).flush(from, msg)
  139. }
  140. type ReadDirFut<'c> = P::ReadDirFut<'c> where Self: 'c;
  141. fn read_dir<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadDir) -> Self::ReadDirFut<'c> {
  142. (*self).read_dir(from, msg)
  143. }
  144. type LinkFut<'c> = P::LinkFut<'c> where Self: 'c;
  145. fn link<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Link) -> Self::LinkFut<'c> {
  146. (*self).link(from, msg)
  147. }
  148. type UnlinkFut<'c> = P::UnlinkFut<'c> where Self: 'c;
  149. fn unlink<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlink) -> Self::UnlinkFut<'c> {
  150. (*self).unlink(from, msg)
  151. }
  152. type ReadMetaFut<'c> = P::ReadMetaFut<'c> where Self: 'c;
  153. fn read_meta<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMeta) -> Self::ReadMetaFut<'c> {
  154. (*self).read_meta(from, msg)
  155. }
  156. type WriteMetaFut<'c> = P::WriteMetaFut<'c> where Self: 'c;
  157. fn write_meta<'c>(
  158. &'c self,
  159. from: &'c Arc<BlockPath>,
  160. msg: WriteMeta,
  161. ) -> Self::WriteMetaFut<'c> {
  162. (*self).write_meta(from, msg)
  163. }
  164. type AllocateFut<'c> = P::AllocateFut<'c> where Self: 'c;
  165. fn allocate<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Allocate) -> Self::AllocateFut<'c> {
  166. (*self).allocate(from, msg)
  167. }
  168. type CloseFut<'c> = P::CloseFut<'c> where Self: 'c;
  169. fn close<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Close) -> Self::CloseFut<'c> {
  170. (*self).close(from, msg)
  171. }
  172. type ForgetFut<'c> = P::ForgetFut<'c> where Self: 'c;
  173. fn forget<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Forget) -> Self::ForgetFut<'c> {
  174. (*self).forget(from, msg)
  175. }
  176. type LockFut<'c> = P::LockFut<'c> where Self: 'c;
  177. fn lock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Lock) -> Self::LockFut<'c> {
  178. (*self).lock(from, msg)
  179. }
  180. type UnlockFut<'c> = P::UnlockFut<'c> where Self: 'c;
  181. fn unlock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlock) -> Self::UnlockFut<'c> {
  182. (*self).unlock(from, msg)
  183. }
  184. type AddReacapFut<'c> = P::AddReacapFut<'c> where Self: 'c;
  185. fn add_readcap<'c>(
  186. &'c self,
  187. from: &'c Arc<BlockPath>,
  188. msg: AddReadcap,
  189. ) -> Self::AddReacapFut<'c> {
  190. (*self).add_readcap(from, msg)
  191. }
  192. type GrantAccessFut<'c> = P::GrantAccessFut<'c> where Self: 'c;
  193. fn grant_access<'c>(
  194. &'c self,
  195. from: &'c Arc<BlockPath>,
  196. msg: IssuedProcRec,
  197. ) -> Self::GrantAccessFut<'c> {
  198. (*self).grant_access(from, msg)
  199. }
  200. }
  /// Message callback that serves filesystem requests from a shared provider.
  struct ServerCallback<P> {
      /// The provider every request is dispatched to.
      provider: Arc<P>,
  }

  impl<P> ServerCallback<P> {
      /// Wraps `provider` in a new callback.
      fn new(provider: Arc<P>) -> Self {
          ServerCallback { provider }
      }
  }

  // `Clone` is written by hand so that it is available for every `P`: only the `Arc` is cloned,
  // never the provider itself, so no `P: Clone` bound is needed.
  impl<P> Clone for ServerCallback<P> {
      /// Returns a callback that shares the same provider as `self`.
      fn clone(&self) -> Self {
          let provider = Arc::clone(&self.provider);
          ServerCallback { provider }
      }
  }
  216. impl<P: 'static + Send + Sync + FsProvider> MsgCallback for ServerCallback<P> {
  217. type Arg<'de> = FsMsg<'de>;
  218. type CallFut<'de> = impl 'de + Future<Output = btlib::Result<()>>;
  219. fn call<'de>(&'de self, arg: MsgReceived<FsMsg<'de>>) -> Self::CallFut<'de> {
  220. async move {
  221. let (from, body, replier) = arg.into_parts();
  222. let provider = &self.provider;
  223. let reply = match body {
  224. FsMsg::Lookup(lookup) => FsReply::Lookup(provider.lookup(&from, lookup).await?),
  225. FsMsg::Create(create) => FsReply::Create(provider.create(&from, create).await?),
  226. FsMsg::Open(open) => FsReply::Open(provider.open(&from, open).await?),
  227. FsMsg::Read(read) => {
  228. let buf = provider.read(&from, read, move |data| {
  229. // TODO: Avoid allocating a buffer on every read. If possible, avoid coping
  230. // data altogether.
  231. let mut buf = Vec::with_capacity(data.len());
  232. buf.extend_from_slice(data);
  233. buf
  234. })?;
  235. let mut replier = replier.unwrap();
  236. replier
  237. .reply(FsReply::Read(ReadReply { data: &buf }))
  238. .await?;
  239. return Ok(());
  240. }
  241. FsMsg::Write(Write {
  242. inode,
  243. handle,
  244. offset,
  245. data,
  246. }) => FsReply::Write(
  247. provider
  248. .write(&from, inode, handle, offset, data.len() as u64, data)
  249. .await?,
  250. ),
  251. FsMsg::Flush(flush) => FsReply::Ack(provider.flush(&from, flush).await?),
  252. FsMsg::ReadDir(read_dir) => {
  253. FsReply::ReadDir(provider.read_dir(&from, read_dir).await?)
  254. }
  255. FsMsg::Link(link) => FsReply::Link(provider.link(&from, link).await?),
  256. FsMsg::Unlink(unlink) => FsReply::Ack(provider.unlink(&from, unlink).await?),
  257. FsMsg::ReadMeta(read_meta) => {
  258. FsReply::ReadMeta(provider.read_meta(&from, read_meta).await?)
  259. }
  260. FsMsg::WriteMeta(write_meta) => {
  261. FsReply::WriteMeta(provider.write_meta(&from, write_meta).await?)
  262. }
  263. FsMsg::Allocate(allocate) => {
  264. FsReply::Ack(provider.allocate(&from, allocate).await?)
  265. }
  266. FsMsg::Close(close) => FsReply::Ack(provider.close(&from, close).await?),
  267. FsMsg::Forget(forget) => FsReply::Ack(provider.forget(&from, forget).await?),
  268. FsMsg::Lock(lock) => FsReply::Ack(provider.lock(&from, lock).await?),
  269. FsMsg::Unlock(unlock) => FsReply::Ack(provider.unlock(&from, unlock).await?),
  270. FsMsg::AddReadcap(add_readcap) => {
  271. FsReply::Ack(provider.add_readcap(&from, add_readcap).await?)
  272. }
  273. FsMsg::GrantAccess(proc_rec) => {
  274. FsReply::Ack(provider.grant_access(&from, proc_rec).await?)
  275. }
  276. };
  277. replier.unwrap().reply(reply).await
  278. }
  279. }
  280. }
  281. pub fn new_fs_server<C, P>(
  282. ip_addr: IpAddr,
  283. creds: Arc<C>,
  284. provider: Arc<P>,
  285. ) -> Result<impl Receiver>
  286. where
  287. C: 'static + Send + Sync + Creds,
  288. P: 'static + Send + Sync + FsProvider,
  289. {
  290. receiver(ip_addr, creds, ServerCallback::new(provider))
  291. }