use crate::config::*;
use crate::errors::*;
use crate::telemetry::*;
use byteorder::{BigEndian, ByteOrder};
use log::info;
use pnet::packet::udp::{ipv4_checksum, UdpPacket};
use pnet::packet::Packet;
use std::net::{Ipv4Addr, UdpSocket};
use std::ops::Range;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
const HEADER_LEN: usize = 8;
const CHKSUM_RNG: Range<usize> = 6..8;
const MAX_SIZE: usize = 65499;
pub type ReadFn<T> = Fn(&T) -> CommsResult<Vec<u8>> + Send + Sync + 'static;
pub type WriteFn<T> = Fn(&T, &[u8]) -> CommsResult<()> + Send + Sync + 'static;
#[derive(Clone)]
pub struct CommsControlBlock<T: Clone> {
pub read: Option<Arc<ReadFn<T>>>,
pub write: Vec<Arc<WriteFn<T>>>,
pub read_conn: T,
pub write_conn: T,
pub handler_port_min: u16,
pub handler_port_max: u16,
pub timeout: u64,
pub ground_ip: Ipv4Addr,
pub satellite_ip: Ipv4Addr,
pub downlink_ports: Option<Vec<u16>>,
pub ground_port: Option<u16>,
}
impl<T: Clone> CommsControlBlock<T> {
pub fn new(
read: Option<Arc<ReadFn<T>>>,
write: Vec<Arc<WriteFn<T>>>,
read_conn: T,
write_conn: T,
config: CommsConfig,
) -> Self {
CommsControlBlock {
read,
write,
read_conn,
write_conn,
handler_port_min: config.handler_port_min.unwrap_or(DEFAULT_HANDLER_START),
handler_port_max: config.handler_port_max.unwrap_or(DEFAULT_HANDLER_END),
timeout: config.timeout.unwrap_or(DEFAULT_TIMEOUT),
ground_ip: Ipv4Addr::from_str(
&config.ground_ip.unwrap_or(DEFAULT_GROUND_IP.to_string()),
)
.unwrap(),
satellite_ip: Ipv4Addr::from_str(
&config
.satellite_ip
.unwrap_or(DEFAULT_SATELLITE_IP.to_string()),
)
.unwrap(),
downlink_ports: config.downlink_ports,
ground_port: config.ground_port,
}
}
}
pub struct CommsService;
impl CommsService {
pub fn start<T: Clone + Send + 'static>(
control: CommsControlBlock<T>,
telem: &Arc<Mutex<CommsTelemetry>>,
) -> CommsResult<()> {
if control.write.is_empty() {
if control.read.is_some() {
let telem_ref = telem.clone();
let control_ref = control.clone();
thread::spawn(move || read_thread(control_ref, &telem_ref));
}
} else {
return Err(CommsServiceError::MissingWriteMethod.into());
}
if let Some(ports) = control.downlink_ports {
if control.write.len() == ports.len() {
for (_, (port, write)) in ports.iter().zip(control.write.iter()).enumerate() {
let telem_ref = telem.clone();
let port_ref = *port;
let conn_ref = control.write_conn.clone();
let write_ref = write.clone();
let ground_ip = control.ground_ip;
let sat_ip = control.satellite_ip;
let ground_port = control
.ground_port
.ok_or(CommsServiceError::MissingGroundPort)?;
thread::spawn(move || {
downlink_endpoint(
&telem_ref,
port_ref,
conn_ref,
&write_ref,
ground_ip,
sat_ip,
ground_port,
);
});
}
} else {
return Err(CommsServiceError::ParameterLengthMismatch.into());
}
}
info!("Communication service started");
Ok(())
}
}
fn read_thread<T: Clone + Send + 'static>(
comms: CommsControlBlock<T>,
data: &Arc<Mutex<CommsTelemetry>>,
) {
let mut port = comms.handler_port_min;
let read = comms.read.unwrap();
loop {
let bytes = match (read)(&comms.read_conn.clone()) {
Ok(bytes) => bytes,
Err(e) => {
log_error(&data, e.to_string()).unwrap();
continue;
}
};
let packet = match UdpPacket::owned(bytes) {
Some(packet) => packet,
None => {
log_telemetry(&data, &TelemType::UpFailed).unwrap();
log_error(&data, CommsServiceError::HeaderParsing.to_string()).unwrap();
error!("Failed to parse packet header");
continue;
}
};
if packet.get_checksum() != ipv4_checksum(&packet, &comms.ground_ip, &comms.satellite_ip) {
log_telemetry(&data, &TelemType::UpFailed).unwrap();
log_error(&data, CommsServiceError::InvalidChecksum.to_string()).unwrap();
error!("Packet checksum failed");
continue;
}
log_telemetry(&data, &TelemType::Up).unwrap();
info!("UDP Packet successfully uplinked");
let socket = match socket_manager(
comms.satellite_ip,
&mut port,
comms.handler_port_min,
comms.handler_port_max,
) {
Some(sock) => sock,
None => {
log_error(&data, CommsServiceError::NoAvailablePorts.to_string()).unwrap();
error!("No message handler ports available");
continue;
}
};
let conn_ref = comms.write_conn.clone();
let write_ref = comms.write[0].clone();
let data_ref = data.clone();
let sat_ref = comms.satellite_ip;
let ground_ref = comms.ground_ip;
let time_ref = comms.timeout;
thread::spawn(move || {
handle_message(
&socket, &data_ref, conn_ref, &write_ref, &packet, time_ref, sat_ref, ground_ref,
);
});
}
}
fn socket_manager(ip: Ipv4Addr, port: &mut u16, min: u16, max: u16) -> Option<UdpSocket> {
let mut socket = None;
let last = *port;
while socket.is_none() {
if *port < max {
*port += 1;
} else {
*port = min;
}
socket = UdpSocket::bind((ip, *port)).ok();
if last == *port {
break;
}
}
socket
}
fn handle_message<T: Clone>(
socket: &UdpSocket,
data: &Arc<Mutex<CommsTelemetry>>,
write_conn: T,
write: &Arc<WriteFn<T>>,
message: &UdpPacket,
timeout: u64,
sat_ip: Ipv4Addr,
ground_ip: Ipv4Addr,
) {
let mut buf = [0; MAX_SIZE];
match socket.set_read_timeout(Some(Duration::from_millis(timeout))) {
Ok(_) => (),
Err(e) => return log_error(&data, e.to_string()).unwrap(),
};
match socket.send_to(message.payload(), (sat_ip, message.get_destination())) {
Ok(_) => (),
Err(e) => return log_error(&data, e.to_string()).unwrap(),
};
info!("UDP Packet sent to port {}", message.get_destination());
let (size, _) = match socket.recv_from(&mut buf) {
Ok(tuple) => tuple,
Err(e) => return log_error(&data, e.to_string()).unwrap(),
};
let packet = match build_packet(
&buf[0..size],
message.get_destination(),
message.get_source(),
(size + HEADER_LEN) as u16,
sat_ip,
ground_ip,
) {
Ok(packet) => packet,
Err(e) => return log_error(&data, e.to_string()).unwrap(),
};
match write(&write_conn.clone(), packet.as_slice()) {
Ok(_) => {
log_telemetry(&data, &TelemType::Down).unwrap();
info!("UDP Packet successfully downlinked");
}
Err(e) => {
log_telemetry(&data, &TelemType::DownFailed).unwrap();
log_error(&data, e.to_string()).unwrap();
error!("UDP packet failed to downlink");
}
};
}
fn downlink_endpoint<T: Clone>(
data: &Arc<Mutex<CommsTelemetry>>,
port: u16,
write_conn: T,
write: &Arc<WriteFn<T>>,
ground_ip: Ipv4Addr,
sat_ip: Ipv4Addr,
ground_port: u16,
) {
let socket = match UdpSocket::bind((sat_ip, port)) {
Ok(sock) => sock,
Err(e) => return log_error(&data, e.to_string()).unwrap(),
};
loop {
let mut buf = [0; MAX_SIZE];
let (size, address) = match socket.recv_from(&mut buf) {
Ok(tuple) => tuple,
Err(e) => {
log_error(&data, e.to_string()).unwrap();
continue;
}
};
let packet = match build_packet(
&buf[0..size],
address.port(),
ground_port,
(size + HEADER_LEN) as u16,
sat_ip,
ground_ip,
) {
Ok(packet) => packet,
Err(e) => {
log_error(&data, e.to_string()).unwrap();
continue;
}
};
match write(&write_conn.clone(), packet.as_slice()) {
Ok(_) => {
log_telemetry(&data, &TelemType::Down).unwrap();
info!("UDP Packet successfully downlinked");
}
Err(e) => {
log_telemetry(&data, &TelemType::DownFailed).unwrap();
log_error(&data, e.to_string()).unwrap();
error!("UDP Packet failed to downlink");
}
};
}
}
fn build_packet(
payload: &[u8],
source: u16,
dest: u16,
length: u16,
sat: Ipv4Addr,
ground: Ipv4Addr,
) -> CommsResult<Vec<u8>> {
let mut header = [0; HEADER_LEN];
let fields = [source, dest, length, 0];
BigEndian::write_u16_into(&fields, &mut header);
let mut packet = header.to_vec();
packet.append(&mut payload.to_vec());
let packet_without_checksum = match UdpPacket::owned(packet.clone()) {
Some(bytes) => bytes,
None => return Err(CommsServiceError::HeaderParsing.into()),
};
let mut checksum = [0; 2];
BigEndian::write_u16(
&mut checksum,
ipv4_checksum(&packet_without_checksum, &sat, &ground),
);
packet.splice(CHKSUM_RNG, checksum.iter().cloned());
Ok(packet)
}