@@ -16,7 +16,7 @@ use core::{
use futures::{
future::{ready, Ready},
- sink::{Close, Send as SendFut, Sink},
+ sink::Send as SendFut,
SinkExt, StreamExt,
@@ -35,19 +35,14 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
hash::{Hash, Hasher},
- io,
- net::{IpAddr, Shutdown, SocketAddr},
+ net::{IpAddr, SocketAddr},
-use tokio::{
- io::{AsyncRead, AsyncWrite, ReadBuf},
- net::UnixDatagram,
- sync::{
- broadcast::{self, error::TryRecvError},
- mpsc,
- },
+use tokio::sync::{
+ broadcast::{self, error::TryRecvError},
+ mpsc,
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite};
@@ -56,15 +51,16 @@ use zerocopy::FromBytes;
pub use private::*;
mod private {
use super::*;
/// Returns a [Router] which can be used to make a [Receiver] for the given path and
/// [Sender] instances for any path.
pub fn router<C: 'static + Creds + Send + Sync>(
- addr: Arc<BlockAddr>,
+ ip_addr: IpAddr,
creds: Arc<C>,
) -> Result<impl Router> {
+ let writecap = creds.writecap().ok_or(btlib::BlockError::MissingWritecap)?;
+ let addr = Arc::new(BlockAddr::new(ip_addr, Arc::new(writecap.bind_path())));
QuicRouter::new(addr, Arc::new(CertResolver::new(creds)?))
@@ -77,12 +73,6 @@ mod private {
- /// Appends the given Blocktree path to the path of the given directory.
- #[allow(dead_code)]
- fn socket_path(fs_path: &mut PathBuf, addr: &BlockAddr) {
- fs_path.push(addr.path.to_string());
- }
fn common_config<Side: ConfigSide>(
builder: ConfigBuilder<Side, WantsCipherSuites>,
) -> Result<ConfigBuilder<Side, WantsVerifier>> {
@@ -126,13 +116,14 @@ mod private {
+ /// Verifier for the certificate chain presented by the server.
struct ServerCertVerifier {
- peer_path: Arc<BlockPath>,
+ server_path: Arc<BlockPath>,
impl ServerCertVerifier {
- fn new(path: Arc<BlockPath>) -> Self {
- Self { peer_path: path }
+ fn new(server_path: Arc<BlockPath>) -> Self {
+ Self { server_path }
@@ -148,9 +139,14 @@ mod private {
) -> std::result::Result<rustls::client::ServerCertVerified, rustls::Error> {
let (writecap, ..) =
Writecap::from_cert_chain(end_entity, intermediates).map_err(to_cert_err)?;
- writecap
- .assert_valid_for(&self.peer_path)
- .map_err(to_cert_err)?;
+ let path = writecap.bind_path();
+ if &path != self.server_path.as_ref() {
+ return Err(rustls::Error::InvalidCertificateData(format!(
+ "expected writecap with path '{}' got writecap with path '{path}'",
+ self.server_path
+ )));
+ }
+ writecap.assert_valid_for(&path).map_err(to_cert_err)?;
@@ -165,6 +161,7 @@ mod private {
+ /// Verifier for the certificate chain presented by the client.
struct ClientCertVerifier;
impl rustls::server::ClientCertVerifier for ClientCertVerifier {
@@ -320,30 +317,6 @@ mod private {
- /// An identifier for a block. Persistent blocks (files, directories, and servers) are
- /// identified by the `Inode` variant and transient blocks (processes) are identified by the
- /// PID variant.
- #[derive(Serialize, Deserialize, PartialEq, Eq, Hash, Clone, Debug)]
- pub enum BlockNum {
- Inode(u64),
- Pid(u64),
- }
- impl BlockNum {
- pub fn value(&self) -> u64 {
- match self {
- BlockNum::Inode(value) => *value,
- BlockNum::Pid(value) => *value,
- }
- }
- }
- impl From<BlockNum> for u64 {
- fn from(value: BlockNum) -> Self {
- value.value()
- }
- }
#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub struct BlockAddr {
ip_addr: IpAddr,
@@ -355,6 +328,14 @@ mod private {
Self { ip_addr, path }
+ pub fn ip_addr(&self) -> IpAddr {
+ self.ip_addr
+ }
+ pub fn path(&self) -> &BlockPath {
+ self.path.as_ref()
+ }
fn port(&self) -> u16 {
let mut hasher = DefaultHasher::new();
self.path.hash(&mut hasher);
@@ -399,6 +380,18 @@ mod private {
+ /// A message tagged with the block path that it was sent from.
+ pub struct MsgReceived<T> {
+ pub from: Arc<BlockPath>,
+ pub msg: Msg<T>,
+ }
+ impl<T> MsgReceived<T> {
+ fn new(from: Arc<BlockPath>, msg: Msg<T>) -> Self {
+ Self { from, msg }
+ }
+ }
/// A type which can be used to send messages.
/// Once the "Permit impl Trait in type aliases" https://github.com/rust-lang/rust/issues/63063
/// feature lands the future types in this trait should be rewritten to use it.
@@ -426,7 +419,7 @@ mod private {
/// A type which can be used to receive messages.
- pub trait Receiver<T>: Stream<Item = Result<Msg<T>>> {
+ pub trait Receiver<T>: Stream<Item = Result<MsgReceived<T>>> {
fn addr(&self) -> &BlockAddr;
@@ -511,178 +504,6 @@ mod private {
- /// Wraps a [UnixDatagram] and implements [AsyncRead] and [AsyncWrite] for it. Read operations
- /// are translated to calls to `recv_from` and write operations are translated to `send`. Note
- /// that this means that writes will fail unless the wrapped socket is connected to a peer.
- struct DatagramAdapter {
- socket: UnixDatagram,
- }
- impl DatagramAdapter {
- #[allow(dead_code)]
- fn new(socket: UnixDatagram) -> Self {
- Self { socket }
- }
- fn get_ref(&self) -> &UnixDatagram {
- &self.socket
- }
- fn get_mut(&mut self) -> &mut UnixDatagram {
- &mut self.socket
- }
- }
- impl AsRef<UnixDatagram> for DatagramAdapter {
- fn as_ref(&self) -> &UnixDatagram {
- self.get_ref()
- }
- }
- impl AsMut<UnixDatagram> for DatagramAdapter {
- fn as_mut(&mut self) -> &mut UnixDatagram {
- self.get_mut()
- }
- }
- impl AsyncRead for DatagramAdapter {
- fn poll_read(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut ReadBuf<'_>,
- ) -> Poll<io::Result<()>> {
- self.socket.poll_recv(cx, buf)
- }
- }
- impl AsyncWrite for DatagramAdapter {
- fn poll_write(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<io::Result<usize>> {
- self.socket.poll_send(cx, buf)
- }
- fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- Poll::Ready(self.socket.shutdown(Shutdown::Write))
- }
- fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
- Poll::Ready(Ok(()))
- }
- }
- /// An implementation of [Receiver] which uses a Unix datagram socket for receiving messages.
- struct UnixReceiver<T> {
- addr: BlockAddr,
- socket: FramedRead<DatagramAdapter, MsgDecoder<T>>,
- }
- impl<T: DeserializeOwned> UnixReceiver<T> {
- #[allow(dead_code)]
- fn new(mut fs_path: PathBuf, addr: BlockAddr) -> Result<Self> {
- socket_path(&mut fs_path, &addr);
- let socket = DatagramAdapter::new(UnixDatagram::bind(fs_path)?);
- let socket = FramedRead::new(socket, MsgDecoder(PhantomData));
- Ok(Self { addr, socket })
- }
- }
- impl<T: DeserializeOwned> Stream for UnixReceiver<T> {
- type Item = Result<Msg<T>>;
- fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- self.socket.poll_next_unpin(cx)
- }
- }
- impl<T: DeserializeOwned> Receiver<T> for UnixReceiver<T> {
- fn addr(&self) -> &BlockAddr {
- &self.addr
- }
- }
- /// An implementation of [Sender] which uses a Unix datagram socket to send messages.
- struct UnixSender {
- addr: BlockAddr,
- socket: FramedWrite<DatagramAdapter, MsgEncoder>,
- }
- impl UnixSender {
- #[allow(dead_code)]
- fn new(mut fs_path: PathBuf, addr: BlockAddr) -> Result<Self> {
- let socket = UnixDatagram::unbound()?;
- socket_path(&mut fs_path, &addr);
- socket.connect(fs_path)?;
- let socket = FramedWrite::new(DatagramAdapter::new(socket), MsgEncoder);
- Ok(Self { addr, socket })
- }
- }
- impl<T: Serialize> Sink<Msg<T>> for UnixSender {
- type Error = btlib::Error;
- fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- <tokio_util::codec::FramedWrite<DatagramAdapter, MsgEncoder> as futures::SinkExt<
- Msg<T>,
- >>::poll_ready_unpin(&mut self.socket, cx)
- }
- fn start_send(mut self: Pin<&mut Self>, item: Msg<T>) -> Result<()> {
- self.socket.start_send_unpin(item)
- }
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- <tokio_util::codec::FramedWrite<DatagramAdapter, MsgEncoder> as futures::SinkExt<
- Msg<T>,
- >>::poll_flush_unpin(&mut self.socket, cx)
- }
- fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- <tokio_util::codec::FramedWrite<DatagramAdapter, MsgEncoder> as futures::SinkExt<
- Msg<T>,
- >>::poll_close_unpin(&mut self.socket, cx)
- }
- }
- impl Sender for UnixSender {
- fn addr(&self) -> &BlockAddr {
- &self.addr
- }
- type SendFut<'a, T>
- = SendFut<'a, FramedWrite<DatagramAdapter, MsgEncoder>, Msg<T>>
- where T: 'a + Serialize + Send;
- fn send<'a, T: 'a + Serialize + Send>(&'a mut self, msg: Msg<T>) -> Self::SendFut<'a, T> {
- self.socket.send(msg)
- }
- type FinishFut = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
- fn finish(mut self) -> Self::FinishFut {
- Box::pin(async move {
- let fut: Close<'_, _, Msg<()>> = self.socket.close();
- fut.await
- })
- }
- }
- /// Causes the current function to return if the given `rx` has received a stop signal.
- macro_rules! check_stop {
- ($rx:expr) => {
- match $rx.try_recv() {
- Ok(_) => return,
- Err(err) => {
- if let TryRecvError::Closed = err {
- return;
- }
- }
- }
- };
- }
struct QuicRouter {
recv_addr: Arc<BlockAddr>,
resolver: Arc<CertResolver>,
@@ -728,7 +549,21 @@ mod private {
struct QuicReceiver<T> {
recv_addr: Arc<BlockAddr>,
stop_tx: broadcast::Sender<()>,
- stream: ReceiverStream<Result<Msg<T>>>,
+ stream: ReceiverStream<Result<MsgReceived<T>>>,
+ }
+ /// Causes the current function to return if the given `rx` has received a stop signal.
+ macro_rules! check_stop {
+ ($rx:expr) => {
+ match $rx.try_recv() {
+ Ok(_) => return,
+ Err(err) => {
+ if let TryRecvError::Closed = err {
+ return;
+ }
+ }
+ }
+ };
impl<T: DeserializeOwned + Send + 'static> QuicReceiver<T> {
@@ -754,7 +589,39 @@ mod private {
let conn_msg_tx = msg_tx.clone();
let mut conn_stop_rx = stop_rx.resubscribe();
+ let client_certs = match connection.peer_identity() {
+ Some(peer_certs) => {
+ match peer_certs.downcast::<Vec<rustls::Certificate>>() {
+ Ok(peer_certs) => peer_certs,
+ Err(err) => {
+ error!("failed to downcast peer certificate chain: {:?}", err);
+ continue;
+ }
+ }
+ }
+ None => {
+ error!("connection did not contain a peer identity");
+ continue;
+ }
+ };
tokio::spawn(async move {
+ let client_path = {
+ // There must be at least one certificate because the handshake was
+ // successful.
+ let first = client_certs.first().unwrap();
+ let (writecap, ..) =
+ match Writecap::from_cert_chain(first, &client_certs[1..]) {
+ Ok(pair) => pair,
+ Err(err) => {
+ error!(
+ "failed to create writecap from certificate chain: {err}"
+ );
+ return;
+ }
+ };
+ drop(client_certs);
+ Arc::new(writecap.bind_path())
+ };
let recv_stream = match connection.accept_uni().await {
Ok(recv_stream) => recv_stream,
Err(err) => {
@@ -769,7 +636,10 @@ mod private {
Some(result) => result,
None => return,
- if let Err(err) = conn_msg_tx.send(result).await {
+ if let Err(err) = conn_msg_tx
+ .send(result.map(|e| MsgReceived::new(client_path.clone(), e)))
+ .await
+ {
error!("error sending message to mpsc queue: {err}");
@@ -793,7 +663,7 @@ mod private {
impl<T: DeserializeOwned + Send + 'static> Stream for QuicReceiver<T> {
- type Item = Result<Msg<T>>;
+ type Item = Result<MsgReceived<T>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@@ -822,7 +692,7 @@ mod private {
client_config(addr.path.clone(), resolver)?,
// The ServerCertVerifier ensures we connect to the correct path.
let connection = connecting.await?;
let send_stream = connection.open_uni().await?;
@@ -871,11 +741,7 @@ mod tests {
use btlib::{crypto::ConcreteCreds, Epoch, Principal, Principaled};
use ctor::ctor;
- use std::{
- net::Ipv6Addr,
- sync::atomic::{AtomicU64, Ordering},
- time::Duration,
- };
+ use std::{net::Ipv6Addr, time::Duration};
fn setup_logging() {
@@ -900,13 +766,6 @@ mod tests {
static ref ROOT_PRINCIPAL: Principal = ROOT_CREDS.principal();
- fn block_addr<'a, I: Iterator<Item = S>, S: ToString>(components: I) -> BlockAddr {
- let components = components.map(|e| e.to_string()).collect();
- let path = BlockPath::new(ROOT_CREDS.principal(), components);
- let path = Arc::new(path);
- BlockAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), path)
- }
#[derive(Serialize, Deserialize)]
enum MsgError {
@@ -930,20 +789,27 @@ mod tests {
Write { offset: u64, buf: &'a [u8] },
- struct TestCase {
- instance_num: u64,
- }
+ struct TestCase;
impl TestCase {
fn new() -> TestCase {
- static INSTANCE_NUM: AtomicU64 = AtomicU64::new(0);
- let instance_num = INSTANCE_NUM.fetch_add(1, Ordering::SeqCst);
- Self { instance_num }
+ Self
- async fn endpoint(&self, inode: u64) -> (impl Sender, impl Receiver<BodyOwned>) {
- let addr = Arc::new(block_addr([self.instance_num, inode].iter()));
- let router = router(addr.clone(), Arc::new(NODE_CREDS.clone())).unwrap();
+ /// Returns a ([Sender], [Receiver]) pair for a process identified by the given integer.
+ async fn new_process(&self) -> (impl Sender, impl Receiver<BodyOwned>) {
+ let ip_addr = IpAddr::V6(Ipv6Addr::LOCALHOST);
+ let mut creds = ConcreteCreds::generate().unwrap();
+ let writecap = NODE_CREDS
+ .issue_writecap(
+ creds.principal(),
+ vec![],
+ Epoch::now() + Duration::from_secs(3600),
+ )
+ .unwrap();
+ let addr = Arc::new(BlockAddr::new(ip_addr, Arc::new(writecap.bind_path())));
+ creds.set_writecap(writecap);
+ let router = router(ip_addr, Arc::new(creds)).unwrap();
let receiver = router.receiver::<BodyOwned>().await.unwrap();
let sender = router.sender(addr).await.unwrap();
(sender, receiver)
@@ -953,12 +819,12 @@ mod tests {
async fn message_received_is_message_sent() {
let case = TestCase::new();
- let (mut sender, mut receiver) = case.endpoint(1).await;
+ let (mut sender, mut receiver) = case.new_process().await;
let actual = receiver.next().await.unwrap().unwrap();
- let matched = if let BodyOwned::Ping = actual.body {
+ let matched = if let BodyOwned::Ping = actual.msg.body {
} else {
@@ -966,15 +832,26 @@ mod tests {
+ #[tokio::test]
+ async fn message_received_from_path_is_correct() {
+ let case = TestCase::new();
+ let (mut sender, mut receiver) = case.new_process().await;
+ sender.send_with_rand_id(BodyRef::Ping).await.unwrap();
+ let actual = receiver.next().await.unwrap().unwrap();
+ assert_eq!(receiver.addr().path(), actual.from.as_ref());
+ }
async fn ping_pong() {
let case = TestCase::new();
- let (mut sender_one, mut receiver_one) = case.endpoint(1).await;
- let (mut sender_two, mut receiver_two) = case.endpoint(2).await;
+ let (mut sender_one, mut receiver_one) = case.new_process().await;
+ let (mut sender_two, mut receiver_two) = case.new_process().await;
tokio::spawn(async move {
- let msg = receiver_one.next().await.unwrap().unwrap();
- let reply_body = if let BodyOwned::Ping = msg.body {
+ let received = receiver_one.next().await.unwrap().unwrap();
+ let reply_body = if let BodyOwned::Ping = received.msg.body {
} else {
@@ -986,24 +863,25 @@ mod tests {
let reply = receiver_two.next().await.unwrap().unwrap();
- let matched = if let BodyOwned::Success = reply.body {
+ let matched = if let BodyOwned::Success = reply.msg.body {
} else {
- assert!(matched)
+ assert!(matched);
+ assert_eq!(receiver_two.addr().path(), reply.from.as_ref());
async fn read_write() {
let case = TestCase::new();
- let (mut sender_one, mut receiver_one) = case.endpoint(1).await;
- let (mut sender_two, mut receiver_two) = case.endpoint(2).await;
+ let (mut sender_one, mut receiver_one) = case.new_process().await;
+ let (mut sender_two, mut receiver_two) = case.new_process().await;
let handle = tokio::spawn(async move {
let data: [u8; 8] = [0, 1, 2, 3, 4, 5, 6, 7];
- let msg = receiver_one.next().await.unwrap().unwrap();
- let reply_body = if let BodyOwned::Read { offset, size } = msg.body {
+ let received = receiver_one.next().await.unwrap().unwrap();
+ let reply_body = if let BodyOwned::Read { offset, size } = received.msg.body {
let offset: usize = offset.try_into().unwrap();
let size: usize = size.try_into().unwrap();
let end: usize = offset + size;
@@ -1014,7 +892,7 @@ mod tests {
} else {
- let msg = Msg::new(msg.id, reply_body);
+ let msg = Msg::new(received.msg.id, reply_body);
let fut = assert_send::<'_, Result<()>>(sender_two.send(msg));
@@ -1026,7 +904,7 @@ mod tests {
let reply = receiver_two.next().await.unwrap().unwrap();
- if let BodyOwned::Write { offset, buf } = reply.body {
+ if let BodyOwned::Write { offset, buf } = reply.msg.body {
assert_eq!(2, offset);
assert_eq!([2, 3].as_slice(), buf.as_slice());
} else {