|
@@ -9,7 +9,6 @@ use log::{debug, error, warn};
|
|
|
use positioned_io::Size;
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
use std::{
|
|
|
- cell::RefCell,
|
|
|
collections::hash_map::{self, HashMap},
|
|
|
ffi::CStr,
|
|
|
fmt::{Display, Formatter},
|
|
@@ -18,7 +17,7 @@ use std::{
|
|
|
path::{Path, PathBuf},
|
|
|
sync::{
|
|
|
atomic::{AtomicU64, Ordering},
|
|
|
- RwLock, RwLockWriteGuard,
|
|
|
+ Mutex, RwLock, RwLockWriteGuard,
|
|
|
},
|
|
|
time::Duration,
|
|
|
};
|
|
@@ -33,7 +32,7 @@ use crate::{
|
|
|
MetaReader, Positioned, Principaled, ReadDual, Result, SeekFromExt, Split, TrySeek, WriteDual,
|
|
|
};
|
|
|
|
|
|
-pub use private::{Blocktree, ModeAuthorizer};
|
|
|
+pub use private::{Blocktree, ModeAuthorizer, SpecInodes};
|
|
|
|
|
|
mod private {
|
|
|
use super::*;
|
|
@@ -238,10 +237,10 @@ mod private {
|
|
|
|
|
|
enum HandleValue {
|
|
|
File {
|
|
|
- accessor: RefCell<Option<Accessor<&'static [u8]>>>,
|
|
|
+ accessor: Mutex<Option<Accessor<&'static [u8]>>>,
|
|
|
},
|
|
|
Directory {
|
|
|
- accessor: RefCell<Option<Accessor<&'static [u8]>>>,
|
|
|
+ accessor: Mutex<Option<Accessor<&'static [u8]>>>,
|
|
|
dir: Directory,
|
|
|
},
|
|
|
}
|
|
@@ -250,11 +249,11 @@ mod private {
|
|
|
fn new<T: Size>(accessor: Accessor<T>) -> HandleValue {
|
|
|
let (accessor, ..) = accessor.split();
|
|
|
HandleValue::File {
|
|
|
- accessor: RefCell::new(Some(accessor)),
|
|
|
+ accessor: Mutex::new(Some(accessor)),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- fn get_cell(&self) -> &RefCell<Option<Accessor<&'static [u8]>>> {
|
|
|
+ fn get_mutex(&self) -> &Mutex<Option<Accessor<&'static [u8]>>> {
|
|
|
match self {
|
|
|
Self::File { accessor, .. } => accessor,
|
|
|
Self::Directory { accessor, .. } => accessor,
|
|
@@ -262,15 +261,23 @@ mod private {
|
|
|
}
|
|
|
|
|
|
fn take_accessor(&self) -> Result<Accessor<&'static [u8]>> {
|
|
|
- self.get_cell()
|
|
|
- .take()
|
|
|
+ let mut guard = self.get_mutex().lock().display_err()?;
|
|
|
+ guard
|
|
|
.take()
|
|
|
.ok_or_else(|| bterr!("reader has already been taken"))
|
|
|
}
|
|
|
|
|
|
- fn give_accessor(&self, reader: Accessor<&'static [u8]>) {
|
|
|
- let mut borrow = self.get_cell().borrow_mut();
|
|
|
- *borrow = Some(reader);
|
|
|
+ fn use_accessor<T, F: FnOnce(Accessor<&'static [u8]>) -> (Accessor<&'static [u8]>, T)>(
|
|
|
+ &self,
|
|
|
+ cb: F,
|
|
|
+ ) -> Result<T> {
|
|
|
+ let mut guard = self.get_mutex().lock().display_err()?;
|
|
|
+ let accessor = guard
|
|
|
+ .take()
|
|
|
+ .ok_or_else(|| bterr!("accessor has already been taken"))?;
|
|
|
+ let (accessor, output) = cb(accessor);
|
|
|
+ *guard = Some(accessor);
|
|
|
+ Ok(output)
|
|
|
}
|
|
|
|
|
|
fn convert_to_dir<C: Signer + Principaled + Decrypter>(
|
|
@@ -283,7 +290,7 @@ mod private {
|
|
|
let (accessor, ..) = accessor.split();
|
|
|
Ok(HandleValue::Directory {
|
|
|
dir,
|
|
|
- accessor: RefCell::new(Some(accessor)),
|
|
|
+ accessor: Mutex::new(Some(accessor)),
|
|
|
})
|
|
|
}
|
|
|
|
|
@@ -297,30 +304,17 @@ mod private {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- fn access_block<T, C, F: FnOnce(&mut Accessor<&FileBlock<C>>) -> Result<T>>(
|
|
|
+ fn access_block<B: Size, T, F: FnOnce(&mut Accessor<B>) -> Result<T>>(
|
|
|
&self,
|
|
|
- block: &FileBlock<C>,
|
|
|
+ block: B,
|
|
|
cb: F,
|
|
|
) -> Result<T> {
|
|
|
- let accessor = self.take_accessor()?;
|
|
|
- let mut accessor = Accessor::combine(accessor, block);
|
|
|
- let result = cb(&mut accessor);
|
|
|
- let (reader, ..) = accessor.split();
|
|
|
- self.give_accessor(reader);
|
|
|
- result
|
|
|
- }
|
|
|
-
|
|
|
- fn access_block_mut<T, C, F: FnOnce(&mut Accessor<&mut FileBlock<C>>) -> Result<T>>(
|
|
|
- &self,
|
|
|
- block: &mut FileBlock<C>,
|
|
|
- cb: F,
|
|
|
- ) -> Result<T> {
|
|
|
- let accessor = self.take_accessor()?;
|
|
|
- let mut accessor = Accessor::combine(accessor, block);
|
|
|
- let result = cb(&mut accessor);
|
|
|
- let (reader, ..) = accessor.split();
|
|
|
- self.give_accessor(reader);
|
|
|
- result
|
|
|
+ self.use_accessor(|accessor| {
|
|
|
+ let mut accessor = Accessor::combine(accessor, block);
|
|
|
+ let result = cb(&mut accessor);
|
|
|
+ let (accessor, ..) = accessor.split();
|
|
|
+ (accessor, result)
|
|
|
+ })?
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -392,7 +386,7 @@ mod private {
|
|
|
.get(&handle)
|
|
|
.ok_or_else(|| Self::invalid_handle_err(handle))?;
|
|
|
let inner = self.block.get_mut();
|
|
|
- value.access_block_mut(inner, cb)
|
|
|
+ value.access_block(inner, cb)
|
|
|
}
|
|
|
|
|
|
fn borrow_block<T, F: FnOnce(&Accessor<FileBlock<C>>) -> Result<T>>(
|
|
@@ -1673,13 +1667,12 @@ mod tests {
|
|
|
abi::fuse_abi::CreateIn,
|
|
|
api::filesystem::{Context, FileSystem, FsOptions},
|
|
|
};
|
|
|
- use std::ffi::CString;
|
|
|
+ use std::{ffi::CString, sync::Arc};
|
|
|
use tempdir::TempDir;
|
|
|
|
|
|
- use super::private::SpecInodes;
|
|
|
use crate::{
|
|
|
crypto::ConcreteCreds,
|
|
|
- test_helpers::{node_creds, BtCursor},
|
|
|
+ test_helpers::{integer_array, node_creds, BtCursor},
|
|
|
BlockMeta, Decompose,
|
|
|
};
|
|
|
|
|
@@ -2233,4 +2226,76 @@ mod tests {
|
|
|
let actual = actual.into_inner();
|
|
|
assert_eq!([0, 1, 2, 3, 0, 0, 0, 0], actual);
|
|
|
}
|
|
|
+
|
|
|
+ #[test]
|
|
|
+ fn concurrent_reads() {
|
|
|
+ // The size of each of the reads.
|
|
|
+ const SIZE: usize = 4;
|
|
|
+ // The number of concurrent reads.
|
|
|
+ const NREADS: usize = 32;
|
|
|
+ const DATA_LEN: usize = SIZE * NREADS;
|
|
|
+ const DATA: [u8; DATA_LEN] = integer_array::<DATA_LEN>(0);
|
|
|
+ let case = BtTestCase::new_empty();
|
|
|
+ let ctx = case.context();
|
|
|
+ let file = CString::new("file.txt").unwrap();
|
|
|
+ let bt = &case.bt;
|
|
|
+ let root = SpecInodes::RootDir.into();
|
|
|
+ let flags = libc::O_RDWR as u32;
|
|
|
+ let create_in = CreateIn {
|
|
|
+ flags,
|
|
|
+ mode: 0o644,
|
|
|
+ umask: 0,
|
|
|
+ fuse_flags: 0,
|
|
|
+ };
|
|
|
+
|
|
|
+ let (entry, handle, ..) = bt.create(&ctx, root, &file, create_in).unwrap();
|
|
|
+ let handle = handle.unwrap();
|
|
|
+ let inode = entry.inode;
|
|
|
+ let written = bt
|
|
|
+ .write(
|
|
|
+ &ctx,
|
|
|
+ inode,
|
|
|
+ handle,
|
|
|
+ &mut BtCursor::new(DATA),
|
|
|
+ DATA.len() as u32,
|
|
|
+ 0,
|
|
|
+ None,
|
|
|
+ false,
|
|
|
+ flags,
|
|
|
+ 0,
|
|
|
+ )
|
|
|
+ .unwrap();
|
|
|
+ assert_eq!(DATA.len(), written);
|
|
|
+ let case = Box::new(case);
|
|
|
+ let cb = Arc::new(Box::new(move |offset: usize| {
|
|
|
+ // Notice that we have concurrent reads to different offsets using the same handle.
|
|
|
+ // Without proper synchronization, this shouldn't work.
|
|
|
+ let mut actual = BtCursor::new(Vec::new());
|
|
|
+ let nread = case
|
|
|
+ .bt
|
|
|
+ .read(
|
|
|
+ &ctx,
|
|
|
+ inode,
|
|
|
+ handle,
|
|
|
+ &mut actual,
|
|
|
+ SIZE as u32,
|
|
|
+ offset as u64,
|
|
|
+ None,
|
|
|
+ flags,
|
|
|
+ )
|
|
|
+ .unwrap();
|
|
|
+ assert_eq!(SIZE, nread);
|
|
|
+ let expected = integer_array::<SIZE>(offset as u8);
|
|
|
+ assert_eq!(&expected, actual.into_inner().as_slice());
|
|
|
+ }));
|
|
|
+
|
|
|
+ let mut handles = Vec::with_capacity(NREADS);
|
|
|
+ for offset in (0..NREADS).map(|e| e * SIZE) {
|
|
|
+ let thread_cb = cb.clone();
|
|
|
+ handles.push(std::thread::spawn(move || thread_cb(offset)));
|
|
|
+ }
|
|
|
+ for handle in handles {
|
|
|
+ handle.join().unwrap();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|