Browse Source

Fixed some reference counting bugs in `LocalFs`.

Matthew Carr 1 year ago
parent
commit
1d4cdfb617

+ 88 - 16
crates/btfproto-tests/src/local_fs_tests.rs

@@ -471,7 +471,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn rename_in_same_directory() {
+    async fn link_in_same_directory() {
         let case = LocalFsTest::new_empty().await;
         let bt = &case.fs;
         let from = case.from();
@@ -517,7 +517,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn rename_to_different_directory() {
+    async fn link_in_different_directory() {
         let case = LocalFsTest::new_empty().await;
         let bt = &case.fs;
         let from = case.from();
@@ -570,9 +570,12 @@ mod tests {
         assert_enoent(result);
     }
 
+    /// Tests that link can be used to overwrite a file in the same directory.
     #[tokio::test]
-    async fn rename_no_replace_same_directory() {
+    async fn link_replace_same_directory() {
+        const EXPECTED: &[u8] = b"got 'em";
         let case = LocalFsTest::new_empty().await;
+        let block_dir = case.dir.path();
         let bt = &case.fs;
         let from = case.from();
         let root: Inode = SpecInodes::RootDir.into();
@@ -582,11 +585,20 @@ mod tests {
         let create_msg = Create {
             parent: root,
             name: oldname,
-            flags: Flags::default(),
+            flags: FlagValue::ReadWrite.into(),
             mode: 0o644,
             umask: 0,
         };
-        let CreateReply { inode, .. } = bt.create(from, create_msg).await.unwrap();
+        let CreateReply { inode, handle, .. } = bt.create(from, create_msg).await.unwrap();
+        let write_msg = Write {
+            inode,
+            handle,
+            offset: 0,
+            data: EXPECTED,
+        };
+        bt.write(from, write_msg).await.unwrap();
+        let close_msg = Close { inode, handle };
+        bt.close(from, close_msg).await.unwrap();
         let create_msg = Create {
             parent: root,
             name: newname,
@@ -594,21 +606,48 @@ mod tests {
             mode: 0o644,
             umask: 0,
         };
-        bt.create(from, create_msg).await.unwrap();
+        let CreateReply {
+            inode: newname_inode,
+            ..
+        } = bt.create(from, create_msg).await.unwrap();
+        let before = num_files(block_dir).unwrap();
 
         let link_msg = Link {
             inode,
             new_parent: root,
             name: newname,
         };
-        let result = bt.link(from, link_msg).await;
-        let err = result.err().unwrap().downcast::<io::Error>().unwrap();
-        assert_eq!(io::ErrorKind::AlreadyExists, err.kind());
+        bt.link(from, link_msg).await.unwrap();
+        let forget_msg = Forget {
+            inode: newname_inode,
+            count: 1,
+        };
+        bt.forget(from, forget_msg).await.unwrap();
+
+        let open_msg = Open {
+            inode,
+            flags: FlagValue::ReadOnly.into(),
+        };
+        let OpenReply { handle, .. } = bt.open(from, open_msg).await.unwrap();
+        let read_msg = Read {
+            inode,
+            handle,
+            offset: 0,
+            size: 2 * EXPECTED.len() as u64,
+        };
+        let buf_guard = bt.read(from, read_msg).await.unwrap();
+        assert_eq!(EXPECTED, buf_guard.deref());
+        // Check that the old block file was deleted.
+        let actual = num_files(block_dir).unwrap();
+        assert_eq!(actual, before - 1);
     }
 
+    /// Tests that `link` can be used to overwrite a file in a different directory.
     #[tokio::test]
-    async fn rename_no_replace_different_directory() {
+    async fn link_replace_different_directory() {
+        const EXPECTED: &[u8] = b"got 'em";
         let case = LocalFsTest::new_empty().await;
+        let block_dir = case.dir.path();
         let bt = &case.fs;
         let from = case.from();
         let root = SpecInodes::RootDir.into();
@@ -626,11 +665,20 @@ mod tests {
         let create_msg = Create {
             parent: root,
             name: file_name,
-            flags: Flags::default(),
+            flags: FlagValue::ReadWrite.into(),
             mode: 0o644,
             umask: 0,
         };
-        let CreateReply { inode, .. } = bt.create(from, create_msg).await.unwrap();
+        let CreateReply { inode, handle, .. } = bt.create(from, create_msg).await.unwrap();
+        let write_msg = Write {
+            inode,
+            handle,
+            offset: 0,
+            data: EXPECTED,
+        };
+        bt.write(from, write_msg).await.unwrap();
+        let close_msg = Close { inode, handle };
+        bt.close(from, close_msg).await.unwrap();
         let create_msg = Create {
             parent: dir,
             name: file_name,
@@ -638,16 +686,40 @@ mod tests {
             mode: 0o644,
             umask: 0,
         };
-        bt.create(from, create_msg).await.unwrap();
+        let CreateReply {
+            inode: newdir_inode,
+            ..
+        } = bt.create(from, create_msg).await.unwrap();
+        let before = num_files(block_dir).unwrap();
 
         let link_msg = Link {
             inode,
             new_parent: dir,
             name: file_name,
         };
-        let result = bt.link(from, link_msg).await;
-        let err = result.err().unwrap().downcast::<io::Error>().unwrap();
-        assert_eq!(io::ErrorKind::AlreadyExists, err.kind());
+        bt.link(from, link_msg).await.unwrap();
+        let forget_msg = Forget {
+            inode: newdir_inode,
+            count: 1,
+        };
+        bt.forget(from, forget_msg).await.unwrap();
+
+        let open_msg = Open {
+            inode,
+            flags: FlagValue::ReadOnly.into(),
+        };
+        let OpenReply { handle, .. } = bt.open(from, open_msg).await.unwrap();
+        let read_msg = Read {
+            inode,
+            handle,
+            offset: 0,
+            size: 2 * EXPECTED.len() as u64,
+        };
+        let buf_guard = bt.read(from, read_msg).await.unwrap();
+        assert_eq!(EXPECTED, buf_guard.deref());
+        // Check that the old block file was deleted.
+        let actual = num_files(block_dir).unwrap();
+        assert_eq!(actual, before - 1);
     }
 
     #[tokio::test]

+ 78 - 52
crates/btfproto/src/local_fs.rs

@@ -378,7 +378,6 @@ mod private {
         handle_values: HashMap<Handle, HandleValue>,
         next_handle: Handle,
         lookup_counts: HashMap<Arc<BlockPath>, u64>,
-        delete: bool,
     }
 
     impl InodeTableValue {
@@ -393,7 +392,6 @@ mod private {
                 handle_values: HashMap::new(),
                 next_handle: 1,
                 lookup_counts,
-                delete: false,
             }
         }
 
@@ -973,8 +971,8 @@ mod private {
             if 0 == lookup_count {
                 let entry = Arc::try_unwrap(inodes.remove(&inode).unwrap())
                     .map_err(|_| bterr!("LOGIC ERROR: entry for inode {inode} was still in use while it was being forgotten"))?;
-                let delete = entry.into_inner().delete;
-                if delete {
+                let value = entry.into_inner();
+                if value.block().meta_body().secrets()?.nlink == 0 {
                     self.delete_block_file(inode)?;
                 }
             }
@@ -1087,6 +1085,10 @@ mod private {
                 let mut value_guard = table_guard.write(next_inode).await?;
                 let block = &mut value_guard.block;
                 block.write_proc_rec(&ProcRec::Valid(proc_rec))?;
+                block.mut_meta_body().access_secrets(|secrets| {
+                    secrets.nlink += 1;
+                    Ok(())
+                })?;
             };
             // We must ensure the reference count for the inode is decremented, otherwise the table
             // entry will never be freed.
@@ -1167,6 +1169,67 @@ mod private {
             let proc_rec = proc_rec.validate()?;
             Ok(proc_rec.authz_attrs)
         }
+
+        /// Decrements the link count of the given inode.
+        ///
+        /// If the link count goes to zero, the inode's file in the local filesystem is deleted.
+        /// The `block_path` parameter is the path to the block identified by the inode.
+        /// The `parent_key` is the block key of the inode's parent.
+        async fn decr_link_count<G>(
+            &self,
+            inode: Inode,
+            block_path: BlockPath,
+            parent_key: SymKey,
+            table_guard: &TableGuard<G>,
+        ) -> Result<()>
+        where
+            G: Deref<Target = InodeTable>,
+        {
+            fn decr_nlink(secrets: &mut BlockMetaSecrets) -> Result<u32> {
+                secrets.nlink -= 1;
+                Ok(secrets.nlink)
+            }
+            let delete = match table_guard.write(inode).await {
+                Ok(mut value) => {
+                    value
+                        .block_mut()
+                        .mut_meta_body()
+                        .access_secrets(decr_nlink)?;
+                    value.block_mut().flush_meta()?;
+                    // Since this block was already open, a client is keeping it alive. When they
+                    // choose to forget this inode it will be deleted. Thus we return false here.
+                    false
+                }
+                Err(err) => {
+                    if let Some(Error::NotOpen(_)) = err.downcast_ref::<Error>() {
+                        // It may be tempting to drop the table_guard here, but if this were done then
+                        // another this block file could be opened concurrently.
+                        let mut block = Self::open_block(
+                            &self.path,
+                            inode,
+                            self.creds.clone(),
+                            block_path,
+                            Some(parent_key),
+                            self.sb.inode_hash,
+                            &self.sb.inode_key,
+                        )?;
+                        let nlink = block.mut_meta_body().access_secrets(decr_nlink)?;
+                        if nlink > 0 {
+                            block.flush_meta()?;
+                            false
+                        } else {
+                            true
+                        }
+                    } else {
+                        return Err(err);
+                    }
+                }
+            };
+            if delete {
+                self.delete_block_file(inode)?;
+            }
+            Ok(())
+        }
     }
 
     unsafe impl<A: Sync> Sync for LocalFs<A> {}
@@ -1565,7 +1628,7 @@ mod private {
                     new_parent,
                     name,
                 } = msg;
-                debug!("link: inode {inode}, new_parent {new_parent}, name {name}");
+                debug!("link: inode {inode}, new_parent {new_parent}, name '{name}'");
                 let authz_attrs = self.authz_attrs(from).await?;
                 let table_guard = self.table_guard().await;
                 let mut value_guard = table_guard.write(new_parent).await?;
@@ -1577,8 +1640,13 @@ mod private {
                 ))?;
 
                 let mut dir = parent_block.read_dir()?;
-                if dir.contains_entry(name) {
-                    return Err(io::Error::from_raw_os_error(libc::EEXIST).into());
+                if let Some(old_entry) = dir.entry(name) {
+                    let parent_meta = parent_block.meta_body();
+                    let mut block_path = parent_meta.path().to_owned();
+                    block_path.push_component(name);
+                    let parent_key = parent_meta.block_key()?.clone();
+                    self.decr_link_count(old_entry.inode(), block_path, parent_key, &table_guard)
+                        .await?;
                 }
 
                 let attr = {
@@ -1608,16 +1676,12 @@ mod private {
 
         type UnlinkFut<'c> = impl 'c + Send + Future<Output = Result<()>>;
         fn unlink<'c>(&'c self, from: &'c Arc<BlockPath>, msg: Unlink<'c>) -> Self::UnlinkFut<'c> {
-            fn decr_nlink(secrets: &mut BlockMetaSecrets) -> Result<u32> {
-                secrets.nlink -= 1;
-                Ok(secrets.nlink)
-            }
             async move {
                 let Unlink { parent, name } = msg;
                 debug!("unlink: parent {parent}, name {name}");
                 let authz_attrs = self.authz_attrs(from).await?;
+                let table_guard = self.table_guard().await;
                 let (block_path, inode, parent_key) = {
-                    let table_guard = self.table_guard().await;
                     let mut value_guard = table_guard.write(parent).await?;
                     let parent_block = &mut value_guard.block;
 
@@ -1642,46 +1706,8 @@ mod private {
                     (block_path, inode, parent_key)
                 };
 
-                let table_guard = self.inodes.read().await;
-                let delete = if let Some(entry) = table_guard.get(&inode) {
-                    let mut value = entry.write().await;
-                    let nlink = value
-                        .block_mut()
-                        .mut_meta_body()
-                        .access_secrets(decr_nlink)?;
-                    value.delete = 0 == nlink;
-                    // If the block is about to be deleted then there's no point in flushing its
-                    // metadata.
-                    if !value.delete {
-                        value.block_mut().flush_meta()?;
-                    }
-                    // Since this block was already open, a client is keeping it alive. When they
-                    // choose to forget this inode it will be deleted. Thus we return false here.
-                    false
-                } else {
-                    // It may be tempting to drop the table_guard here, but if this were done then
-                    // another this block file could be opened concurrently.
-                    let mut block = Self::open_block(
-                        &self.path,
-                        inode,
-                        self.creds.clone(),
-                        block_path,
-                        Some(parent_key),
-                        self.sb.inode_hash,
-                        &self.sb.inode_key,
-                    )?;
-                    let nlink = block.mut_meta_body().access_secrets(decr_nlink)?;
-                    if nlink > 0 {
-                        block.flush_meta()?;
-                        false
-                    } else {
-                        true
-                    }
-                };
-                if delete {
-                    self.delete_block_file(inode)?;
-                }
-                Ok(())
+                self.decr_link_count(inode, block_path, parent_key, &table_guard)
+                    .await
             }
         }
 

+ 0 - 1
crates/btfuse/src/fuse_daemon.rs

@@ -117,7 +117,6 @@ mod private {
             mut channel: FuseChannel,
         ) {
             loop {
-                debug!("server_loop {task_num} blocking on the channel");
                 match channel.get_request() {
                     Ok(Some((reader, writer))) => {
                         // Safety: reader and writer are not mutated while the future returned from

+ 1 - 1
crates/btfuse/src/main.rs

@@ -418,7 +418,7 @@ mod test {
 
     /// Creates a new file system and mounts it at `/tmp/btfuse.<random>/mnt` so it can be
     /// tested manually.
-    //#[tokio::test]
+    #[tokio::test]
     #[allow(dead_code)]
     async fn manual_test() {
         let mut case = new_local().await;

+ 8 - 3
crates/btrun/src/lib.rs

@@ -31,7 +31,7 @@ pub fn new_runtime<C: 'static + Send + Sync + Creds>(
 }
 
 /// An actor runtime.
-/// 
+///
 /// Actors can be activated by the runtime and execute autonomously until they halt. Running actors
 /// can be sent messages using the `send` method, which does not wait for a response from the
 /// recipient. If a reply is needed, then `call` can be used, which returns a future that will not
@@ -141,7 +141,12 @@ impl<Rx: Receiver> Runtime<Rx> {
     }
 
     /// Registers an actor as a service with the given [ServiceId].
-    pub async fn register<Msg, Fut, F, G>(&self, _id: ServiceId, _activator: F, _deserializer: G) -> Result<()>
+    pub async fn register<Msg, Fut, F, G>(
+        &self,
+        _id: ServiceId,
+        _activator: F,
+        _deserializer: G,
+    ) -> Result<()>
     where
         Msg: 'static + CallMsg,
         Fut: 'static + Send + Future<Output = ()>,
@@ -238,7 +243,7 @@ impl<'de> WireEnvelope<'de> {
 }
 
 /// Wraps a message to indicate if it was sent with `call` or `send`.
-/// 
+///
 /// If the message was sent with call, then this enum will contain a channel that can be used to
 /// reply to it.
 pub enum Envelope<T: CallMsg> {