Prechádzať zdrojové kódy

Resolved some issues with btfuse tests not shuting down gracefully.

Matthew Carr 1 rok pred
rodič
commit
ada24686a9

+ 2 - 1
crates/btfuse/src/config.rs

@@ -6,9 +6,10 @@ use btmsg::BlockAddr;
 
 use std::{
     net::IpAddr,
+    num::NonZeroUsize,
     path::{Path, PathBuf},
     sync::Arc,
-    thread::available_parallelism, num::NonZeroUsize,
+    thread::available_parallelism,
 };
 
 #[derive(PartialEq, Eq, Clone)]

+ 1 - 0
crates/btfuse/src/fuse_daemon.rs

@@ -96,6 +96,7 @@ mod private {
             mut channel: FuseChannel,
         ) {
             loop {
+                debug!("server_loop {task_num} blocking on the channel");
                 match channel.get_request() {
                     Ok(Some((reader, writer))) => {
                         // Safety: reader and writer are not mutated while the future returned from

+ 72 - 29
crates/btfuse/src/main.rs

@@ -169,6 +169,7 @@ mod test {
         fs::Permissions,
         io::SeekFrom,
         net::{IpAddr, Ipv6Addr},
+        num::NonZeroUsize,
         os::unix::fs::PermissionsExt,
         time::Duration,
     };
@@ -184,7 +185,7 @@ mod test {
     };
 
     /// An optional timeout to limit the time spent waiting for the FUSE daemon to start in tests.
-    const TIMEOUT: Option<Duration> = Some(Duration::from_millis(1000));
+    const TIMEOUT: Option<Duration> = Some(Duration::from_millis(1250));
 
     /// The log level to use when running tests.
     /// Note that the debug log level significantly reduces performance.
@@ -251,6 +252,7 @@ mod test {
         let (stop_tx, stop_rx) = oneshot::channel();
         let (swtpm, cred_store) = swtpm();
         let builder = Config::builder()
+            .with_threads(Some(NonZeroUsize::new(1).unwrap()))
             .with_mnt_dir(Some(tmp.path().join("mnt")))
             .with_tpm_state_file(Some(swtpm.state_path().to_owned().into()))
             .with_tabrmd(Some(swtpm.tabrmd_config().to_owned()));
@@ -272,10 +274,11 @@ mod test {
             (builder.with_block_dir(Some(block_dir)).build(), None)
         };
         let config_clone = config.clone();
-        let handle =
-            tokio::spawn(
-                async move { run_daemon(config_clone, Some(mounted_tx), Some(stop_rx)).await },
-            );
+        let handle = tokio::spawn(async move {
+            let mnt_dir = config_clone.mnt_dir.clone();
+            run_daemon(config_clone, Some(mounted_tx), Some(stop_rx)).await;
+            unmount(mnt_dir);
+        });
         if let Some(timeout) = TIMEOUT {
             tokio::time::timeout(timeout, mounted_rx)
                 .await
@@ -322,12 +325,29 @@ mod test {
             &self.config.mnt_dir
         }
 
-        async fn finished(&mut self) {
+        /// Signals to the daemon that it must stop.
+        fn signal_stop(&mut self) {
+            if let Some(stop_tx) = self.stop_tx.take() {
+                if let Err(_) = stop_tx.send(()) {
+                    log::error!("failed to send the TestCase stop signal");
+                }
+            }
+        }
+
+        /// Returns a future that resolves when the daemon has stopped.
+        async fn stopped(&mut self) {
             if let Some(handle) = self.handle.take() {
                 handle.await.expect("join failed");
             }
         }
 
+        /// Signals to the daemon to stop and returns a future that resolves when after it has
+        /// stopped.
+        async fn stop(&mut self) {
+            self.signal_stop();
+            self.stopped().await;
+        }
+
         fn initial_contents(&self) -> Vec<&OsStr> {
             vec![&self.node_principal]
         }
@@ -335,12 +355,7 @@ mod test {
 
     impl<R: Receiver> Drop for TestCase<R> {
         fn drop(&mut self) {
-            self.stop_tx.take().map(|stop_tx| {
-                if let Err(_) = stop_tx.send(()) {
-                    log::error!("failed to send the TestCase stop signal");
-                }
-            });
-            unmount(self.mnt_dir());
+            self.signal_stop()
         }
     }
 
@@ -350,10 +365,10 @@ mod test {
     #[allow(dead_code)]
     async fn manual_test() {
         let mut case = new_local().await;
-        case.finished().await
+        case.stopped().await
     }
 
-    async fn write_read(case: TestCase<impl Receiver>) -> Result<()> {
+    async fn write_read(mut case: TestCase<impl Receiver>) -> Result<()> {
         const EXPECTED: &[u8] =
             b"The paths to failure are uncountable, yet to success there are few.";
         let file_path = case.mnt_dir().join("file");
@@ -362,6 +377,8 @@ mod test {
 
         let actual = read(&file_path).await?;
         assert_eq!(EXPECTED, actual);
+
+        case.stop().await;
         Ok(())
     }
 
@@ -378,7 +395,7 @@ mod test {
         write_read(new_remote().await).await
     }
 
-    async fn create_file_then_readdir(case: TestCase<impl Receiver>) {
+    async fn create_file_then_readdir(mut case: TestCase<impl Receiver>) {
         const DATA: &[u8] = b"Au revoir Shoshanna!";
         let file_name = OsStr::new("landa_dialog.txt");
         let mut expected = case.initial_contents();
@@ -392,6 +409,8 @@ mod test {
         assert_eq!(expected, first);
         let second = file_names(read_dir(&mnt_path).await.expect("read_dir failed")).await;
         assert_eq!(expected, second);
+
+        case.stop().await;
     }
 
     #[tokio::test]
@@ -404,7 +423,7 @@ mod test {
         create_file_then_readdir(new_remote().await).await
     }
 
-    async fn create_then_delete_file(case: TestCase<impl Receiver>) {
+    async fn create_then_delete_file(mut case: TestCase<impl Receiver>) {
         const DATA: &[u8] = b"The universe is hostile, so impersonal. Devour to survive";
         let file_name = OsStr::new("tool_lyrics.txt");
         let mnt_path = case.mnt_dir();
@@ -416,6 +435,8 @@ mod test {
         let expected = case.initial_contents();
         let actual = file_names(read_dir(&mnt_path).await.expect("read_dir failed")).await;
         assert_eq!(expected, actual);
+
+        case.stop().await;
     }
 
     #[tokio::test]
@@ -428,7 +449,7 @@ mod test {
         create_then_delete_file(new_remote().await).await
     }
 
-    async fn hard_link_then_remove(case: TestCase<impl Receiver>) {
+    async fn hard_link_then_remove(mut case: TestCase<impl Receiver>) {
         const EXPECTED: &[u8] = b"And the lives we've reclaimed";
         let name1 = OsStr::new("refugee_lyrics.txt");
         let name2 = OsStr::new("rise_against_lyrics.txt");
@@ -442,6 +463,8 @@ mod test {
 
         let actual = read(&path2).await.expect("read failed");
         assert_eq!(EXPECTED, actual);
+
+        case.stop().await;
     }
 
     #[tokio::test]
@@ -454,7 +477,7 @@ mod test {
         hard_link_then_remove(new_remote().await).await
     }
 
-    async fn hard_link_then_remove_both(case: TestCase<impl Receiver>) {
+    async fn hard_link_then_remove_both(mut case: TestCase<impl Receiver>) {
         const EXPECTED: &[u8] = b"And the lives we've reclaimed";
         let name1 = OsStr::new("refugee_lyrics.txt");
         let name2 = OsStr::new("rise_against_lyrics.txt");
@@ -474,6 +497,8 @@ mod test {
         let expected = case.initial_contents();
         let actual = file_names(read_dir(&mnt_path).await.expect("read_dir failed")).await;
         assert_eq!(expected, actual);
+
+        case.stop().await;
     }
 
     #[tokio::test]
@@ -486,7 +511,7 @@ mod test {
         hard_link_then_remove_both(new_remote().await).await
     }
 
-    async fn set_mode_bits(case: TestCase<impl Receiver>) {
+    async fn set_mode_bits(mut case: TestCase<impl Receiver>) {
         const EXPECTED: u32 = libc::S_IFREG | 0o777;
         let file_path = case.mnt_dir().join("bagobits");
         write(&file_path, []).await.expect("write failed");
@@ -507,6 +532,8 @@ mod test {
             .permissions()
             .mode();
         assert_eq!(EXPECTED, actual);
+
+        case.stop().await;
     }
 
     #[tokio::test]
@@ -519,7 +546,7 @@ mod test {
         set_mode_bits(new_remote().await).await
     }
 
-    async fn create_directory(case: TestCase<impl Receiver>) {
+    async fn create_directory(mut case: TestCase<impl Receiver>) {
         const EXPECTED: &str = "etc";
         let mnt_path = case.mnt_dir();
         let dir_path = mnt_path.join(EXPECTED);
@@ -530,6 +557,8 @@ mod test {
 
         let actual = file_names(read_dir(mnt_path).await.expect("read_dir failed")).await;
         assert_eq!(expected, actual);
+
+        case.stop().await;
     }
 
     #[tokio::test]
@@ -542,7 +571,7 @@ mod test {
         create_directory(new_remote().await).await
     }
 
-    async fn create_file_under_new_directory(case: TestCase<impl Receiver>) {
+    async fn create_file_under_new_directory(mut case: TestCase<impl Receiver>) {
         const DIR_NAME: &str = "etc";
         const FILE_NAME: &str = "file";
         let mnt_path = case.mnt_dir();
@@ -554,6 +583,8 @@ mod test {
 
         let actual = file_names(read_dir(dir_path).await.expect("read_dir failed")).await;
         assert_eq!([FILE_NAME].as_slice(), actual.as_slice());
+
+        case.stop().await;
     }
 
     #[tokio::test]
@@ -566,7 +597,7 @@ mod test {
         create_file_under_new_directory(new_remote().await).await
     }
 
-    async fn create_then_remove_directory(case: TestCase<impl Receiver>) {
+    async fn create_then_remove_directory(mut case: TestCase<impl Receiver>) {
         const DIR_NAME: &str = "etc";
         let mnt_path = case.mnt_dir();
         let dir_path = mnt_path.join(DIR_NAME);
@@ -576,6 +607,8 @@ mod test {
 
         let actual = file_names(read_dir(&mnt_path).await.expect("read_dir failed")).await;
         assert_eq!(case.initial_contents(), actual);
+
+        case.stop().await;
     }
 
     #[tokio::test]
@@ -588,7 +621,7 @@ mod test {
         create_then_remove_directory(new_remote().await).await
     }
 
-    async fn read_only_dir_cant_create_subdir(case: TestCase<impl Receiver>) {
+    async fn read_only_dir_cant_create_subdir(mut case: TestCase<impl Receiver>) {
         const DIR_NAME: &str = "etc";
         let dir_path = case.mnt_dir().join(DIR_NAME);
         create_dir(&dir_path).await.expect("create_dir failed");
@@ -601,6 +634,8 @@ mod test {
         let err = result.err().expect("create_dir returned `Ok`");
         let os_err = err.raw_os_error().expect("raw_os_error was empty");
         assert_eq!(os_err, libc::EACCES);
+
+        case.stop().await;
     }
 
     #[tokio::test]
@@ -613,7 +648,7 @@ mod test {
         read_only_dir_cant_create_subdir(new_remote().await).await
     }
 
-    async fn read_only_dir_cant_remove_subdir(case: TestCase<impl Receiver>) {
+    async fn read_only_dir_cant_remove_subdir(mut case: TestCase<impl Receiver>) {
         const DIR_NAME: &str = "etc";
         let dir_path = case.mnt_dir().join(DIR_NAME);
         let sub_path = dir_path.join("sub");
@@ -628,6 +663,8 @@ mod test {
         let err = result.err().expect("remove_dir returned `Ok`");
         let os_err = err.raw_os_error().expect("raw_os_error was empty");
         assert_eq!(os_err, libc::EACCES);
+
+        case.stop().await;
     }
 
     #[tokio::test]
@@ -640,7 +677,7 @@ mod test {
         read_only_dir_cant_remove_subdir(new_remote().await).await
     }
 
-    async fn rename_file(case: TestCase<impl Receiver>) {
+    async fn rename_file(mut case: TestCase<impl Receiver>) {
         const FILE_NAME: &str = "parabola.txt";
         const EXPECTED: &[u8] = b"We are eternal all this pain is an illusion";
         let src_path = case.mnt_dir().join(FILE_NAME);
@@ -650,7 +687,9 @@ mod test {
         rename(&src_path, &dst_path).await.unwrap();
 
         let actual = read(&dst_path).await.unwrap();
-        assert_eq!(EXPECTED, actual)
+        assert_eq!(EXPECTED, actual);
+
+        case.stop().await;
     }
 
     #[tokio::test]
@@ -663,7 +702,7 @@ mod test {
         rename_file(new_remote().await).await
     }
 
-    async fn write_read_with_file_struct(case: TestCase<impl Receiver>) {
+    async fn write_read_with_file_struct(mut case: TestCase<impl Receiver>) {
         const FILE_NAME: &str = "big.dat";
         const LEN: usize = btlib::SECTOR_SZ_DEFAULT + 1;
         fn fill(buf: &mut Vec<u8>, value: u8) {
@@ -690,6 +729,8 @@ mod test {
 
         fill(&mut buf, 1);
         assert_eq!(buf, actual);
+
+        case.stop().await;
     }
 
     #[tokio::test]
@@ -697,14 +738,14 @@ mod test {
         write_read_with_file_struct(new_local().await).await
     }
 
-    //#[tokio::test(flavor = "multi_thread")]
+    #[tokio::test(flavor = "multi_thread")]
     async fn write_read_with_file_struct_remote() {
         write_read_with_file_struct(new_remote().await).await
     }
 
     /// KMC: This test is currently not working, and I've not been able to figure out why, nor
     /// reproduce it at a lower layer of the stack.
-    async fn read_more_than_whats_buffered(case: TestCase<impl Receiver>) {
+    async fn read_more_than_whats_buffered(mut case: TestCase<impl Receiver>) {
         const FILE_NAME: &str = "big.dat";
         const SECT_SZ: usize = btlib::SECTOR_SZ_DEFAULT;
         const DIVISOR: usize = 8;
@@ -734,6 +775,8 @@ mod test {
 
         buf.truncate(READ_SZ);
         assert!(buf == actual);
+
+        case.stop().await;
     }
 
     //#[tokio::test]

+ 15 - 4
crates/btmsg/src/lib.rs

@@ -17,8 +17,8 @@ use core::{
     pin::Pin,
 };
 use futures::{FutureExt, SinkExt};
-use log::error;
-use quinn::{Connection, Endpoint, RecvStream, SendStream};
+use log::{error, debug};
+use quinn::{Connection, Endpoint, RecvStream, SendStream, ConnectionError};
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
 use std::{
     any::Any,
@@ -513,8 +513,19 @@ impl QuicReceiver {
         );
         loop {
             let result = await_or_stop!(connection.accept_bi().map(Some), stop_rx.recv());
-            let (send_stream, recv_stream) =
-                unwrap_or_continue!(result, |err| error!("error accepting stream: {err}"));
+            let (send_stream, recv_stream) = match result {
+                Ok(pair) => pair,
+                Err(err) => match err {
+                    ConnectionError::ApplicationClosed(app) => {
+                        debug!("connection closed: {app}");
+                        return;
+                    }
+                    _ => {
+                        error!("error accepting stream: {err}");
+                        continue;
+                    }
+                }
+            };
             let client_path = client_path.clone();
             let callback = callback.clone();
             tokio::task::spawn(Self::handle_message(