start working on encryption

This commit is contained in:
Ferdinand Schober
2024-09-02 15:35:52 +02:00
committed by Ferdinand Schober
parent 0d074e19f1
commit 79bc64e56e
9 changed files with 1263 additions and 92 deletions

View File

@@ -159,6 +159,7 @@ async fn handle_capture_event(
/* released capture */
State::Receiving => ProtoEvent::Leave(0),
};
log::error!("SENDING: {event:?} -> {addr:?}");
sender_tx.send((event, addr)).expect("sender closed");
};

View File

@@ -1,8 +1,19 @@
use local_channel::mpsc::{Receiver, Sender};
use std::{io, net::SocketAddr};
use std::{cell::RefCell, collections::HashMap, io, net::SocketAddr, rc::Rc, sync::Arc};
use webrtc_dtls::{
config::{ClientAuthType, Config, ExtendedMasterSecretType},
conn::DTLSConn,
listener::listen,
};
use webrtc_util::{conn::Listener, Conn};
use thiserror::Error;
use tokio::{net::UdpSocket, task::JoinHandle};
use tokio::{
net::UdpSocket,
task::{spawn_local, JoinHandle},
};
use crate::crypto;
use super::Server;
use lan_mouse_proto::{ProtoEvent, ProtocolError};
@@ -14,64 +25,94 @@ pub(crate) async fn new(
) -> io::Result<JoinHandle<()>> {
// bind the udp socket
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), server.port.get());
let mut socket = UdpSocket::bind(listen_addr).await?;
Ok(tokio::task::spawn_local(async move {
let mut sender_rx = udp_send_rx;
Ok(spawn_local(async move {
let sender_rx = Rc::new(RefCell::new(udp_send_rx));
loop {
let udp_receiver = udp_receiver(&socket, &udp_recv_tx);
let udp_sender = udp_sender(&socket, &mut sender_rx);
let udp_receiver = spawn_local(listen_dtls(listen_addr, udp_recv_tx.clone()));
let udp_sender = spawn_local(udp_sender(sender_rx.clone()));
log::info!("starting sender + receiver");
tokio::select! {
_ = udp_receiver => break, /* channel closed */
e = udp_receiver => panic!("{e:?}"), /* channel closed */
_ = udp_sender => break, /* channel closed */
_ = server.notifies.port_changed.notified() => update_port(&server, &mut socket).await,
_ = server.cancelled() => break, /* cancellation requested */
}
}
}))
}
async fn update_port(server: &Server, socket: &mut UdpSocket) {
let new_port = server.port.get();
let current_port = socket.local_addr().expect("socket not bound").port();
// if port is the same, we dont need to change it
if current_port == new_port {
return;
async fn listen_dtls(
listen_addr: SocketAddr,
udp_recv_tx: Sender<Result<(ProtoEvent, SocketAddr), NetworkError>>,
) -> Result<(), NetworkError> {
let server_cert = crypto::load_key_and_certificate("alice.pem".into(), "alice.pub.pem".into())?;
let mut cert_pool = rustls::RootCertStore::empty();
let certs = crypto::load_certificate("alice.pub.pem".into())?;
for cert in certs.into_iter() {
cert_pool.add(cert)?;
}
// bind new socket
let listen_addr = SocketAddr::new("0.0.0.0".parse().unwrap(), new_port);
let new_socket = UdpSocket::bind(listen_addr).await;
let err = match new_socket {
Ok(new_socket) => {
*socket = new_socket;
None
}
Err(e) => Some(e.to_string()),
let cfg = Config {
certificates: vec![server_cert],
extended_master_secret: ExtendedMasterSecretType::Require,
client_auth: ClientAuthType::RequireAndVerifyClientCert,
client_cas: cert_pool,
..Default::default()
};
// notify frontend of the actual port
let port = socket.local_addr().expect("socket not bound").port();
server.notify_port_changed(port, err);
}
async fn udp_receiver(
socket: &UdpSocket,
receiver_tx: &Sender<Result<(ProtoEvent, SocketAddr), NetworkError>>,
) {
let listener = Arc::new(listen(listen_addr, cfg).await?);
loop {
let event = receive_event(socket).await;
receiver_tx.send(event).expect("channel closed");
while let Ok((conn, addr)) = listener.accept().await {
let udp_recv_tx = udp_recv_tx.clone();
spawn_local(async move {
loop {
let mut buf = [0u8; lan_mouse_proto::MAX_EVENT_SIZE];
let event: Result<_, NetworkError> = match conn.recv(&mut buf).await {
Ok(_len) => match ProtoEvent::try_from(buf) {
Ok(e) => Ok((e, addr)),
Err(e) => Err(e.into()),
},
Err(e) => Err(e.into()),
};
udp_recv_tx.send(event).expect("channel closed");
}
});
}
}
}
async fn udp_sender(socket: &UdpSocket, rx: &mut Receiver<(ProtoEvent, SocketAddr)>) {
async fn udp_sender(rx: Rc<RefCell<Receiver<(ProtoEvent, SocketAddr)>>>) {
let mut connection_pool: HashMap<SocketAddr, DTLSConn> = HashMap::new();
loop {
let (event, addr) = rx.recv().await.expect("channel closed");
if let Err(e) = send_event(socket, event, addr) {
log::warn!("udp send failed: {e}");
log::error!("waiting for event to send ...");
let (event, addr) = rx.borrow_mut().recv().await.expect("channel closed");
log::error!("{:20} ------>->->-> {addr}", event.to_string());
if !connection_pool.contains_key(&addr) {
let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await.unwrap());
socket.connect(addr).await.unwrap();
let certificate =
crypto::load_key_and_certificate("bob.pem".into(), "bob.pub.pem".into()).unwrap();
let mut cert_pool = rustls::RootCertStore::empty();
let certs = crypto::load_certificate("alice.pub.pem".into()).unwrap();
for cert in certs.into_iter() {
cert_pool.add(cert.to_owned()).unwrap();
}
let config = Config {
certificates: vec![certificate],
extended_master_secret: ExtendedMasterSecretType::Require,
roots_cas: cert_pool,
server_name: "webrtc.rs".to_owned(),
..Default::default()
};
log::error!("connecting to {addr}");
let conn = DTLSConn::new(socket, config, true, None).await.unwrap();
log::error!("connected {addr}!");
connection_pool.insert(addr, conn);
};
let conn = connection_pool.get(&addr).unwrap();
log::error!("{:20} ------>->->-> {addr}", event.to_string());
let (data, len): ([u8; lan_mouse_proto::MAX_EVENT_SIZE], usize) = event.into();
// When udp blocks, we dont want to block the event loop.
// Dropping events is better than potentially crashing the input capture.
conn.send_to(&data[..len], addr).await.unwrap();
}
}
@@ -81,19 +122,12 @@ pub(crate) enum NetworkError {
Protocol(#[from] ProtocolError),
#[error("network error: `{0}`")]
Io(#[from] io::Error),
}
async fn receive_event(socket: &UdpSocket) -> Result<(ProtoEvent, SocketAddr), NetworkError> {
let mut buf = [0u8; lan_mouse_proto::MAX_EVENT_SIZE];
let (_len, src) = socket.recv_from(&mut buf).await?;
let event = ProtoEvent::try_from(buf)?;
Ok((event, src))
}
fn send_event(sock: &UdpSocket, e: ProtoEvent, addr: SocketAddr) -> Result<usize, NetworkError> {
log::trace!("{:20} ------>->->-> {addr}", e.to_string());
let (data, len): ([u8; lan_mouse_proto::MAX_EVENT_SIZE], usize) = e.into();
// When udp blocks, we dont want to block the event loop.
// Dropping events is better than potentially crashing the input capture.
Ok(sock.try_send_to(&data[..len], addr)?)
#[error(transparent)]
Crypt(#[from] crypto::Error),
#[error(transparent)]
Rustls(#[from] rustls::Error),
#[error(transparent)]
WebrtcDtls(#[from] webrtc_dtls::Error),
#[error(transparent)]
WebrtcUtil(#[from] webrtc_util::Error),
}