// SPDX-License-Identifier: AGPL-3.0-or-later use crate::{msg::*, server::FsProvider}; use btlib::{ accessor::Accessor, bterr, crypto::{rand_vec, Creds, Decrypter, HashKind, Signer, SymKey}, error::BtErr, AuthzAttrs, BlockAccessor, BlockError, BlockMeta, BlockMetaSecrets, BlockOpenOptions, BlockPath, BlockReader, DirEntry, Directory, Epoch, FileBlock, FlushMeta, IssuedProcRec, MetaAccess, MetaReader, Positioned, Principal, Principaled, ProcRec, Result, Split, TrySeek, ZeroExtendable, }; use btserde::{read_from, write_to}; use core::future::Ready; use log::{debug, error, warn}; use positioned_io::{ReadAt, Size}; use serde::{Deserialize, Serialize}; use std::{ collections::hash_map::{self, HashMap}, fmt::{Display, Formatter}, fs::File, future::Future, io::{self, Seek, SeekFrom, Write as IoWrite}, net::{IpAddr, Ipv6Addr}, ops::{Deref, DerefMut}, path::{Path, PathBuf}, sync::{ atomic::{AtomicU64, Ordering}, Arc, }, time::Duration, }; use tokio::sync::{ Mutex, MutexGuard, OwnedMutexGuard, OwnedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard, }; use zeroize::ZeroizeOnDrop; pub use private::{Authorizer, AuthzContext, Error, LocalFs, ModeAuthorizer}; mod private { use super::*; type Inode = u64; type Handle = u64; #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub enum Error { NotOpen(Inode), InvalidHandle { inode: Inode, handle: Handle }, NoHandlesAvailable(Inode), InodeNotFound(Inode), ReadOnlyHandle(Handle), WrongOwner, } impl Display for Error { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { Error::NotOpen(inode) => write!(f, "inode {inode} is not open"), Error::InvalidHandle { inode, handle } => { write!(f, "invalid handle {handle} for inode {inode}") } Error::NoHandlesAvailable(inode) => { write!(f, "no handles are available for inode {inode}") } Error::InodeNotFound(inode) => write!(f, "inode {inode} could not be found"), Error::ReadOnlyHandle(handle) => { write!(f, "cannot mutably access read-only handle 
{handle}") } Error::WrongOwner => write!(f, "handle is not owned by the requestor"), } } } trait BlockPathExt { fn assert_eq(&self, other: &BlockPath) -> Result<()>; } impl BlockPathExt for BlockPath { fn assert_eq(&self, other: &BlockPath) -> Result<()> { if self != other { Err(Error::WrongOwner.into()) } else { Ok(()) } } } impl std::error::Error for Error {} /// This type provides context for an authorization decision as to whether a given process will /// be allowed to access a block. pub struct AuthzContext<'a> { /// The path from which this request was received. pub from: &'a BlockPath, /// The attributes of the principal whose access is being authorized. pub attrs: &'a AuthzAttrs, /// A reference to the metadata of a block, the access to which is being authorized. pub meta: &'a BlockMeta, } impl<'a> AuthzContext<'a> { fn new(from: &'a BlockPath, attrs: &'a AuthzAttrs, meta: &'a BlockMeta) -> Self { Self { from, attrs, meta } } } /// A trait for types which can render authorization decisions. pub trait Authorizer { /// Returns [Ok] if read authorization is granted, and [Err] otherwise. fn can_read(&self, ctx: &AuthzContext<'_>) -> io::Result<()>; /// Returns [Ok] if write authorization is granted, and [Err] otherwise. fn can_write(&self, ctx: &AuthzContext<'_>) -> io::Result<()>; /// Returns [Ok] if execute authorization is granted, and [Err] otherwise. fn can_exec(&self, ctx: &AuthzContext<'_>) -> io::Result<()>; } /// A particularly simple authorizer that just looks at the mode bits in the block metadata /// to make authorization decisions. 
pub struct ModeAuthorizer;

    impl ModeAuthorizer {
        /// Grants access when at least one bit selected by `mask` is set in `mode`; otherwise
        /// returns a `PermissionDenied` error whose message is `denied_msg`.
        fn authorize(mode: u32, mask: u32, denied_msg: &str) -> io::Result<()> {
            if (mode & mask) != 0 {
                Ok(())
            } else {
                Err(io::Error::new(io::ErrorKind::PermissionDenied, denied_msg))
            }
        }

        /// Returns true if the principal being authorized has UID 0.
        fn user_is_root(ctx: &AuthzContext<'_>) -> bool {
            ctx.attrs.uid == 0
        }

        /// Shared implementation of the three permission checks.
        ///
        /// Root is always authorized. Otherwise the "other" bit always applies, the "user" bit
        /// applies when the block's UID matches the principal's UID, and the "group" bit applies
        /// when the block's GID matches the principal's GID. Access is granted when any
        /// applicable bit is set in the block's mode.
        ///
        /// NOTE(review): `AuthzAttrs::supp_gids` is not consulted here; confirm whether
        /// supplementary groups are meant to be honored.
        fn check(
            ctx: &AuthzContext<'_>,
            user_bit: u32,
            group_bit: u32,
            other_bit: u32,
            denied_msg: &str,
        ) -> io::Result<()> {
            if Self::user_is_root(ctx) {
                return Ok(());
            }
            let secrets = ctx.meta.body().secrets()?;
            let mut mask = other_bit;
            if secrets.uid == ctx.attrs.uid {
                mask |= user_bit;
            }
            if secrets.gid == ctx.attrs.gid {
                mask |= group_bit;
            }
            Self::authorize(secrets.mode, mask, denied_msg)
        }
    }

    impl Authorizer for ModeAuthorizer {
        fn can_read(&self, ctx: &AuthzContext<'_>) -> io::Result<()> {
            Self::check(
                ctx,
                libc::S_IRUSR,
                libc::S_IRGRP,
                libc::S_IROTH,
                "read access denied",
            )
        }

        fn can_write(&self, ctx: &AuthzContext<'_>) -> io::Result<()> {
            Self::check(
                ctx,
                libc::S_IWUSR,
                libc::S_IWGRP,
                libc::S_IWOTH,
                "write access denied",
            )
        }

        fn can_exec(&self, ctx: &AuthzContext<'_>) -> io::Result<()> {
            Self::check(
                ctx,
                libc::S_IXUSR,
                libc::S_IXGRP,
                libc::S_IXOTH,
                "exec access denied",
            )
        }
    }

    // NOTE(review): the generic argument lists on the items below (e.g. `Option>`, `impl>`)
    // look like they were stripped from this copy of the file. They are left exactly as found;
    // restore them from version control.
    type EmptyAccessor = Option>;

    type HandleValueParts<'a> = (&'a Arc>, &'a Arc, Flags);

    /// Guard which, while alive, holds an accessor that has been combined with a block. The
    /// accessor is taken out of `guard` on construction and put back, split from the block, on
    /// drop.
    struct HandleGuard> {
        guard: G,
        accessor: Option>,
        flags: Flags,
    }

    impl> HandleGuard {
        fn new(flags: Flags, mut guard: G, block: B) -> Self {
            let accessor = guard
                .take()
                .map(move |accessor| Accessor::combine(accessor, block));
            Self {
                guard,
                accessor,
                flags,
            }
        }
    }

    impl> Drop for HandleGuard {
        fn drop(&mut self) {
            *self.guard = self.accessor.take().map(|accessor| {
                let (accessor, _) = accessor.split();
                accessor
            });
        }
    }

    impl> Deref for HandleGuard {
        type Target = Accessor;
        fn deref(&self) -> &Self::Target 
{
            // Panics if the accessor is absent, i.e. the guarded slot was `None` when this
            // guard was constructed.
            self.accessor.as_ref().unwrap()
        }
    }

    // NOTE(review): generic argument lists in this section (e.g. `impl>`, `Arc>`) look like
    // they were stripped from this copy of the file; the code is left exactly as found.
    impl> DerefMut for HandleGuard {
        fn deref_mut(&mut self) -> &mut Self::Target {
            self.accessor.as_mut().unwrap()
        }
    }

    /// The state stored for each open handle: the handle's accessor, the path which owns the
    /// handle, its flags and, for directory handles, the directory contents read when the handle
    /// was converted.
    enum HandleValue {
        File {
            accessor: Arc>,
            owner: Arc,
            flags: Flags,
        },
        Directory {
            accessor: Arc>,
            owner: Arc,
            flags: Flags,
            dir: Directory,
        },
    }

    impl HandleValue {
        /// Creates a `File` handle value, keeping only the first half of the split accessor.
        fn new(accessor: Accessor, owner: Arc, flags: Flags) -> HandleValue {
            let (accessor, ..) = accessor.split();
            HandleValue::File {
                accessor: Arc::new(Mutex::new(Some(accessor))),
                owner,
                flags,
            }
        }

        /// Returns the flags stored in this handle value.
        fn flags(&self) -> Flags {
            match self {
                Self::File { flags, .. } => *flags,
                Self::Directory { flags, .. } => *flags,
            }
        }

        /// Returns references to the accessor mutex and owner, along with a copy of the flags.
        fn parts(&self) -> HandleValueParts<'_> {
            match self {
                Self::File {
                    accessor,
                    owner,
                    flags,
                } => (accessor, owner, *flags),
                Self::Directory {
                    accessor,
                    owner,
                    flags,
                    ..
                } => (accessor, owner, *flags),
            }
        }

        /// Consumes this value and returns a `Directory` value whose `dir` field was read from
        /// `block`. Fails if the accessor `Arc` is still shared or if the mutex holds `None`.
        fn convert_to_dir(
            self,
            block: &mut FileBlock,
        ) -> Result {
            let (accessor, owner, flags) = match self {
                Self::File {
                    accessor,
                    owner,
                    flags,
                } => (accessor, owner, flags),
                Self::Directory {
                    accessor,
                    owner,
                    flags,
                    ..
                } => (accessor, owner, flags),
            };
            let accessor = Arc::try_unwrap(accessor).map_err(|_| {
                bterr!("LOGIC ERROR: accessor was still in use even though convert_to_dir owns it")
            })?;
            let accessor = accessor
                .into_inner()
                .ok_or_else(|| bterr!("LOGIC ERROR: accessor was not returned to mutex"))?;
            let mut accessor = Accessor::combine(accessor, block);
            let dir = accessor.read_dir()?;
            let (accessor, ..) = accessor.split();
            Ok(HandleValue::Directory {
                dir,
                accessor: Arc::new(Mutex::new(Some(accessor))),
                owner,
                flags,
            })
        }

        /// Returns the directory stored in a `Directory` value, or an error for a `File` value.
        fn directory(&self) -> io::Result<&Directory> {
            match self {
                Self::Directory { dir, .. 
} => Ok(dir),
                _ => Err(io::Error::new(
                    io::ErrorKind::Other,
                    "handle is not for a directory",
                )),
            }
        }

        /// Locks the accessor mutex with an owned guard, after checking that `from` is the
        /// owner of this handle.
        async fn lock(&self, from: &BlockPath) -> Result<(Flags, OwnedMutexGuard)> {
            let (mutex, owner, flags) = self.parts();
            owner.assert_eq(from)?;
            Ok((flags, mutex.clone().lock_owned().await))
        }

        /// Locks the accessor mutex and wraps it, together with `block`, in a [HandleGuard],
        /// after checking that `from` is the owner of this handle.
        async fn guard<'a, B: Size>(
            &'a self,
            from: &BlockPath,
            block: B,
        ) -> Result>> {
            let (mutex, owner, flags) = self.parts();
            owner.assert_eq(from)?;
            let guard = mutex.lock().await;
            Ok(HandleGuard::new(flags, guard, block))
        }

        /// Overwrites the flags stored in this handle value.
        fn set_flags(&mut self, new_flags: Flags) {
            match self {
                Self::File { flags, .. } => *flags = new_flags,
                Self::Directory { flags, .. } => *flags = new_flags,
            }
        }
    }

    /// Wrapper which exposes the `block` field of the value `inner` dereferences to.
    struct BlockGuard {
        inner: B,
    }

    impl BlockGuard {
        fn new(inner: B) -> Self {
            Self { inner }
        }
    }

    impl>> Deref for BlockGuard {
        type Target = FileBlock;
        fn deref(&self) -> &Self::Target {
            self.inner.block.get_ref()
        }
    }

    impl>> DerefMut for BlockGuard {
        fn deref_mut(&mut self) -> &mut Self::Target {
            self.inner.block.get_mut()
        }
    }

    impl>> Size for BlockGuard {
        fn size(&self) -> io::Result> {
            self.inner.block.size()
        }
    }

    impl>> ReadAt for BlockGuard {
        fn read_at(&self, pos: u64, buf: &mut [u8]) -> io::Result {
            self.inner.block.get_ref().read_at(pos, buf)
        }
    }

    impl>> AsRef for BlockGuard {
        fn as_ref(&self) -> &BlockMeta {
            self.inner.block.as_ref()
        }
    }

    /// The entry stored in the inode table for each open inode: its block, its open handles,
    /// the per-path lookup counts and a `delete` flag which `inode_forget` reads to decide
    /// whether to remove the block file.
    pub struct InodeTableValue {
        block: Accessor>,
        handle_values: HashMap,
        next_handle: Handle,
        lookup_counts: HashMap, u64>,
        delete: bool,
    }

    impl InodeTableValue {
        /// Creates a value with no handles and a lookup count of 1 for `opener`.
        fn new(block: Accessor>, opener: Arc) -> InodeTableValue {
            let mut lookup_counts = HashMap::with_capacity(1);
            lookup_counts.insert(opener, 1);
            Self {
                block,
                handle_values: HashMap::new(),
                next_handle: 1,
                lookup_counts,
                delete: false,
            }
        }

        fn invalid_handle_err(handle: Handle) -> io::Error {
            io::Error::new(io::ErrorKind::Other, format!("invalid handle {handle}"))
        }

        /// Returns the value stored for `handle`, or an "invalid handle" error.
        fn value(&self, handle: Handle) -> io::Result<&HandleValue> {
            self.handle_values
                .get(&handle)
                .ok_or_else(|| 
Self::invalid_handle_err(handle))
        }

        /// Returns a shared reference to the block underlying this inode.
        fn block(&self) -> &FileBlock {
            self.block.get_ref()
        }

        /// Returns a mutable reference to the block underlying this inode.
        fn block_mut(&mut self) -> &mut FileBlock {
            self.block.get_mut()
        }

        /// Replaces the value stored for `handle` with its directory form.
        ///
        /// Note that the handle is removed before the conversion is attempted, so if the
        /// conversion fails the handle is no longer present in this value.
        fn convert_to_dir(&mut self, handle: Handle) -> io::Result<()> {
            let value = self
                .handle_values
                .remove(&handle)
                .ok_or_else(|| Self::invalid_handle_err(handle))?;
            let block = self.block_mut();
            let value = value.convert_to_dir(block)?;
            self.handle_values.insert(handle, value);
            Ok(())
        }

        // NOTE(review): the return types of the three guard methods below look like they lost
        // their generic arguments in this copy of the file; they are left exactly as found.

        /// Returns a guard for `handle` combined with a shared reference to the block. Fails if
        /// the handle is unknown or `from` does not own it.
        async fn handle_guard<'a>(
            &'a self,
            from: &BlockPath,
            handle: Handle,
        ) -> Result, MutexGuard<'a, EmptyAccessor>>> {
            let value = self.value(handle)?;
            let block = self.block();
            value.guard(from, block).await
        }

        /// Like `handle_guard`, but consumes an owned read guard so the result can outlive the
        /// borrow of the inode table.
        async fn handle_guard_owned(
            guard: OwnedRwLockReadGuard,
            from: &BlockPath,
            handle: Handle,
        ) -> Result<
            HandleGuard>, OwnedMutexGuard>,
        > {
            let value = guard.value(handle)?;
            let (flags, mutex_guard) = value.lock(from).await?;
            let guard = BlockGuard::new(guard);
            Ok(HandleGuard::new(flags, mutex_guard, guard))
        }

        /// Returns a guard for `handle` combined with a mutable reference to the block. Fails
        /// with [Error::ReadOnlyHandle] if the handle's flags are not writeable.
        async fn handle_guard_mut<'a>(
            &'a mut self,
            from: &BlockPath,
            handle: Handle,
        ) -> Result, MutexGuard<'a, EmptyAccessor>>> {
            let value = self
                .handle_values
                .get(&handle)
                .ok_or_else(|| Self::invalid_handle_err(handle))?;
            if !value.flags().writeable() {
                return Err(Error::ReadOnlyHandle(handle).into());
            }
            let inner = self.block.get_mut();
            value.guard(from, inner).await
        }

        /// Allocates a new handle owned by `owner` and returns its number.
        ///
        /// Handle numbers are taken from a wrapping counter; numbers which are still in use
        /// are skipped.
        fn new_handle(&mut self, owner: Arc, flags: Flags) -> Result {
            if self.handle_values.len() as u64 >= u64::MAX {
                return Err(bterr!("no handles are available"));
            }
            let handle_value = HandleValue::new(Accessor::new(self.block())?, owner, flags);
            loop {
                let handle = self.next_handle;
                self.next_handle = self.next_handle.wrapping_add(1);
                // If the counter has wrapped around onto a handle which is still taken, the
                // entry is occupied and we simply try the next number. The existing value is
                // never disturbed.
                if let hash_map::Entry::Vacant(entry) = self.handle_values.entry(handle) {
                    // We generated an unused handle. Return it.
                    entry.insert(handle_value);
                    return Ok(handle);
                }
            }
        }

        /// Overwrites the flags stored for `handle`.
        fn set_flags(&mut self, handle: Handle, flags: Flags) -> Result<()> {
            let handle_value = self
                .handle_values
                .get_mut(&handle)
                .ok_or_else(|| Self::invalid_handle_err(handle))?;
            handle_value.set_flags(flags);
            Ok(())
        }

        /// Drops the value stored for `handle`, if any.
        fn forget_handle(&mut self, handle: Handle) {
            self.handle_values.remove(&handle);
        }

        /// Increments the lookup count from the given path by 1.
        ///
        /// # Panics
        /// Panics if the count would overflow a `u64`.
        fn incr_lookup_count(&mut self, from: &Arc) {
            match self.lookup_counts.entry(from.clone()) {
                hash_map::Entry::Occupied(mut entry) => {
                    // I don't want this to silently overflow.
                    let new_count = entry
                        .get()
                        .checked_add(1)
                        .expect("lookup count overflowed a u64");
                    *entry.get_mut() = new_count;
                }
                hash_map::Entry::Vacant(entry) => {
                    entry.insert(1);
                }
            }
        }

        /// Decrements the lookup count from the given path by the given amount.
        fn decr_lookup_count(&mut self, from: Arc, decr: u64) {
            match self.lookup_counts.entry(from) {
                hash_map::Entry::Occupied(mut entry) => {
                    let new_count = entry.get().saturating_sub(decr);
                    if new_count > 0 {
                        *entry.get_mut() = new_count;
                    } else {
                        entry.remove();
                    }
                }
                hash_map::Entry::Vacant(..) 
=> (),
            }
        }

        /// Returns the sum of the lookup counts from all paths.
        fn total_lookup_count(&self) -> u64 {
            self.lookup_counts.values().sum()
        }
    }

    // NOTE(review): generic argument lists in this section (e.g. `HashMap>>>`, `impl>>`) look
    // like they were stripped from this copy of the file; the code is left exactly as found.
    type InodeTable = HashMap>>>;
    type OwnedTableLock = OwnedRwLockReadGuard>;
    type TableLock<'a, C> = RwLockReadGuard<'a, InodeTable>;

    /// Wrapper around a read guard on the inode table which provides per-inode locking.
    struct TableGuard {
        table_guard: G,
    }

    impl TableGuard> {
        /// Takes an owned read lock on `table`.
        async fn new_owned(table: Arc>>) -> TableGuard> {
            let table_guard = table.read_owned().await;
            TableGuard { table_guard }
        }
    }

    impl<'a, C> TableGuard> {
        /// Takes a borrowed read lock on `table`.
        async fn new(table: &'a RwLock>) -> TableGuard> {
            let table_guard = table.read().await;
            TableGuard { table_guard }
        }
    }

    impl>> TableGuard {
        /// Returns the table entry for `inode`, or [Error::NotOpen] if there is none.
        fn get_value(&self, inode: Inode) -> Result<&Arc>>> {
            self.table_guard
                .get(&inode)
                .ok_or_else(|| bterr!(Error::NotOpen(inode)))
        }

        /// Takes a read lock on the table entry for `inode`.
        async fn read<'a>(&'a self, inode: Inode) -> Result>>
        where
            C: 'a,
        {
            let value = self.get_value(inode)?;
            Ok(value.read().await)
        }

        /// Takes a write lock on the table entry for `inode`.
        async fn write<'a>(
            &'a self,
            inode: Inode,
        ) -> Result>>
        where
            C: 'a,
        {
            let value = self.get_value(inode)?;
            Ok(value.write().await)
        }
    }

    /// Structure for metadata about a blocktree.
    #[derive(Debug, Serialize, Deserialize, ZeroizeOnDrop)]
    struct Superblock {
        /// The generation number of the cluster this part of the blocktree is stored on.
        generation: u64,
        /// The next free inode available to the cluster.
        #[zeroize(skip)]
        next_inode: AtomicU64,
        /// The hash algorithm to use when computing inode paths.
        #[zeroize(skip)]
        inode_hash: HashKind,
        /// The key to use when hashing inodes to file paths.
        inode_key: Vec,
    }

    /// Structure for managing the part of a blocktree which is stored in the local filesystem.
    pub struct LocalFs {
        /// The path to the directory in the local filesystem where this blocktree is located.
        path: PathBuf,
        /// A map from inode numbers to their reference counts.
        inodes: Arc>>,
        /// An in-memory copy of the superblock.
        sb: Superblock,
        /// The credentials this blocktree instance will use for all cryptographic operations.
creds: C,
        // The authorizer consulted by this file's methods via `can_read`, `can_write` and
        // `can_exec`.
        authorizer: A,
        // The root principal taken from the writecap of `creds` when this instance was built.
        root_principal: Principal,
    }

    // NOTE(review): the generic parameters of `LocalFs` and of the `impl` headers below look
    // like they were stripped from this copy of the file; the code is left exactly as found.
    impl LocalFs {
        /// The maximum number of directory entries that can be returned in any given call to
        /// `read_dir`.
        const READ_DIR_LIMIT: usize = 1024;
    }

    impl LocalFs {
        /// Creates a new empty blocktree at the given path.
        ///
        /// The superblock and root directory blocks are written and flushed first. If `creds`
        /// is not the root principal, a process record for `creds` is then added to the root
        /// directory via `grant_access_to`.
        ///
        /// Fails if `creds` has no writecap, or if the superblock file already exists (it is
        /// opened with `create_new` set).
        pub async fn new_empty(
            btdir: PathBuf,
            generation: u64,
            creds: C,
            authorizer: A,
        ) -> Result> {
            let writecap = creds.writecap().ok_or(BlockError::MissingWritecap)?;
            let mut root_block_path = writecap.root_block_path();
            let root_principal = writecap.root_principal();

            // Initialize the superblock.
            let mut sb_block = Self::open_superblock(
                &btdir,
                creds.clone(),
                root_block_path.clone(),
                &root_principal,
                true,
            )?;
            let sb = Superblock {
                generation,
                next_inode: AtomicU64::new(SpecInodes::FirstFree.into()),
                inode_hash: HashKind::default(),
                inode_key: rand_vec(HashKind::default().len())?,
            };
            write_to(&sb, &mut sb_block)?;
            sb_block.mut_meta_body().access_secrets(|secrets| {
                secrets.block_id.generation = generation;
                secrets.block_id.inode = SpecInodes::Sb.into();
                secrets.mode = FileType::Reg.value() | 0o666;
                secrets.uid = 0;
                secrets.gid = 0;
                secrets.nlink = 1;
                Ok(())
            })?;
            sb_block.flush()?;

            // Initialize the root directory.
            let mut root_block = Self::open_block(
                &btdir,
                SpecInodes::RootDir.into(),
                creds.clone(),
                root_block_path.clone(),
                None,
                sb.inode_hash,
                &sb.inode_key,
            )?;
            write_to(&Directory::new(), &mut root_block)?;
            root_block.mut_meta_body().access_secrets(|secrets| {
                secrets.block_id.generation = generation;
                secrets.block_id.inode = SpecInodes::RootDir.into();
                secrets.mode = FileType::Dir.value() | 0o777;
                secrets.uid = 0;
                secrets.gid = 0;
                secrets.nlink = 1;
                Ok(())
            })?;
            root_block.flush()?;

            let fs = Self::new(
                btdir,
                sb,
                sb_block,
                root_block,
                creds,
                authorizer,
                root_principal,
            )?;
            let writecap = fs.creds.writecap().ok_or(BlockError::MissingWritecap)?;

            // When these credentials are not the root principal's, register them in the root
            // directory. The request is made from a path ending in the root principal, which
            // `authz_attrs` treats as root.
            if fs.creds.principal() != fs.root_principal {
                let proc_rec = IssuedProcRec {
                    addr: IpAddr::V6(Ipv6Addr::LOCALHOST),
                    pub_creds: fs.creds.concrete_pub(),
                    writecap: writecap.to_owned(),
                    authz_attrs: AuthzAttrs {
                        uid: 0,
                        gid: 0,
                        supp_gids: Vec::new(),
                    },
                };
                root_block_path.push_component(fs.root_principal.to_string());
                fs.grant_access_to(
                    &Arc::new(root_block_path),
                    SpecInodes::RootDir.into(),
                    proc_rec,
                )
                .await?;
            }
            Ok(fs)
        }

        /// Opens an existing blocktree stored at the given path.
pub fn new_existing(btdir: PathBuf, creds: C, authorizer: A) -> Result> {
            let writecap = creds.writecap().ok_or(BlockError::MissingWritecap)?;
            let root_block_path = writecap.root_block_path();
            let root_principal = writecap.root_principal();

            // The superblock must already exist, so it is opened without `create_new`.
            let mut sb_block = Self::open_superblock(
                &btdir,
                creds.clone(),
                root_block_path.clone(),
                &root_principal,
                false,
            )?;
            let sb: Superblock = read_from(&mut sb_block)?;

            let root_block = Self::open_block(
                &btdir,
                SpecInodes::RootDir.into(),
                creds.clone(),
                root_block_path,
                None,
                sb.inode_hash,
                &sb.inode_key,
            )?;

            Self::new(
                btdir,
                sb,
                sb_block,
                root_block,
                creds,
                authorizer,
                root_principal,
            )
        }

        // NOTE(review): generic argument lists in the signatures below (e.g. `Accessor>`,
        // `fn make_path>`) look like they were stripped from this copy of the file; they are
        // left exactly as found.

        /// Builds an instance whose inode table initially contains the superblock and the root
        /// directory, both opened on behalf of the empty block path.
        fn new(
            btdir: PathBuf,
            sb: Superblock,
            sb_block: Accessor>,
            root_block: Accessor>,
            creds: C,
            authorizer: A,
            root_principal: Principal,
        ) -> Result> {
            // Two entries are inserted below, so reserve room for both.
            let mut inodes = HashMap::with_capacity(2);
            let empty_path = Arc::new(BlockPath::default());
            inodes.insert(
                SpecInodes::Sb.into(),
                Arc::new(RwLock::new(InodeTableValue::new(
                    sb_block,
                    empty_path.clone(),
                ))),
            );
            inodes.insert(
                SpecInodes::RootDir.into(),
                Arc::new(RwLock::new(InodeTableValue::new(root_block, empty_path))),
            );
            Ok(LocalFs {
                path: btdir,
                inodes: Arc::new(RwLock::new(inodes)),
                sb,
                creds,
                authorizer,
                root_principal,
            })
        }

        /// Returns `parent/<first two bytes of file_name>/<rest of file_name>`.
        ///
        /// # Panics
        /// Panics if `file_name` is shorter than two bytes or if byte offset 2 is not a
        /// character boundary. The callers in this file pass hex-encoded digests.
        fn make_path>(parent: P, file_name: &str) -> PathBuf {
            let mut path =
                PathBuf::with_capacity(parent.as_ref().as_os_str().len() + 1 + file_name.len() + 1);
            path.push(parent);
            path.push(&file_name[..2]);
            path.push(&file_name[2..]);
            path
        }

        /// Returns the lowercase hexadecimal encoding of `src`.
        fn hex_encode(src: &[u8]) -> Result {
            use std::fmt::Write;

            let mut string = String::with_capacity(2 * src.len());
            for byte in src.iter() {
                write!(string, "{byte:02x}")?;
            }
            Ok(string)
        }

        /// Creates the directory containing `path` if it does not already exist.
        ///
        /// Only the immediate parent is created; its own ancestors must already exist.
        ///
        /// # Errors
        /// Returns an error if `path` has no parent, or if creating the directory fails for any
        /// reason other than it already existing.
        fn ensure_parent_created(path: &Path) -> Result<()> {
            // A path such as `/` or the empty path has no parent. Report that to the caller
            // instead of panicking.
            let dir = path.parent().ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::InvalidInput,
                    "path has no parent directory",
                )
            })?;
            match std::fs::create_dir(dir) {
                Ok(()) => Ok(()),
                Err(err) if err.kind() == io::ErrorKind::AlreadyExists => Ok(()),
                Err(err) => Err(err.into()),
            }
        }

        /// Opens the superblock stored under `btdir`. Its file name is derived from a SHA2-256
        /// digest of the superblock's inode number and the root principal. When `create_new` is
        /// true the file must not already exist.
        fn open_superblock>(
            btdir: P,
            creds: C,
            block_path: BlockPath,
            root_principal: &Principal,
            create_new: bool,
        ) 
