use crate::config::*;
use crate::errors::*;
use crate::packet::{LinkPacket, PayloadType};
use crate::telemetry::*;
use log::info;
use std::fmt::Debug;
use std::net::{Ipv4Addr, UdpSocket};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
pub type ReadFn<Connection> = Fn(&Connection) -> CommsResult<Vec<u8>> + Send + Sync + 'static;
pub type WriteFn<Connection> = Fn(&Connection, &[u8]) -> CommsResult<()> + Send + Sync + 'static;
#[derive(Clone)]
pub struct CommsControlBlock<Connection: Clone> {
pub read: Option<Arc<ReadFn<Connection>>>,
pub write: Vec<Arc<WriteFn<Connection>>>,
pub read_conn: Connection,
pub write_conn: Connection,
pub max_num_handlers: u16,
pub timeout: u64,
pub ip: Ipv4Addr,
pub downlink_ports: Option<Vec<u16>>,
}
impl<Connection: Clone + Debug> Debug for CommsControlBlock<Connection> {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
let read = if self.read.is_some() {
"Some(fn)"
} else {
"None"
};
let mut write = vec![];
if !self.write.is_empty() {
for _n in 0..self.write.len() {
write.push("Fn");
}
}
write!(
f,
"CommsControlBlock {{ read: {}, write: {:?}, read_conn: {:?}, write_conn: {:?},
max_num_handlers: {:?}, timeout: {:?}, ip: {:?}, downlink_ports: {:?} }}",
read,
write,
self.read_conn,
self.write_conn,
self.max_num_handlers,
self.timeout,
self.ip,
self.downlink_ports,
)
}
}
impl<Connection: Clone> CommsControlBlock<Connection> {
pub fn new(
read: Option<Arc<ReadFn<Connection>>>,
write: Vec<Arc<WriteFn<Connection>>>,
read_conn: Connection,
write_conn: Connection,
config: CommsConfig,
) -> CommsResult<Self> {
if write.is_empty() {
return Err(
CommsServiceError::ConfigError("No `write` function provided".to_owned()).into(),
);
}
if let Some(ports) = config.clone().downlink_ports {
if write.len() != ports.len() {
return Err(CommsServiceError::ConfigError(
"There must be a unique write function for each downlink port".to_owned(),
)
.into());
}
}
Ok(CommsControlBlock {
read,
write,
read_conn,
write_conn,
max_num_handlers: config.max_num_handlers.unwrap_or(DEFAULT_MAX_HANDLERS),
timeout: config.timeout.unwrap_or(DEFAULT_TIMEOUT),
ip: Ipv4Addr::from_str(&config.ip)?,
downlink_ports: config.downlink_ports,
})
}
}
pub struct CommsService;
impl CommsService {
pub fn start<Connection: Clone + Send + 'static, Packet: LinkPacket + Send + 'static>(
control: CommsControlBlock<Connection>,
telem: &Arc<Mutex<CommsTelemetry>>,
) -> CommsResult<()> {
if control.read.is_some() {
let telem_ref = telem.clone();
let control_ref = control.clone();
thread::spawn(move || read_thread::<Connection, Packet>(control_ref, &telem_ref));
}
if let Some(ports) = control.downlink_ports {
for (_, (port, write)) in ports.iter().zip(control.write.iter()).enumerate() {
let telem_ref = telem.clone();
let port_ref = *port;
let conn_ref = control.write_conn.clone();
let write_ref = write.clone();
let ip = control.ip;
thread::spawn(move || {
downlink_endpoint::<Connection, Packet>(
&telem_ref, port_ref, conn_ref, &write_ref, ip,
);
});
}
}
info!("Communication service started");
Ok(())
}
}
fn read_thread<Connection: Clone + Send + 'static, Packet: LinkPacket + Send + 'static>(
comms: CommsControlBlock<Connection>,
data: &Arc<Mutex<CommsTelemetry>>,
) {
let read = comms.read.unwrap();
let num_handlers: Arc<Mutex<u16>> = Arc::new(Mutex::new(0));
loop {
let bytes = match (read)(&comms.read_conn.clone()) {
Ok(bytes) => bytes,
Err(e) => {
log_error(&data, e.to_string()).unwrap();
continue;
}
};
let packet = match Packet::parse(&bytes) {
Ok(packet) => packet,
Err(e) => {
log_telemetry(&data, &TelemType::UpFailed).unwrap();
log_error(&data, CommsServiceError::HeaderParsing.to_string()).unwrap();
error!("Failed to parse packet header {}", e);
continue;
}
};
if !packet.validate() {
log_telemetry(&data, &TelemType::UpFailed).unwrap();
log_error(&data, CommsServiceError::InvalidChecksum.to_string()).unwrap();
error!("Packet checksum failed");
continue;
}
log_telemetry(&data, &TelemType::Up).unwrap();
info!("Packet successfully uplinked");
match packet.payload_type() {
PayloadType::Unknown(value) => {
log_error(
&data,
CommsServiceError::UnknownPayloadType(value).to_string(),
)
.unwrap();
error!("Unknown payload type encountered: {}", value);
}
PayloadType::UDP => {
let sat_ref = comms.ip;
let data_ref = data.clone();
thread::spawn(move || match handle_udp_passthrough(packet, sat_ref) {
Ok(_) => {
log_telemetry(&data_ref, &TelemType::Down).unwrap();
info!("UDP Packet successfully uplinked");
}
Err(e) => {
log_telemetry(&data_ref, &TelemType::DownFailed).unwrap();
log_error(&data_ref, e.to_string()).unwrap();
error!("UDP packet failed to uplink: {}", e.to_string());
}
});
}
PayloadType::GraphQL => {
if let Ok(mut num_handlers) = num_handlers.lock() {
if *num_handlers >= comms.max_num_handlers {
log_error(&data, CommsServiceError::NoAvailablePorts.to_string()).unwrap();
error!("No message handler ports available");
continue;
} else {
*num_handlers += 1;
}
}
let conn_ref = comms.write_conn.clone();
let write_ref = comms.write[0].clone();
let data_ref = data.clone();
let sat_ref = comms.ip;
let time_ref = comms.timeout;
let num_handlers_ref = num_handlers.clone();
thread::spawn(move || {
let res =
handle_graphql_request(conn_ref, &write_ref, packet, time_ref, sat_ref);
if let Ok(mut num_handlers) = num_handlers_ref.lock() {
*num_handlers -= 1;
}
match res {
Ok(_) => {
log_telemetry(&data_ref, &TelemType::Down).unwrap();
info!("GraphQL Packet successfully downlinked");
}
Err(e) => {
log_telemetry(&data_ref, &TelemType::DownFailed).unwrap();
log_error(&data_ref, e.to_string()).unwrap();
error!("GraphQL packet failed to downlink: {}", e.to_string());
}
}
});
}
}
}
}
#[allow(clippy::boxed_local)]
fn handle_graphql_request<Connection: Clone, Packet: LinkPacket>(
write_conn: Connection,
write: &Arc<WriteFn<Connection>>,
message: Box<Packet>,
timeout: u64,
sat_ip: Ipv4Addr,
) -> Result<(), String> {
let payload = message.payload().to_vec();
let client = reqwest::Client::builder()
.timeout(Duration::from_millis(timeout))
.build()
.map_err(|e| e.to_string())?;
let mut res = client
.post(&format!("http://{}:{}", sat_ip, message.destination()))
.body(payload)
.send()
.map_err(|e| e.to_string())?;
let size = res.content_length().unwrap_or(0) as usize;
let buf = res.text().unwrap_or_else(|_| "".to_owned());
let buf = buf.as_bytes();
let packet = Packet::build(message.command_id(), PayloadType::GraphQL, 0, &buf[0..size])
.and_then(|packet| packet.to_bytes())
.map_err(|e| e.to_string())?;
write(&write_conn.clone(), &packet).map_err(|e| e.to_string())
}
#[allow(clippy::boxed_local)]
fn handle_udp_passthrough<Packet: LinkPacket>(
message: Box<Packet>,
sat_ip: Ipv4Addr,
) -> Result<(), String> {
let socket = UdpSocket::bind((sat_ip, 0)).map_err(|e| e.to_string())?;
socket
.send_to(&message.payload(), (sat_ip, message.destination()))
.map_err(|e| e.to_string())
.map(|_c| ())
}
fn downlink_endpoint<Connection: Clone, Packet: LinkPacket>(
data: &Arc<Mutex<CommsTelemetry>>,
port: u16,
write_conn: Connection,
write: &Arc<WriteFn<Connection>>,
sat_ip: Ipv4Addr,
) {
let socket = match UdpSocket::bind((sat_ip, port)) {
Ok(sock) => sock,
Err(e) => return log_error(&data, e.to_string()).unwrap(),
};
loop {
let mut buf = vec![0; Packet::max_size()];
let (size, _address) = match socket.recv_from(&mut buf) {
Ok(tuple) => tuple,
Err(e) => {
log_error(&data, e.to_string()).unwrap();
continue;
}
};
let packet = match Packet::build(0, PayloadType::UDP, 0, &buf[0..size])
.and_then(|packet| packet.to_bytes())
{
Ok(packet) => packet,
Err(e) => {
log_error(&data, e.to_string()).unwrap();
continue;
}
};
match write(&write_conn.clone(), &packet) {
Ok(_) => {
log_telemetry(&data, &TelemType::Down).unwrap();
info!("Packet successfully downlinked");
}
Err(e) => {
log_telemetry(&data, &TelemType::DownFailed).unwrap();
log_error(&data, e.to_string()).unwrap();
error!("Packet failed to downlink");
}
};
}
}