|
@@ -14,7 +14,14 @@ use futures::{
|
|
|
};
|
|
|
use lazy_static::lazy_static;
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
-use std::{io, marker::PhantomData, net::Shutdown, path::PathBuf};
|
|
|
+use std::{
|
|
|
+ io,
|
|
|
+ marker::PhantomData,
|
|
|
+ net::Shutdown,
|
|
|
+ path::PathBuf,
|
|
|
+ collections::hash_map::DefaultHasher,
|
|
|
+ hash::{Hash, Hasher},
|
|
|
+};
|
|
|
use tokio::{
|
|
|
io::{AsyncRead, AsyncWrite, ReadBuf},
|
|
|
net::UnixDatagram,
|
|
@@ -39,23 +46,56 @@ mod private {
|
|
|
|
|
|
/// Appends the given Blocktree path to the path of the given directory.
|
|
|
fn socket_path(fs_path: &mut PathBuf, addr: &BlockAddr) {
|
|
|
- fs_path.push(addr.inode.to_string());
|
|
|
+ fs_path.push(addr.num.value().to_string());
|
|
|
+ }
|
|
|
+
|
|
|
+ /// An identifier for a block. Persistent blocks (files, directories, and servers) are
|
|
|
+ /// identified by the `Inode` variant and transient blocks (processes) are identified by the
|
|
|
+ /// PID variant.
|
|
|
+ #[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Debug)]
|
|
|
+ pub enum BlockNum {
|
|
|
+ Inode(u64),
|
|
|
+ Pid(u64),
|
|
|
+ }
|
|
|
+
|
|
|
+ impl BlockNum {
|
|
|
+ pub fn value(&self) -> u64 {
|
|
|
+ match self {
|
|
|
+ BlockNum::Inode(value) => *value,
|
|
|
+ BlockNum::Pid(value) => *value,
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ impl From<BlockNum> for u64 {
|
|
|
+ fn from(value: BlockNum) -> Self {
|
|
|
+ value.value()
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- #[derive(Serialize, Deserialize, PartialEq, Eq, Default, Hash, Clone, Debug)]
|
|
|
+ #[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Debug)]
|
|
|
pub struct BlockAddr {
|
|
|
+ /// The root principal of the blocktree this block is part of.
|
|
|
pub root: Principal,
|
|
|
- pub generation: u64,
|
|
|
- pub inode: u64,
|
|
|
+ /// The cluster ID this block is served by.
|
|
|
+ pub cluster: u64,
|
|
|
+ /// The number of this block.
|
|
|
+ pub num: BlockNum,
|
|
|
}
|
|
|
|
|
|
impl BlockAddr {
|
|
|
- pub fn new(root: Principal, generation: u64, inode: u64) -> Self {
|
|
|
- Self {
|
|
|
- root,
|
|
|
- generation,
|
|
|
- inode,
|
|
|
- }
|
|
|
+ pub fn new(root: Principal, cluster: u64, num: BlockNum) -> Self {
|
|
|
+ Self { root, cluster, num }
|
|
|
+ }
|
|
|
+
|
|
|
+ pub fn port(&self) -> u16 {
|
|
|
+ let mut hasher = DefaultHasher::new();
|
|
|
+ self.hash(&mut hasher);
|
|
|
+ let hash = hasher.finish();
|
|
|
+ // We compute a port in the dynamic range [49152, 65535] as defined by RFC 6335.
|
|
|
+ const NUM_RES_PORTS: u16 = 49153;
|
|
|
+ const PORTS_AVAIL: u64 = (u16::MAX - NUM_RES_PORTS) as u64;
|
|
|
+ NUM_RES_PORTS + (hash % PORTS_AVAIL) as u16
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -91,11 +131,6 @@ mod private {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- #[derive(Serialize, Deserialize)]
|
|
|
- enum VerMsg<T> {
|
|
|
- V0(Msg<T>),
|
|
|
- }
|
|
|
-
|
|
|
/// A type which can be used to send messages.
|
|
|
pub trait Sender {
|
|
|
type SendFut<'a, T>: 'a + Future<Output = Result<()>> + core::marker::Send
|
|
@@ -135,7 +170,7 @@ mod private {
|
|
|
const U64_LEN: usize = std::mem::size_of::<u64>();
|
|
|
let payload = dst.split_off(U64_LEN);
|
|
|
let mut writer = payload.writer();
|
|
|
- write_to(&VerMsg::V0(item), &mut writer)?;
|
|
|
+ write_to(&item, &mut writer)?;
|
|
|
let payload = writer.into_inner();
|
|
|
let payload_len = payload.len() as u64;
|
|
|
let mut writer = dst.writer();
|
|
@@ -168,7 +203,7 @@ mod private {
|
|
|
src.reserve(payload_len - slice.len());
|
|
|
return Ok(None);
|
|
|
}
|
|
|
- let VerMsg::V0(msg) = read_from(&mut slice)?;
|
|
|
+ let msg = read_from(&mut slice)?;
|
|
|
// Consume all the bytes that have been read out of the buffer.
|
|
|
let _ = src.split_to(std::mem::size_of::<u64>() + payload_len);
|
|
|
Ok(Some(msg))
|
|
@@ -365,7 +400,7 @@ mod tests {
|
|
|
}
|
|
|
|
|
|
fn block_addr(generation: u64, inode: u64) -> BlockAddr {
|
|
|
- BlockAddr::new(ROOT_PRINCIPAL.clone(), generation, inode)
|
|
|
+ BlockAddr::new(ROOT_PRINCIPAL.clone(), generation, BlockNum::Inode(inode))
|
|
|
}
|
|
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
@@ -411,6 +446,15 @@ mod tests {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /// This tests just server to ensure that changes to the port hashing algorithm are made
|
|
|
+ /// deliberately.
|
|
|
+ #[test]
|
|
|
+ fn hash_block_addr() {
|
|
|
+ let block_addr = BlockAddr::new(ROOT_PRINCIPAL.clone(), 1, BlockNum::Inode(1));
|
|
|
+ let port = block_addr.port();
|
|
|
+ assert_eq!(64984, port);
|
|
|
+ }
|
|
|
+
|
|
|
#[tokio::test]
|
|
|
async fn message_received_is_message_sent() {
|
|
|
let case = TestCase::new();
|