-> Result>> { const HASH: HashKind = HashKind::Sha2_256; let mut buf = [0u8; HASH.len()]; HASH.digest( &mut buf, [ SpecInodes::Sb.value().to_le_bytes().as_slice(), root_principal.as_slice(), ] .into_iter(), )?; let hex_str = Self::hex_encode(&buf)?; let path = Self::make_path(btdir, &hex_str); Self::ensure_parent_created(&path)?; let file = std::fs::OpenOptions::new() .read(true) .write(true) .create_new(create_new) .open(path)?; let block = BlockOpenOptions::new() .with_creds(creds) .with_encrypt(true) .with_inner(file) .with_block_path(block_path) .open()?; Ok(block) } /// Returns the path to the file storing the given inode's data. fn block_path>( parent: P, inode: Inode, inode_hash: HashKind, inode_key: &[u8], ) -> Result { let mut buf = vec![0u8; inode_hash.len()]; inode_hash.digest( &mut buf, [inode.to_le_bytes().as_slice(), inode_key].into_iter(), )?; let hex_str = Self::hex_encode(&buf)?; Ok(Self::make_path(parent, &hex_str)) } fn open_block>( btdir: P, inode: Inode, creds: C, block_path: BlockPath, parent_key: Option, inode_hash: HashKind, inode_key: &[u8], ) -> Result>> { let path = Self::block_path(&btdir, inode, inode_hash, inode_key)?; Self::ensure_parent_created(&path)?; let file = std::fs::OpenOptions::new() .read(true) .write(true) .create(true) .open(path)?; Self::open_block_file(file, creds, block_path, parent_key) } fn open_block_file( file: File, creds: C, block_path: BlockPath, parent_key: Option, ) -> Result>> { let block = BlockOpenOptions::new() .with_creds(creds) .with_encrypt(true) .with_inner(file) .with_parent_key(parent_key) .with_block_path(block_path) .open()?; Ok(block) } async fn table_guard(&self) -> TableGuard> { TableGuard::new(&self.inodes).await } async fn open_value( &self, from: Arc, inode: Inode, block_path: BlockPath, parent_key: Option, ) -> Result<()> { let block = Self::open_block( &self.path, inode, self.creds.clone(), block_path, parent_key, self.sb.inode_hash, &self.sb.inode_key, )?; let value = 
Arc::new(RwLock::new(InodeTableValue::new(block, from))); let mut inodes = self.inodes.write().await; if inodes.insert(inode, value).is_some() { error!( "LOGIC ERROR: open_value was called with inode {inode}, which is already open" ); } Ok(()) } /// Ensures that the given inode is open. If the inode is already open, then this method /// does nothing and returns the table guard which was used to check the status of the /// inode. /// ## Warning /// Because this method creates new table guards, no table guard must be alive when it's /// called. Otherwise a deadlock will occur. async fn ensure_open<'a>( &'a self, from: &Arc, inode: Inode, block_path: BlockPath, parent_key: Option, ) -> Result>> { { let table_guard = self.inodes.clone().read_owned().await; if table_guard.contains_key(&inode) { return Ok(TableGuard { table_guard }); } } self.open_value(from.clone(), inode, block_path, parent_key) .await?; Ok(TableGuard::new_owned(self.inodes.clone()).await) } fn delete_block_file(&self, inode: Inode) -> Result<()> { let mut path = Self::block_path(&self.path, inode, self.sb.inode_hash, &self.sb.inode_key)?; std::fs::remove_file(&path)?; path.pop(); let mut contents = std::fs::read_dir(&path)?; if contents.next().is_none() { std::fs::remove_dir(&path)?; } Ok(()) } async fn inode_forget<'a>( &self, from: Arc, inode: Inode, count: u64, ) -> io::Result<()> { let mut inodes = self.inodes.write().await; let lookup_count = { let inode_lock = match inodes.get_mut(&inode) { Some(inode_lock) => inode_lock, None => { warn!("an attempt was made to forget non-existent inode {inode}"); return Ok(()); } }; let mut value = inode_lock.write().await; value.decr_lookup_count(from, count); value.total_lookup_count() }; if 0 == lookup_count { let entry = Arc::try_unwrap(inodes.remove(&inode).unwrap()) .map_err(|_| bterr!("LOGIC ERROR: entry for inode {inode} was still in use while it was being forgotten"))?; let delete = entry.into_inner().delete; if delete { self.delete_block_file(inode)?; 
}
            }
            Ok(())
        }

        // NOTE(review): generic argument lists in the signatures below (e.g. `-> Result {`,
        // `from: &Arc,`, `TableGuard>`) look like they were stripped from this copy of the
        // file; they are left exactly as found.

        /// Returns the next available inode and updates the superblock in one atomic operation.
        /// TODO: Obviously this strategy won't work when there are multiple servers in this
        /// generation.
        /// ## Warning
        /// This method creates a new table guard, so it must not be called while the calling
        /// task already holds one. Otherwise a deadlock may occur if another task is waiting
        /// for the table's write lock.
        async fn next_inode(&self) -> Result {
            let table_guard = self.table_guard().await;
            let mut value_guard = table_guard.write(SpecInodes::Sb.into()).await?;
            let mut block = &mut value_guard.block;
            // We don't need strict ordering because the lock on the inode table value is already
            // serializing access.
            let inode = self.sb.next_inode.fetch_add(1, Ordering::Relaxed);
            block.rewind()?;
            write_to(&self.sb, &mut block)?;
            block.flush()?;
            Ok(inode)
        }

        /// The duration which is reported as the attribute timeout in entries.
        fn attr_timeout(&self) -> Duration {
            Duration::from_secs(5)
        }

        /// The duration which is reported as the entry timeout in entries.
        fn entry_timeout(&self) -> Duration {
            Duration::from_secs(5)
        }

        fn unsupported_flag_err(flag: &str) -> btlib::Error {
            bterr!("unsupported flag: {flag}")
        }

        /// Wraps the given attributes in an entry carrying this instance's timeouts.
        fn bt_entry(&self, attr: BlockMetaSecrets) -> crate::msg::Entry {
            crate::msg::Entry {
                attr,
                attr_timeout: self.attr_timeout(),
                entry_timeout: self.entry_timeout(),
            }
        }

        /// Grants the given credentials access to the directory this instance is responsible for.
        ///
        /// A new inode is reserved for the process record, a readcap for the credentials is
        /// added to the directory `inode` (and to the superblock when `inode` is the root
        /// directory), an entry named after the principal is added to the directory, and the
        /// process record is written to the new inode.
        ///
        /// The caller's write permission on `inode` is verified before anything is modified.
        ///
        /// ## Warning
        /// This method calls `self.authz_attrs`, so the same consideration for avoiding deadlock
        /// apply to this method as well. See the documentation of `self.authz_attrs` for details.
        async fn grant_access_to(
            &self,
            from: &Arc,
            inode: Inode,
            proc_rec: IssuedProcRec,
        ) -> Result<()> {
            let authz_attrs = self.authz_attrs(from).await?;
            let principal = proc_rec.pub_creds.principal();
            let (next_inode, parent_key) = {
                let table_guard = self.table_guard().await;
                // Authorize before the superblock is touched. Otherwise an unauthorized caller
                // would consume an inode and, for the root directory, have a readcap added to
                // the superblock before being rejected.
                {
                    let value_guard = table_guard.write(inode).await?;
                    self.authorizer.can_write(&AuthzContext::new(
                        from,
                        &authz_attrs,
                        value_guard.block.meta(),
                    ))?;
                }
                // Reserve the inode through the table guard we already hold. Calling
                // `self.next_inode` here would take a second read lock on the inode table,
                // which can deadlock if a writer is queued between the two.
                let next_inode = {
                    let mut value_guard = table_guard.write(SpecInodes::Sb.into()).await?;
                    let mut block = &mut value_guard.block;
                    let next_inode = self.sb.next_inode.fetch_add(1, Ordering::Relaxed);
                    block.rewind()?;
                    write_to(&self.sb, &mut block)?;
                    if inode == SpecInodes::RootDir.value() {
                        // If the inode is for the root directory we need to add a readcap for the
                        // superblock.
                        block.mut_meta_body().add_readcap_for(&proc_rec.pub_creds)?;
                    }
                    block.flush()?;
                    next_inode
                };
                let parent_key = {
                    let mut value_guard = table_guard.write(inode).await?;
                    let block = &mut value_guard.block;
                    // Checked again because the lock on the inode was released above.
                    self.authorizer.can_write(&AuthzContext::new(
                        from,
                        &authz_attrs,
                        block.meta(),
                    ))?;
                    block.mut_meta_body().add_readcap_for(&proc_rec.pub_creds)?;
                    let mut dir = block.read_dir()?;
                    let proc_rec_name = principal.to_string();
                    dir.add_file(proc_rec_name, next_inode)?;
                    // Note that write_dir calls flush, which also ensures metadata is written to
                    // disk.
                    block.write_dir(&dir)?;
                    block.meta_body().block_key()?.clone()
                };
                (next_inode, parent_key)
            };
            let self_writecap = self.creds.writecap().ok_or(BlockError::MissingWritecap)?;
            let self_bind_path = Arc::new(self_writecap.bind_path());
            let bind_path = proc_rec.writecap.bind_path();
            self.open_value(
                self_bind_path.clone(),
                next_inode,
                bind_path,
                Some(parent_key),
            )
            .await?;
            {
                let table_guard = self.table_guard().await;
                let mut value_guard = table_guard.write(next_inode).await?;
                let block = &mut value_guard.block;
                block.write_proc_rec(&ProcRec::Valid(proc_rec))?;
            };
            // We must ensure the reference count for the inode is decremented, otherwise the table
            // entry will never be freed.
            self.inode_forget(self_bind_path, next_inode, 1).await?;
            Ok(())
        }

        /// Returns the inode of the entry called `name` in the directory `parent`, or an
        /// `ENOENT` error if there is no such entry.
        async fn lookup_inode_in<'a>(
            table_guard: &'a TableGuard>,
            parent: Inode,
            name: &str,
        ) -> Result {
            let mut value_guard = table_guard.write(parent).await?;
            let dir = value_guard.block.read_dir()?;
            dir.entry(name)
                .ok_or_else(|| io::Error::from_raw_os_error(libc::ENOENT).into())
                .map(|e| e.inode())
        }

        /// Returns a pair of inodes, where the first inode is the inode referred to by the given
        /// path, and the second is the parent inode.
        async fn lookup_inode<'a, 'b, I: Iterator>(
            table_guard: &'b TableGuard>,
            components: I,
        ) -> Result<(Inode, Option)> {
            const ROOT: Inode = SpecInodes::RootDir as Inode;
            let mut parent = None;
            let mut inode = ROOT;
            for component in components {
                parent = Some(inode);
                inode = Self::lookup_inode_in(table_guard, inode, component).await?;
            }
            Ok((inode, parent))
        }

        /// Retrieves the authorization attributes for the principal identified by the given path.
        /// If the principal is not associated with a valid process record, then an [Err] is
        /// returned.
        /// ## Warning
        /// If this method is called while a lock for any component on the given path is held, then
        /// a deadlock may occur. It's safest to call this method when _no_ locks are held.
        async fn authz_attrs(&self, from: &Arc) -> Result {
            let writecap = self.creds.writecap().ok_or(BlockError::MissingWritecap)?;
            let root_principal = writecap.root_principal();
            let from_principal = from.components().last().map_or_else(
                || Err(bterr!("path from which message was received was empty")),
                Principal::try_from,
            )?;
            if root_principal == from_principal {
                // Now I am become root, the destroyer of files.
return Ok(AuthzAttrs {
                    uid: 0,
                    gid: 0,
                    supp_gids: Vec::new(),
                });
            }
            // Walk the directory tree from the local root to find the inode of the process
            // record for `from`, along with the block key of its parent directory (if any).
            let local_root = writecap.path();
            let relative = from.relative_to(local_root)?;
            let (inode, parent_key) = {
                let table_guard = self.table_guard().await;
                let (inode, parent) =
                    Self::lookup_inode(&table_guard, relative.components()).await?;
                let parent_key = if let Some(parent) = parent {
                    let value_guard = table_guard.read(parent).await?;
                    Some(value_guard.block.meta_body().block_key()?.clone())
                } else {
                    None
                };
                (inode, parent_key)
            };
            // The table guard above has been dropped, as `ensure_open` requires.
            let proc_rec = {
                let table_guard = self
                    .ensure_open(from, inode, from.as_ref().to_owned(), parent_key)
                    .await?;
                let mut value_guard = table_guard.write(inode).await?;
                value_guard.block.read_proc_rec()?
            };
            let proc_rec = proc_rec.validate()?;
            Ok(proc_rec.authz_attrs)
        }
    }

    // NOTE(review): this `unsafe impl` carries no `SAFETY:` justification, and nothing visible
    // in this file establishes why sharing a `LocalFs` between threads is sound. Document the
    // invariant or remove the impl. The generic parameters of `LocalFs` also look like they
    // were stripped from this copy of the file, here and in the items below.
    unsafe impl Sync for LocalFs {}

    /// An owned guard which allows read access to file data.
    pub struct BufGuard {
        offset: u64,
        size: u64,
        // Note that handle must come before _table to ensure the guards are dropped in the correct
        // order.
        handle: HandleGuard<
            BlockGuard>>,
            OwnedMutexGuard,
        >,
        _table: OwnedTableLock,
    }

    impl BufGuard {
        /// Positions the given handle at `offset` and returns a guard over it.
        ///
        /// Fails if `inode` is not open, if `handle` is invalid or not owned by `from`, or if
        /// the handle's flags do not allow reading.
        async fn new(
            table: Arc>>,
            from: &BlockPath,
            inode: Inode,
            handle: Handle,
            offset: u64,
            size: u64,
        ) -> Result> {
            let table = table.read_owned().await;
            let entry = table.get(&inode).ok_or(Error::NotOpen(inode))?;
            // First try to seek while holding only a read lock on the inode. `None` is produced
            // when the seek has to be retried with a write lock.
            let inode_guard = {
                let inode_guard = entry.clone().read_owned().await;
                let mut handle_guard = inode_guard.handle_guard(from, handle).await?;
                handle_guard.flags.assert_readable()?;
                let pos = handle_guard.pos() as u64;
                if offset != pos {
                    if let Err(err) = handle_guard.try_seek(SeekFrom::Start(offset)) {
                        // An error with `ErrorKind::Unsupported` means that the `SectoredBuf`
                        // has unflushed data and it needs exclusive access to the block to
                        // perform this seek because this data needs to be written.
if let io::ErrorKind::Unsupported = err.kind() {
                            None
                        } else {
                            return Err(err.into());
                        }
                    } else {
                        drop(handle_guard);
                        Some(inode_guard)
                    }
                } else {
                    drop(handle_guard);
                    Some(inode_guard)
                }
            };
            let inode_guard = match inode_guard {
                Some(inode_guard) => inode_guard,
                None => {
                    // Retry the seek with a write lock on the inode, then go back to a read
                    // lock for the guard which is returned.
                    {
                        let mut inode_guard = entry.write().await;
                        let mut handle_guard =
                            inode_guard.handle_guard_mut(from, handle).await?;
                        handle_guard.seek(SeekFrom::Start(offset))?;
                    }
                    entry.clone().read_owned().await
                }
            };
            let handle = InodeTableValue::handle_guard_owned(inode_guard, from, handle).await?;
            Ok(BufGuard {
                handle,
                _table: table,
                offset,
                size,
            })
        }
    }

    impl Deref for BufGuard {
        type Target = [u8];
        fn deref(&self) -> &Self::Target {
            // Panics if `get_buf` fails for the stored offset and size.
            self.handle.get_buf(self.offset, self.size).unwrap()
        }
    }

    // NOTE(review): generic argument lists in this impl (e.g. `Future>`, `&'c Arc,`) look like
    // they were stripped from this copy of the file; the code is left exactly as found.
    impl FsProvider for LocalFs {
        type LookupFut<'c> = impl 'c + Send + Future>;
        /// Looks up the entry called `name` in the directory `parent`, opens its inode if
        /// needed, and increments the lookup count which `from` holds on it. The caller needs
        /// execute permission on `parent`.
        fn lookup<'c>(&'c self, from: &'c Arc, msg: Lookup<'c>) -> Self::LookupFut<'c> {
            async move {
                let Lookup { parent, name, .. } = msg;
                debug!("lookup: parent {parent}, {:?}", name);
                let authz_attrs = self.authz_attrs(from).await?;
                let (dir, block_path, parent_key) = {
                    let table_guard = self.table_guard().await;
                    let mut value_guard = table_guard.write(parent).await?;
                    let parent_block = &mut value_guard.block;
                    self.authorizer.can_exec(&AuthzContext::new(
                        from,
                        &authz_attrs,
                        parent_block.meta(),
                    ))?;
                    let dir = parent_block.read_dir()?;
                    let meta_body = parent_block.meta_body();
                    let block_path = meta_body.path().to_owned();
                    let parent_key = meta_body.block_key()?.clone();
                    (dir, block_path, parent_key)
                };
                let entry = dir
                    .entry(name)
                    .ok_or_else(|| io::Error::from_raw_os_error(libc::ENOENT))?;
                let inode = entry.inode();
                // NOTE(review): `block_path` is still the parent's path here, whereas `create`
                // pushes the entry name onto the parent's path before opening the child.
                // Confirm whether the name should be pushed here as well.
                let stat = {
                    let table_guard = self
                        .ensure_open(from, inode, block_path, Some(parent_key))
                        .await?;
                    let mut value_guard = table_guard.write(inode).await?;
                    let stat = value_guard.block.meta_body().secrets()?.to_owned();
                    value_guard.incr_lookup_count(from);
                    stat
                };
                let entry = self.bt_entry(stat);
                let reply = LookupReply {
                    inode,
                    generation: 
self.sb.generation, entry, }; Ok(reply) } } type CreateFut<'c> = impl 'c + Send + Future>; fn create<'c>(&'c self, from: &'c Arc, msg: Create<'c>) -> Self::CreateFut<'c> { async move { let Create { parent, name, flags, mode, umask, } = msg; debug!("create: parent {parent}, name {:?}", name); let authz_attrs = self.authz_attrs(from).await?; let name = msg.name.to_owned(); // Add a directory entry to the parent for the new inode. let (inode, mut block_path, parent_key) = { let table_guard = self.table_guard().await; let mut value_guard = table_guard.write(parent).await?; let block = &mut value_guard.block; self.authorizer.can_write(&AuthzContext::new( from, &authz_attrs, block.meta(), ))?; let mut dir = block.read_dir()?; if dir.contains_entry(&name) { return Err(io::Error::from_raw_os_error(libc::EEXIST).into()); } // Reserve a free inode. let inode = self.next_inode().await?; dir.add_file(name.clone(), inode)?; block.write_dir(&dir)?; let meta_body = block.meta_body(); let block_path = meta_body.path().clone(); let parent_key = meta_body.block_key()?.clone(); (inode, block_path, parent_key) }; block_path.push_component(name); let (handle, stat) = { let table_guard = self .ensure_open(from, inode, block_path, Some(parent_key)) .await?; let mut value_guard = table_guard.write(inode).await?; let handle = value_guard.new_handle(from.clone(), FlagValue::ReadWrite.into())?; let stat = { let mut block = value_guard.handle_guard_mut(from, handle).await?; let stat = block.mut_meta_body().access_secrets(|secrets| { secrets.block_id.generation = self.sb.generation; secrets.block_id.inode = inode; secrets.mode = mode & !umask; if flags.directory() { secrets.mode |= FileType::Dir.value(); } else { secrets.mode |= FileType::Reg.value(); } secrets.uid = authz_attrs.uid; secrets.gid = authz_attrs.gid; let now = Epoch::now(); secrets.atime = now; secrets.ctime = now; secrets.mtime = now; secrets.nlink = 1; Ok(secrets.to_owned()) })?; if flags.directory() { // Note that write_dir 
flushes data after writing, including
// metadata.
block.write_dir(&Directory::new())?; } else { block.flush_meta()?; } stat }; if flags.directory() { value_guard.convert_to_dir(handle)?; } value_guard.set_flags(handle, flags)?; (handle, stat) }; Ok(CreateReply { inode, handle, entry: self.bt_entry(stat), }) } }

        /// The future returned by `open`.
        type OpenFut<'c> = impl 'c + Send + Future>;

        /// Opens a new handle to `inode` with the given `flags` on behalf of `from`.
        ///
        /// The handle is allocated first and the authorization checks run against it
        /// afterwards; on every failure path after allocation the handle is forgotten again
        /// before the error is returned.
        ///
        /// # Errors
        ///
        /// Rejects `O_APPEND` and `O_CLOEXEC` as unsupported flags, and returns an error when
        /// the authorizer denies the read or write access requested by `flags`, when the
        /// handle cannot be converted to a directory handle, or when the flags cannot be set.
        fn open<'c>(&'c self, from: &'c Arc, msg: Open) -> Self::OpenFut<'c> {
            async move {
                let Open { inode, flags } = msg;
                debug!("open: inode {inode}, flags {flags}");
                if flags.value() & libc::O_APPEND != 0 {
                    return Err(Self::unsupported_flag_err("O_APPEND"));
                }
                if flags.value() & libc::O_CLOEXEC != 0 {
                    return Err(Self::unsupported_flag_err("O_CLOEXEC"));
                }
                let authz_attrs = self.authz_attrs(from).await?;
                let handle = {
                    let table_guard = self.table_guard().await;
                    let mut value = table_guard.write(inode).await?;
                    let handle = value.new_handle(from.clone(), flags)?;
                    {
                        let block = {
                            let result = value.handle_guard(from, handle).await;
                            match result {
                                Ok(block) => block,
                                Err(ref err) => {
                                    // The error is turned into a message and `result` dropped
                                    // before `value` is used again to forget the handle.
                                    let message = err.to_string();
                                    drop(result);
                                    value.forget_handle(handle);
                                    return Err(bterr!(message));
                                }
                            }
                        };
                        let ctx = AuthzContext::new(from, &authz_attrs, block.meta());
                        if flags.readable() {
                            if let Err(err) = self.authorizer.can_read(&ctx) {
                                // `block` is dropped before `value` is used to roll back.
                                drop(block);
                                value.forget_handle(handle);
                                return Err(err.into());
                            }
                        }
                        if flags.writeable() {
                            if let Err(err) = self.authorizer.can_write(&ctx) {
                                drop(block);
                                value.forget_handle(handle);
                                return Err(err.into());
                            }
                        }
                    }
                    if flags.directory() {
                        if let Err(err) = value.convert_to_dir(handle) {
                            value.forget_handle(handle);
                            return Err(err.into());
                        }
                    }
                    if let Err(err) = value.set_flags(handle, flags) {
                        value.forget_handle(handle);
                        return Err(err);
                    }
                    handle
                };
                Ok(OpenReply { handle })
            }
        }

        /// The guard type through which read data is exposed to the caller.
        type ReadGuard = BufGuard;
        /// The future returned by `read`.
        type ReadFut<'c> = impl 'c + Send + Future>;

        /// Reads `size` bytes at `offset` from the block open under `handle` by building a
        /// `BufGuard` over this provider's inode table.
        fn read<'c>(&'c self, from: &'c Arc, msg: Read) -> Self::ReadFut<'c> {
            async move {
                let Read {
                    inode,
                    handle,
                    offset,
                    size,
                } = msg;
                debug!("read:
inode {inode}, handle {handle}, offset {offset}, size {size}"); BufGuard::new(self.inodes.clone(), from, inode, handle, offset, size).await } }

        /// The future returned by `write`.
        type WriteFut<'r> = impl 'r + Send + Future>;

        /// Writes `data` to the block open under `handle`, starting at `offset`.
        ///
        /// Returns a `WriteReply` carrying the number of bytes copied into the block.
        ///
        /// # Errors
        ///
        /// Fails when the handle cannot be accessed mutably by `from`, when its flags are not
        /// writeable, or when seeking or copying fails.
        fn write<'c>(
            &'c self,
            from: &'c Arc,
            write: Write<&'c [u8]>,
        ) -> Self::WriteFut<'c> {
            async move {
                let Write {
                    inode,
                    handle,
                    offset,
                    mut data,
                } = write;
                debug!("write: inode {inode}, handle {handle}, offset {offset}");
                let table_guard = self.table_guard().await;
                let mut value_guard = table_guard.write(inode).await?;
                let mut block = value_guard.handle_guard_mut(from, handle).await?;
                block.flags.assert_writeable()?;
                // Only seek when the handle is not already positioned at `offset`.
                let pos = block.pos() as u64;
                if offset != pos {
                    block.seek(SeekFrom::Start(offset))?;
                }
                let written = std::io::copy(&mut data, block.deref_mut())?;
                Ok(WriteReply { written })
            }
        }

        /// The future returned by `flush`.
        type FlushFut<'c> = impl 'c + Send + Future>;

        /// Flushes the block open under `handle`.
        ///
        /// A handle that cannot be accessed mutably because it is read-only is treated as
        /// success; every other failure to access the handle is returned to the caller.
        fn flush<'c>(&'c self, from: &'c Arc, msg: Flush) -> Self::FlushFut<'c> {
            async move {
                let Flush { inode, handle } = msg;
                debug!("flush: inode {inode}, handle {handle}");
                let table_guard = self.table_guard().await;
                let mut value_guard = table_guard.write(inode).await?;
                let mut handle_guard = match value_guard.handle_guard_mut(from, handle).await {
                    Ok(guard) => guard,
                    Err(err) => match err.downcast_ref::() {
                        Some(Error::ReadOnlyHandle(..)) => {
                            // We ignore attempts to flush read-only handles.
return Ok(()); } _ => return Err(err), }, }; handle_guard.flush()?; Ok(()) } }

        /// The future returned by `read_dir`.
        type ReadDirFut<'c> = impl 'c + Send + Future>;

        /// Returns one page of entries from the directory open under `handle`.
        ///
        /// `state` is the index of the first entry to return. At most `Self::READ_DIR_LIMIT`
        /// entries are returned, further capped by `limit` when `limit` is greater than zero.
        /// The reply's `new_state` is the index to pass in the next call.
        ///
        /// A `state` at or beyond the end of the directory yields an empty page and leaves
        /// `new_state` equal to `state`.
        ///
        /// # Errors
        ///
        /// Fails with `Error::InvalidHandle` when `handle` is unknown for `inode`, when the
        /// handle is not readable, when it is not owned by `from`, or when it is not a
        /// directory handle.
        fn read_dir<'c>(&'c self, from: &'c Arc, msg: ReadDir) -> Self::ReadDirFut<'c> {
            async move {
                let ReadDir {
                    inode,
                    handle,
                    limit,
                    state,
                } = msg;
                debug!("read_dir: inode {inode}, handle {handle}, state {state}");
                let table_guard = self.table_guard().await;
                let value = table_guard.read(inode).await?;
                let handle_value = value
                    .value(handle)
                    .map_err(|_| bterr!(Error::InvalidHandle { handle, inode }))?;
                let (_, owner, flags) = handle_value.parts();
                flags.assert_readable()?;
                owner.assert_eq(from)?;
                let dir = handle_value.directory()?;
                let state: usize = state.try_into()?;
                // `state` comes from the client and may be past the end of the directory, so the
                // number of remaining entries is computed with a saturating subtraction instead
                // of one that can underflow.
                let remaining = dir.num_entries().saturating_sub(state);
                let server_limit = Self::READ_DIR_LIMIT.min(remaining);
                let entries_len = if limit > 0 {
                    server_limit.min(limit as usize)
                } else {
                    server_limit
                };
                let pairs = dir
                    .entries()
                    .skip(state)
                    .take(entries_len)
                    .map(|(name, entry)| (name.to_owned(), entry.to_owned()));
                let mut entries = Vec::with_capacity(entries_len);
                entries.extend(pairs);
                Ok(ReadDirReply {
                    entries,
                    new_state: (state + entries_len) as u64,
                })
            }
        }

        /// The future returned by `link`.
        type LinkFut<'c> = impl 'c + Send + Future>;

        /// Adds an entry called `name` to the directory `new_parent` that refers to the
        /// existing `inode`, incrementing the inode's link count and its lookup count for
        /// `from`.
        ///
        /// # Errors
        ///
        /// Fails with `EPERM` when `inode` and `new_parent` are the same inode, with `EEXIST`
        /// when `new_parent` already has an entry called `name`, and propagates any
        /// authorization, locking or block I/O error.
        fn link<'c>(&'c self, from: &'c Arc, msg: Link<'c>) -> Self::LinkFut<'c> {
            async move {
                let Link {
                    inode,
                    new_parent,
                    name,
                } = msg;
                debug!("link: inode {inode}, new_parent {new_parent}, name {name}");
                // `new_parent` stays write-locked while `inode` is write-locked below, so asking
                // for both locks on the same inode could never complete. Reject it up front.
                if inode == new_parent {
                    return Err(io::Error::from_raw_os_error(libc::EPERM).into());
                }
                let authz_attrs = self.authz_attrs(from).await?;
                let table_guard = self.table_guard().await;
                let mut value_guard = table_guard.write(new_parent).await?;
                let parent_block = &mut value_guard.block;
                self.authorizer.can_write(&AuthzContext::new(
                    from,
                    &authz_attrs,
                    parent_block.meta(),
                ))?;
                let mut dir = parent_block.read_dir()?;
                if dir.contains_entry(name) {
                    return Err(io::Error::from_raw_os_error(libc::EEXIST).into());
                }
                let attr = {
                    // The table guard acquired above is reused here. Acquiring a second one
                    // while the first is still held risks a deadlock if `table_guard()` takes a
                    // read lock and a writer queues up between the two acquisitions.
                    let mut value = table_guard.write(inode).await?;
                    let block = value.block_mut();
                    let meta = block.mut_meta_body();
                    let attr =
meta.access_secrets(|secrets| { secrets.nlink += 1; Ok(secrets.to_owned()) })?; block.flush_meta()?; value.incr_lookup_count(from); attr }; let file_type = FileType::from_value(attr.mode)?; let entry = match file_type { FileType::Reg => DirEntry::File(inode), FileType::Dir => DirEntry::Directory(inode), }; dir.insert_entry(name.to_owned(), entry); parent_block.write_dir(&dir)?; let entry = self.bt_entry(attr); Ok(LinkReply { entry }) } }

        /// The future returned by `unlink`.
        type UnlinkFut<'c> = impl 'c + Send + Future>;

        /// Removes the entry called `name` from the directory `parent` and decrements the link
        /// count of the inode it referred to.
        ///
        /// If the inode is currently in the inode table it is only marked for deletion when its
        /// link count reaches zero; otherwise its block file is deleted immediately once the
        /// link count reaches zero.
        ///
        /// # Errors
        ///
        /// Fails with `ENOENT` when `parent` has no entry called `name`, and propagates any
        /// authorization, locking or block I/O error.
        fn unlink<'c>(&'c self, from: &'c Arc, msg: Unlink<'c>) -> Self::UnlinkFut<'c> {
            /// Decrements the link count stored in `secrets` and returns the new value.
            ///
            /// The subtraction saturates at zero so that a block whose link count is already
            /// zero is still reported as having no links, rather than underflowing.
            fn decr_nlink(secrets: &mut BlockMetaSecrets) -> Result {
                secrets.nlink = secrets.nlink.saturating_sub(1);
                Ok(secrets.nlink)
            }

            async move {
                let Unlink { parent, name } = msg;
                debug!("unlink: parent {parent}, name {name}");
                let authz_attrs = self.authz_attrs(from).await?;
                // Remove the entry from the parent and copy out what is needed to open the
                // child, so that the parent's guards are released at the end of this scope.
                let (block_path, inode, parent_key) = {
                    let table_guard = self.table_guard().await;
                    let mut value_guard = table_guard.write(parent).await?;
                    let parent_block = &mut value_guard.block;
                    self.authorizer.can_write(&AuthzContext::new(
                        from,
                        &authz_attrs,
                        parent_block.meta(),
                    ))?;
                    let mut dir = parent_block.read_dir()?;
                    let entry = dir
                        .remove_entry(name)
                        .ok_or_else(|| io::Error::from_raw_os_error(libc::ENOENT))?;
                    let inode = entry.inode();
                    parent_block.write_dir(&dir)?;

                    let meta_body = parent_block.meta_body();
                    let mut block_path = meta_body.path().clone();
                    block_path.push_component(name.to_owned());
                    let parent_key = meta_body.block_key()?.clone();
                    (block_path, inode, parent_key)
                };
                let table_guard = self.inodes.read().await;
                let delete = if let Some(entry) = table_guard.get(&inode) {
                    let mut value = entry.write().await;
                    let nlink = value
                        .block_mut()
                        .mut_meta_body()
                        .access_secrets(decr_nlink)?;
                    value.delete = 0 == nlink;
                    // If the block is about to be deleted then there's no point in flushing its
                    // metadata.
                    if !value.delete {
                        value.block_mut().flush_meta()?;
                    }
                    // Since this block was already open, a client is keeping it alive.
When they
// choose to forget this inode it will be deleted. Thus we return false here.
false } else {
// It may be tempting to drop the table_guard here, but if this were done then
// this block file could be opened concurrently.
let mut block = Self::open_block( &self.path, inode, self.creds.clone(), block_path, Some(parent_key), self.sb.inode_hash, &self.sb.inode_key, )?; let nlink = block.mut_meta_body().access_secrets(decr_nlink)?; if nlink > 0 { block.flush_meta()?; false } else { true } }; if delete { self.delete_block_file(inode)?; } Ok(()) } }

        /// The future returned by `read_meta`.
        type ReadMetaFut<'c> = impl 'c + Send + Future>;

        /// Returns the metadata secrets of `inode`.
        ///
        /// When `handle` is given the metadata is read through that handle, which must be
        /// accessible to `from`; otherwise it is read from the inode's own block. The reply's
        /// `valid_for` is set from `attr_timeout()`.
        fn read_meta<'c>(
            &'c self,
            from: &'c Arc,
            msg: ReadMeta,
        ) -> Self::ReadMetaFut<'c> {
            async move {
                let ReadMeta { inode, handle } = msg;
                debug!("read_meta: inode {inode}, handle {:?}", handle);
                // NOTE(review): no authorizer check is made here, unlike `open` and
                // `write_meta` — confirm that reading metadata is meant to be unrestricted.
                let table_guard = self.table_guard().await;
                let value_guard = table_guard.read(inode).await?;
                let attrs = if let Some(handle) = handle {
                    let block = value_guard.handle_guard(from, handle).await?;
                    block.meta_body().secrets()?.to_owned()
                } else {
                    value_guard.block().meta_body().secrets()?.to_owned()
                };
                debug!("read_meta attrs: {:?}", attrs);
                let reply = ReadMetaReply {
                    attrs,
                    valid_for: self.attr_timeout(),
                };
                Ok(reply)
            }
        }

        /// The future returned by `write_meta`.
        type WriteMetaFut<'c> = impl 'c + Send + Future>;

        /// Updates the metadata secrets of `inode` with the fields of `attrs` selected by
        /// `attrs_set`, merges in `attrs.tags`, and flushes the metadata.
        ///
        /// The requestor must pass the authorizer's `can_write` check on the block. When
        /// `handle` is given, the handle's flags must also be writeable.
        fn write_meta<'c>(
            &'c self,
            from: &'c Arc,
            msg: WriteMeta,
        ) -> Self::WriteMetaFut<'c> {
            async move {
                let WriteMeta {
                    inode,
                    handle,
                    attrs,
                    attrs_set,
                } = msg;
                debug!("write_meta: inode {inode}, handle {:?}", handle);
                let authz_attrs = self.authz_attrs(from).await?;
                // The update itself is written once as a closure so it can be applied either to
                // the block behind a handle or to the inode's own block.
                let cb = |block: &mut FileBlock| {
                    self.authorizer.can_write(&AuthzContext::new(
                        from,
                        &authz_attrs,
                        block.meta(),
                    ))?;

                    let attrs = block.mut_meta_body().access_secrets(|secrets| {
                        if attrs_set.mode() {
                            secrets.mode = attrs.mode;
                        }
                        if attrs_set.uid() {
                            secrets.uid = attrs.uid;
                        }
                        if attrs_set.gid() {
                            secrets.gid = attrs.gid;
                        }
                        if attrs_set.atime() {
                            secrets.atime = attrs.atime;
                        }
                        if attrs_set.mtime() {
                            secrets.mtime = attrs.mtime;
                        }
                        if
attrs_set.ctime() { secrets.ctime = attrs.ctime; } for (key, value) in attrs.tags.into_iter() { secrets.tags.insert(key, value); } Ok(secrets.to_owned()) })?; block.flush_meta()?; Ok::<_, btlib::Error>(attrs) }; let attrs = { let table_guard = self.table_guard().await; let mut value_guard = table_guard.write(inode).await?; if let Some(handle) = handle { let mut block = value_guard.handle_guard_mut(from, handle).await?; block.flags.assert_writeable()?; cb(block.get_mut()) } else { cb(value_guard.block.get_mut()) } }?; Ok(WriteMetaReply { attrs, valid_for: self.attr_timeout(), }) } }

        /// The future returned by `allocate`.
        type AllocateFut<'c> = impl 'c + Send + Future>;

        /// Grows the block open under `handle` with zeros until it is at least `size` bytes
        /// long. A block that is already that long is left unchanged.
        ///
        /// # Errors
        ///
        /// Fails when the handle cannot be accessed mutably by `from`, when the handle's flags
        /// are not writeable, or when `offset` is given and differs from the block's current
        /// size (only allocations at the end of a file are supported).
        fn allocate<'c>(
            &'c self,
            from: &'c Arc,
            msg: Allocate,
        ) -> Self::AllocateFut<'c> {
            async move {
                let Allocate {
                    inode,
                    handle,
                    offset,
                    size,
                } = msg;
                debug!(
                    "allocate: inode {inode}, handle {handle}, offset {:?}, size {size}",
                    offset
                );
                let table_guard = self.table_guard().await;
                let mut value_guard = table_guard.write(inode).await?;
                let mut block = value_guard.handle_guard_mut(from, handle).await?;
                // Extending a block changes its contents, so require the same writeable flags
                // that `write` and `write_meta` require before touching it.
                block.flags.assert_writeable()?;
                let curr_size = block.meta_body().secrets()?.size;
                if let Some(offset) = offset {
                    if curr_size != offset {
                        return Err(bterr!("only allocations at the end of files are supported"));
                    }
                }
                let new_size = curr_size.max(size);
                if new_size > curr_size {
                    block.zero_extend(new_size - curr_size)?;
                }
                Ok(())
            }
        }

        /// The future returned by `close`.
        type CloseFut<'c> = impl 'c + Send + Future>;

        /// Flushes the data and metadata of the block open under `handle` and then forgets
        /// the handle.
        ///
        /// A handle that cannot be accessed mutably because it is read-only is not flushed but
        /// is still forgotten.
        fn close<'c>(&'c self, from: &'c Arc, msg: Close) -> Self::CloseFut<'c> {
            async move {
                let Close { inode, handle } = msg;
                debug!("close: inode {inode}, handle {handle}");
                let table_guard = self.table_guard().await;
                let mut value = table_guard.write(inode).await?;
                match value.handle_guard_mut(from, handle).await {
                    Ok(mut block) => {
                        block.flush()?;
                        block.flush_meta()?;
                    }
                    Err(err) => match err.downcast_ref::() {
                        // If the cause of the error is that the handle is read-only, then it is
                        // not actually an error.
Some(Error::ReadOnlyHandle(_)) => (), _ => return Err(err), }, }; value.forget_handle(handle); Ok(()) } } type ForgetFut<'c> = impl 'c + Send + Future>; fn forget<'c>(&'c self, from: &'c Arc, msg: Forget) -> Self::ForgetFut<'c> { async move { let Forget { inode, count } = msg; debug!("forget: inode {inode}, count {count}"); self.inode_forget(from.clone(), inode, count).await.bterr() } } type LockFut<'c> = Ready>; fn lock<'c>(&'c self, _from: &'c Arc, _msg: Lock) -> Self::LockFut<'c> { todo!(); } type UnlockFut<'c> = Ready>; fn unlock<'c>(&'c self, _from: &'c Arc, _msg: Unlock) -> Self::UnlockFut<'c> { todo!(); } type AddReacapFut<'c> = impl 'c + Send + Future>; fn add_readcap<'c>( &'c self, from: &'c Arc, msg: AddReadcap, ) -> Self::AddReacapFut<'c> { async move { let AddReadcap { inode, handle, pub_creds, } = msg; debug!("add_readcap: inode {inode}, handle {handle}"); let table_guard = self.table_guard().await; let mut value_guard = table_guard.write(inode).await?; let mut block = value_guard.handle_guard_mut(from, handle).await?; block.mut_meta_body().add_readcap_for(pub_creds) } } type GrantAccessFut<'c> = impl 'c + Send + Future>; fn grant_access<'c>( &'c self, from: &'c Arc, msg: GrantAccess, ) -> Self::GrantAccessFut<'c> { let GrantAccess { inode, record } = msg; debug!("grant_access: inode {inode}, record {:?}", record); self.grant_access_to(from, inode, record) } } }