// server.rs (5.7 KB)

  1. use crate::msg::*;
  2. use btlib::{crypto::Creds, BlockMeta, BlockPath, Result};
  3. use btmsg::{receiver, MsgCallback, MsgReceived, Receiver};
  4. use core::future::Future;
  5. use std::{net::IpAddr, sync::Arc};
  6. use tokio::runtime::Handle as RuntimeHandle;
  7. pub trait FsProvider {
  8. type LookupFut<'c>: Send + Future<Output = Result<LookupReply>>
  9. where
  10. Self: 'c;
  11. fn lookup<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Lookup<'c>) -> Self::LookupFut<'c>;
  12. type CreateFut<'c>: Send + Future<Output = Result<CreateReply>>
  13. where
  14. Self: 'c;
  15. fn create<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Create<'c>) -> Self::CreateFut<'c>;
  16. type OpenFut<'c>: Send + Future<Output = Result<OpenReply>>
  17. where
  18. Self: 'c;
  19. fn open<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Open) -> Self::OpenFut<'c>;
  20. fn read<'c, R, F>(&'c self, from: &'c Arc<BlockPath>, msg: Read, callback: F) -> Result<R>
  21. where
  22. F: 'c + Send + FnOnce(&[u8]) -> R;
  23. type WriteFut<'c>: Send + Future<Output = Result<WriteReply>>
  24. where
  25. Self: 'c;
  26. fn write<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Write) -> Self::WriteFut<'c>;
  27. type FlushFut<'c>: Send + Future<Output = Result<()>>
  28. where
  29. Self: 'c;
  30. fn flush<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Flush) -> Self::FlushFut<'c>;
  31. type ReadDirFut<'c>: Send + Future<Output = Result<ReadDirReply>>
  32. where
  33. Self: 'c;
  34. fn read_dir<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadDir) -> Self::ReadDirFut<'c>;
  35. type LinkFut<'c>: Send + Future<Output = Result<()>>
  36. where
  37. Self: 'c;
  38. fn link<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Link) -> Self::LinkFut<'c>;
  39. type UnlinkFut<'c>: Send + Future<Output = Result<()>>
  40. where
  41. Self: 'c;
  42. fn unlink<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlink) -> Self::UnlinkFut<'c>;
  43. type ReadMetaFut<'c>: Send + Future<Output = Result<BlockMeta>>
  44. where
  45. Self: 'c;
  46. fn read_meta<'c>(&'c self, from: &'c Arc<BlockPath>, msg: ReadMeta) -> Self::ReadMetaFut<'c>;
  47. type WriteMetaFut<'c>: Send + Future<Output = Result<()>>
  48. where
  49. Self: 'c;
  50. fn write_meta<'c>(&'c self, from: &'c Arc<BlockPath>, msg: WriteMeta)
  51. -> Self::WriteMetaFut<'c>;
  52. type CloseFut<'c>: Send + Future<Output = Result<()>>
  53. where
  54. Self: 'c;
  55. fn close<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Close) -> Self::CloseFut<'c>;
  56. type ForgetFut<'c>: Send + Future<Output = Result<()>>
  57. where
  58. Self: 'c;
  59. fn forget<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Forget) -> Self::ForgetFut<'c>;
  60. type LockFut<'c>: Send + Future<Output = Result<()>>
  61. where
  62. Self: 'c;
  63. fn lock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Lock) -> Self::LockFut<'c>;
  64. type UnlockFut<'c>: Send + Future<Output = Result<()>>
  65. where
  66. Self: 'c;
  67. fn unlock<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlock) -> Self::UnlockFut<'c>;
  68. }
  /// Message callback that forwards received filesystem messages to a shared provider.
  struct ServerCallback<P> {
      /// The provider every request is dispatched to; shared between clones of the callback.
      provider: Arc<P>,
  }
  72. impl<P> ServerCallback<P> {
  73. fn new(provider: Arc<P>) -> Self {
  74. Self { provider }
  75. }
  76. }
  77. impl<P> Clone for ServerCallback<P> {
  78. fn clone(&self) -> Self {
  79. Self {
  80. provider: self.provider.clone(),
  81. }
  82. }
  83. }
  84. impl<P: 'static + Send + Sync + FsProvider> MsgCallback for ServerCallback<P> {
  85. type Arg<'de> = FsMsg<'de>;
  86. type CallFut<'de> = impl 'de + Future<Output = btlib::Result<()>>;
  87. fn call<'de>(&'de self, arg: MsgReceived<FsMsg<'de>>) -> Self::CallFut<'de> {
  88. async move {
  89. let (from, body, replier) = arg.into_parts();
  90. let provider = &self.provider;
  91. let reply = match body {
  92. FsMsg::Lookup(lookup) => FsReply::Lookup(provider.lookup(&from, lookup).await?),
  93. FsMsg::Create(create) => FsReply::Create(provider.create(&from, create).await?),
  94. FsMsg::Open(open) => FsReply::Open(provider.open(&from, open).await?),
  95. FsMsg::Read(read) => {
  96. return provider.read(&from, read, move |data| {
  97. let mut replier = replier.unwrap();
  98. RuntimeHandle::current()
  99. .block_on(replier.reply(FsReply::Read(ReadReply { data })))
  100. })?;
  101. }
  102. FsMsg::Write(write) => FsReply::Write(provider.write(&from, write).await?),
  103. FsMsg::Flush(flush) => FsReply::Ack(provider.flush(&from, flush).await?),
  104. FsMsg::ReadDir(read_dir) => {
  105. FsReply::ReadDir(provider.read_dir(&from, read_dir).await?)
  106. }
  107. FsMsg::Link(link) => FsReply::Ack(provider.link(&from, link).await?),
  108. FsMsg::Unlink(unlink) => FsReply::Ack(provider.unlink(&from, unlink).await?),
  109. FsMsg::ReadMeta(read_meta) => {
  110. FsReply::ReadMeta(provider.read_meta(&from, read_meta).await?)
  111. }
  112. FsMsg::WriteMeta(write_meta) => {
  113. FsReply::Ack(provider.write_meta(&from, write_meta).await?)
  114. }
  115. FsMsg::Close(close) => FsReply::Ack(provider.close(&from, close).await?),
  116. FsMsg::Forget(forget) => FsReply::Ack(provider.forget(&from, forget).await?),
  117. FsMsg::Lock(lock) => FsReply::Ack(provider.lock(&from, lock).await?),
  118. FsMsg::Unlock(unlock) => FsReply::Ack(provider.unlock(&from, unlock).await?),
  119. };
  120. replier.unwrap().reply(reply).await
  121. }
  122. }
  123. }
  124. pub fn new_fs_server<C, P>(
  125. ip_addr: IpAddr,
  126. creds: Arc<C>,
  127. provider: Arc<P>,
  128. ) -> Result<impl Receiver>
  129. where
  130. C: 'static + Send + Sync + Creds,
  131. P: 'static + Send + Sync + FsProvider,
  132. {
  133. receiver(ip_addr, creds, ServerCallback::new(provider))
  134. }