use crate::app_entry::*;
use crate::error::*;
use crate::monitor::*;
use chrono::Utc;
use failure::format_err;
use fs_extra;
use log::*;
use nix::sys::signal;
use nix::unistd::Pid;
use std::ffi::OsStr;
use std::fs;
use std::io::Read;
use std::os::unix;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use tempfile::TempDir;
use toml;
/// Directory used to store registered applications when no other directory is given
/// (see `AppRegistry::new`)
pub static K_APPS_DIR: &str = "/home/system/kubos/apps";
/// Configuration file path recorded for an application whose manifest does not specify one
pub static DEFAULT_CONFIG: &str = "/etc/kubos-config.toml";
/// Registry of the applications stored under `apps_dir` and of the applications
/// which have been started through it
///
/// Both lists live behind `Arc<Mutex<..>>`, so cloning the registry yields a handle to the
/// same underlying data.
#[derive(Clone, Debug)]
pub struct AppRegistry {
/// Every registered application version currently known to the registry
#[doc(hidden)]
pub entries: Arc<Mutex<Vec<AppRegistryEntry>>>,
/// Monitoring records for applications started by the registry
pub monitoring: Arc<Mutex<Vec<MonitorEntry>>>,
/// Canonicalized path of the directory in which the applications are stored
pub apps_dir: String,
}
impl AppRegistry {
/// Create a registry rooted at `apps_dir` and load every application already stored there
///
/// The `<apps_dir>/active` directory is created when missing, the directory path is
/// canonicalized, and the directory tree is scanned for previously registered applications.
///
/// # Arguments
///
/// * `apps_dir` - Directory in which the registered applications are (or will be) stored
///
/// # Errors
///
/// Returns an `AppError` if the directory cannot be created or canonicalized, if the
/// canonical path is not valid UTF-8, if the entries mutex cannot be locked, or if the
/// scan for existing applications fails.
pub fn new_from_dir(apps_dir: &str) -> Result<AppRegistry, AppError> {
    // The "active" directory holds one symlink per application, pointing at its active version
    let active_dir = PathBuf::from(format!("{}/active", apps_dir));
    if !active_dir.exists() {
        fs::create_dir_all(&active_dir)?;
    }

    let requested = Path::new(apps_dir);
    let canonical = match requested.canonicalize() {
        Ok(absolute) => absolute,
        Err(e) => {
            return Err(AppError::RegistryError {
                err: format!("Failed to get absolute apps_dir: {}", e),
            });
        }
    };
    let canonical = match canonical.to_str() {
        Some(absolute) => String::from(absolute),
        None => {
            return Err(AppError::RegistryError {
                err: format!("Failed to create absolute apps_dir path: {:?}", requested),
            });
        }
    };

    let registry = AppRegistry {
        entries: Arc::new(Mutex::new(Vec::new())),
        monitoring: Arc::new(Mutex::new(Vec::new())),
        apps_dir: canonical,
    };

    {
        // The lock is taken before scanning, and released before the registry is returned
        let mut entries = registry
            .entries
            .lock()
            .map_err(|err| AppError::RegistryError {
                err: format!("Couldn't get entries mutex: {:?}", err),
            })?;
        let discovered = registry.discover_apps()?;
        entries.extend(discovered);
    }

    Ok(registry)
}
pub fn new() -> Result<AppRegistry, AppError> {
Self::new_from_dir(K_APPS_DIR)
}
/// Scan `apps_dir` and collect the registry entries of every application found in it
///
/// Each sub-directory other than `active` is treated as an application directory and
/// handed to `discover_versions`. Entries which cannot be read, or whose file type cannot
/// be determined, are skipped.
///
/// # Errors
///
/// Returns an error if `apps_dir` cannot be listed or if scanning one of the
/// application directories fails.
fn discover_apps(&self) -> Result<Vec<AppRegistryEntry>, AppError> {
    let mut found: Vec<AppRegistryEntry> = Vec::new();

    for item in fs::read_dir(&self.apps_dir)?.filter_map(Result::ok) {
        let is_dir = match item.file_type() {
            Ok(kind) => kind.is_dir(),
            Err(_) => continue,
        };

        // "active" only holds symlinks to the active versions, not applications
        if !is_dir || item.file_name().to_str() == Some("active") {
            continue;
        }

        found.extend(self.discover_versions(item.path())?);
    }

    Ok(found)
}
/// Collect the registry entries for every version stored in one application directory
///
/// Each sub-directory of `app_dir` is loaded with `AppRegistryEntry::from_dir`. When a
/// loaded entry is flagged as the active version, the application's active symlink is
/// (re)pointed at it. Anything which is not a directory, or which fails to load, is
/// removed on a best-effort basis.
///
/// # Arguments
///
/// * `app_dir` - Directory holding the versions of a single application
///
/// # Errors
///
/// Returns an error if `app_dir` cannot be listed or if updating an active symlink fails.
fn discover_versions(&self, app_dir: PathBuf) -> Result<Vec<AppRegistryEntry>, AppError> {
    let mut found: Vec<AppRegistryEntry> = Vec::new();

    for item in fs::read_dir(app_dir)? {
        let item = match item {
            Ok(item) => item,
            Err(_) => continue,
        };
        let version_path = item.path();

        // A failure to determine the file type is handled like "not a directory"
        let is_dir = item
            .file_type()
            .map(|kind| kind.is_dir())
            .unwrap_or(false);
        let loaded = if is_dir {
            AppRegistryEntry::from_dir(&version_path).ok()
        } else {
            None
        };

        match loaded {
            Some(entry) => {
                if entry.active_version {
                    self.set_active(&entry.app.name, &version_path.to_string_lossy())?;
                }
                found.push(entry);
            }
            None => {
                // Stray or unloadable content is cleaned up; failure to do so is ignored
                let _ = fs::remove_dir_all(&version_path);
            }
        }
    }

    Ok(found)
}
/// Point the `<apps_dir>/active/<name>` symlink at `app_dir`
///
/// Any link already present at that location is removed first, including a dangling one
/// whose target no longer exists. If creating the link fails because the `active`
/// directory is missing, the directory is created and the link is attempted once more.
///
/// # Arguments
///
/// * `name` - Name of the application
/// * `app_dir` - Directory of the version which should become the active one
///
/// # Errors
///
/// Returns `AppError::RegisterError` if the old link cannot be removed or the new link
/// cannot be created.
fn set_active(&self, name: &str, app_dir: &str) -> Result<(), AppError> {
    let active_symlink = PathBuf::from(format!("{}/active/{}", self.apps_dir, name));

    // `Path::exists` follows symlinks and therefore reports `false` for a dangling link
    // (e.g. after the previously active version was uninstalled). Such a link would be left
    // in place and make the `symlink` call below fail with "File exists".
    // `symlink_metadata` inspects the link itself instead of its target.
    if fs::symlink_metadata(&active_symlink).is_ok() {
        if let Err(err) = fs::remove_file(&active_symlink) {
            return Err(AppError::RegisterError {
                err: format!(
                    "Couldn't remove symlink {}: {:?}",
                    active_symlink.display(),
                    err
                ),
            });
        }
    }

    if let Err(err) = unix::fs::symlink(app_dir, &active_symlink) {
        // The most likely cause is a missing "active" directory: create it and retry once
        let active_dir = PathBuf::from(format!("{}/active", self.apps_dir));
        if !active_dir.exists() {
            fs::create_dir_all(&active_dir)?;
            if unix::fs::symlink(app_dir, &active_symlink).is_ok() {
                return Ok(());
            }
        }

        return Err(AppError::RegisterError {
            err: format!(
                "Couldn't symlink {} to {}: {:?}",
                active_symlink.display(),
                app_dir,
                err
            ),
        });
    }

    Ok(())
}
pub fn register(&self, path: &str) -> Result<AppRegistryEntry, AppError> {
let app_path = Path::new(path);
if !app_path.exists() {
return Err(AppError::RegisterError {
err: format!("{} does not exist", path),
});
} else if !app_path.is_dir() {
return match app_path.extension().and_then(OsStr::to_str) {
Some("tgz") => extract_archive(&self, path),
Some(extension) => Err(AppError::RegisterError {
err: format!("Provided file with extension {} is neither a directory nor a tgz archive file", extension)
}),
None => Err(AppError::RegisterError {
err: String::from("Provided file is neither a directory nor a tgz archive file"),
})
};
}
let mut data = String::new();
fs::File::open(app_path.join("manifest.toml"))
.and_then(|mut fp| fp.read_to_string(&mut data))
.map_err(|error| AppError::RegisterError {
err: format!("Unable to load manifest.toml: {}", error),
})?;
let metadata: AppMetadata = match toml::from_str(&data) {
Ok(val) => val,
Err(error) => {
return Err(AppError::ParseError {
entity: "manifest.toml".to_owned(),
err: error.to_string(),
});
}
};
let app_name = metadata.name.clone();
let app_exec = if let Some(path) = metadata.executable.clone() {
path
} else {
metadata.name.clone()
};
if !app_path.join(app_exec.clone()).exists() {
return Err(AppError::RegisterError {
err: format!("Application file {} not found in given path", app_exec),
});
}
let config = if let Some(path) = metadata.config {
path
} else {
DEFAULT_CONFIG.to_owned()
};
let mut entries = self.entries.lock().map_err(|err| AppError::RegisterError {
err: format!("Couldn't get entries mutex: {:?}", err),
})?;
let old_active = if Path::new(&format!("{}/{}", self.apps_dir, app_name)).exists() {
entries
.iter()
.position(|ref e| e.active_version && e.app.name == app_name)
} else {
None
};
let app_dir_str = format!("{}/{}/{}", self.apps_dir, app_name, metadata.version);
let app_dir = Path::new(&app_dir_str);
if app_dir.exists() {
return Err(AppError::RegisterError {
err: format!(
"App {} version {} already exists",
app_name, metadata.version
),
});
} else {
fs::create_dir_all(app_dir)?;
}
let files: Vec<PathBuf> = fs::read_dir(app_path)?
.filter_map(|file| {
if let Ok(entry) = file {
Some(entry.path())
} else {
None
}
})
.collect();
fs_extra::copy_items(&files, app_dir, &fs_extra::dir::CopyOptions::new()).map_err(
|error| {
let _ = fs::remove_dir_all(app_dir);
let _ = fs::remove_dir(format!("{}/{}", self.apps_dir, app_name));
AppError::RegisterError {
err: format!("Error copying files into registry dir: {}", error),
}
},
)?;
let reg_entry = AppRegistryEntry {
app: App {
name: app_name.clone(),
executable: format!("{}/{}", app_dir_str, app_exec),
version: metadata.version,
author: metadata.author,
config,
},
active_version: true,
};
entries.push(reg_entry);
entries[entries.len() - 1].save().or_else(|err| {
let _ = fs::remove_dir_all(app_dir);
let _ = fs::remove_dir(format!("{}/{}", self.apps_dir, app_name));
Err(err)
})?;
if let Some(index) = old_active {
entries[index].active_version = false;
entries[index].save()?;
}
self.set_active(&app_name, &app_dir_str)?;
Ok(entries[entries.len() - 1].clone())
}
/// Uninstall one version of an application
///
/// Removes the version's directory, removes the application directory and the active
/// symlink once no version is left, drops the entry from the registry, sends a kill
/// request to a running instance of that version, and removes its monitoring entry.
/// Every step is attempted even if an earlier one failed.
///
/// # Arguments
///
/// * `app_name` - Name of the application
/// * `version` - Version of the application which should be removed
///
/// # Errors
///
/// Returns `AppError::UninstallError` if the entries mutex cannot be locked, or with the
/// messages of all failed steps joined by `". "`.
pub fn uninstall(&self, app_name: &str, version: &str) -> Result<bool, AppError> {
    let mut errors: Vec<String> = vec![];

    let app_dir = format!("{}/{}/{}", self.apps_dir, app_name, version);
    if let Err(error) = fs::remove_dir_all(app_dir) {
        errors.push(format!("Failed to remove app directory: {}", error));
    }

    // `remove_dir` only succeeds on an empty directory, i.e. when this was the last version.
    // Only then is the active symlink removed as well.
    if fs::remove_dir(format!("{}/{}", self.apps_dir, app_name)).is_ok() {
        if let Err(error) = fs::remove_file(format!("{}/active/{}", self.apps_dir, app_name)) {
            errors.push(format!(
                "Failed to remove active symlink for {}: {}",
                app_name, error
            ));
        }
    }

    let mut entries = self
        .entries
        .lock()
        .map_err(|err| AppError::UninstallError {
            err: format!("Couldn't get entries mutex: {:?}", err),
        })?;
    match entries
        .iter()
        .position(|e| e.app.name == app_name && e.app.version == version)
    {
        Some(index) => {
            entries.remove(index);
        }
        None => {
            errors.push(format!(
                "{} version {} not found in registry",
                app_name, version
            ));
        }
    }

    // Stop the application if the version being removed is the one currently running
    if let Ok(Some(entry)) = find_running(&self.monitoring, app_name) {
        if entry.version == version {
            if let Some(pid) = entry.pid {
                if let Err(err) = uninstall_kill(pid) {
                    errors.push(format!("Failed to kill {}: {:?}", app_name, err));
                }
            }
        }
    }

    if let Err(new_error) = remove_entry(&self.monitoring, app_name, version) {
        errors.push(new_error.to_string());
    }

    if errors.is_empty() {
        Ok(true)
    } else {
        let err: String = errors.join(". ");
        Err(AppError::UninstallError { err })
    }
}
/// Uninstall every version of an application
///
/// Removes the application's directory and active symlink, drops all of its entries from
/// the registry, sends a kill request to a running instance, and removes its monitoring
/// entries. Every step is attempted even if an earlier one failed.
///
/// # Arguments
///
/// * `app_name` - Name of the application to remove
///
/// # Errors
///
/// Returns `AppError::UninstallError` carrying the messages of all failed steps joined
/// by `". "`.
pub fn uninstall_all(&self, app_name: &str) -> Result<bool, AppError> {
    let mut problems: Vec<String> = Vec::new();

    let app_root = format!("{}/{}", self.apps_dir, app_name);
    if let Err(error) = fs::remove_dir_all(app_root) {
        problems.push(format!("Failed to remove app directory: {}", error));
    }

    let active_link = format!("{}/active/{}", self.apps_dir, app_name);
    if let Err(error) = fs::remove_file(active_link) {
        problems.push(format!(
            "Failed to remove active symlink for {}: {}",
            app_name, error
        ));
    }

    match self.entries.lock() {
        Ok(mut entries) => entries.retain(|entry| entry.app.name != app_name),
        Err(err) => problems.push(format!("Couldn't get entries mutex: {:?}", err)),
    }

    // A failed lookup is treated the same as "nothing is running"
    let running_pid = match find_running(&self.monitoring, app_name) {
        Ok(Some(entry)) => entry.pid,
        _ => None,
    };
    if let Some(pid) = running_pid {
        if let Err(err) = uninstall_kill(pid) {
            problems.push(format!("Failed to kill {}: {:?}", app_name, err));
        }
    }

    if let Err(new_error) = remove_entries(&self.monitoring, app_name) {
        problems.push(new_error.to_string());
    }

    if problems.is_empty() {
        return Ok(true);
    }
    Err(AppError::UninstallError {
        err: problems.join(". "),
    })
}
/// Make the given version of an application its active version
///
/// Does nothing when that version is already active. Otherwise the requested entry is
/// flagged active and saved, the previously active entry (if any) is flagged inactive and
/// saved, and the active symlink is pointed at the requested version.
///
/// # Arguments
///
/// * `app_name` - Name of the application
/// * `version` - Version which should become the active one
///
/// # Errors
///
/// Returns `AppError::RegistryError` if the entries mutex cannot be locked, the requested
/// version is not registered, or one of the entries cannot be saved. Errors from updating
/// the symlink are passed through.
pub fn set_version(&self, app_name: &str, version: &str) -> Result<(), AppError> {
    let mut entries = self.entries.lock().map_err(|err| AppError::RegistryError {
        err: format!("Couldn't get entries mutex: {:?}", err),
    })?;

    let previous = entries
        .iter()
        .position(|e| e.active_version && e.app.name == app_name);

    // Nothing to do if the requested version is the active one already
    if previous.map_or(false, |index| entries[index].app.version == version) {
        return Ok(());
    }

    let target = match entries
        .iter()
        .position(|e| e.app.name == app_name && e.app.version == version)
    {
        Some(index) => index,
        None => {
            return Err(AppError::RegistryError {
                err: format!("App {} version {} not found in registry", app_name, version),
            });
        }
    };

    entries[target].active_version = true;
    if let Err(error) = entries[target].save() {
        return Err(AppError::RegistryError {
            err: format!("Failed to update new active version entry: {:?}", error),
        });
    }

    if let Some(index) = previous {
        entries[index].active_version = false;
        if let Err(error) = entries[index].save() {
            return Err(AppError::RegistryError {
                err: format!("Failed to update old active version entry: {:?}", error),
            });
        }
    }

    let version_dir = format!("{}/{}/{}", self.apps_dir, app_name, version);
    self.set_active(app_name, &version_dir)
}
/// Start the active version of an application
///
/// # Arguments
///
/// * `app_name` - Name of the application to start
/// * `config` - Configuration file path passed to the application with `-c`; the path
///   stored in the registry entry is used when `None`
/// * `args` - Additional command line arguments appended after the `-c` option
///
/// # Returns
///
/// * `Ok(Some(pid))` if the application is still running about 300ms after being spawned;
///   a background thread then keeps monitoring it
/// * `Ok(None)` if the application already exited successfully within that time
///
/// # Errors
///
/// Returns `AppError::StartError` if no active version is registered, the executable is
/// missing (the version is then uninstalled), an instance is already running, the process
/// cannot be spawned, the process exited unsuccessfully, or its status cannot be queried.
///
/// # Panics
///
/// Panics if the monitoring list cannot be queried or the new monitoring entry cannot be
/// recorded.
pub fn start_app(
&self,
app_name: &str,
config: Option<String>,
args: Option<Vec<String>>,
) -> Result<Option<i32>, AppError> {
// Copy the active entry's data out so the entries lock is released at the end of this block
let app = {
let entries = self.entries.lock().map_err(|err| AppError::StartError {
err: format!("Couldn't get entries mutex: {:?}", err),
})?;
match entries
.iter()
.find(|ref e| e.active_version && e.app.name == app_name)
{
Some(entry) => entry.app.clone(),
None => {
return Err(AppError::StartError {
err: format!("No active version found for app {}", app_name),
});
}
}
};
let app_path = PathBuf::from(&app.executable);
// A registered version whose executable is missing is uninstalled before the error is reported
if !app_path.exists() {
let msg = match self.uninstall(&app.name, &app.version) {
Ok(_) => format!(
"{} does not exist. {} version {} automatically uninstalled",
app.executable, app.name, app.version
),
Err(error) => format!("{} does not exist. {}", app.executable, error),
};
error!("{}", msg);
return Err(AppError::StartError { err: msg });
}
// Change this process's working directory to the directory holding the executable.
// Failing to do so is only logged as a warning.
if let Err(err) = app_path
.parent()
.ok_or_else(|| format_err!("Failed to get parent dir"))
.and_then(|parent_dir| {
::std::env::set_current_dir(parent_dir).map_err(|err| err.into())
})
{
warn!("Failed to set cwd before executing {}: {:?}", app_name, err);
}
// Refuse to start a second instance of an application which is already running
let running_status = find_running(&self.monitoring, app_name);
match running_status {
Ok(None) => {}
Ok(Some(_)) => {
return Err(AppError::StartError {
err: format!("Instance of {} already running", app_name),
});
}
Err(err) => {
// Being unable to query the monitoring list is treated as fatal for the whole service
error!("Crashing service: {:?}", err);
panic!("{:?}", err);
}
}
let mut cmd = Command::new(app_path);
// An explicitly requested config file takes precedence over the registered one
let config_path = match config {
Some(path) => path,
None => app.config.clone(),
};
cmd.arg("-c").arg(config_path.clone());
if let Some(add_args) = args.clone() {
cmd.args(&add_args);
}
let mut child = cmd.spawn().map_err(|err| {
error!("Failed to spawn app {}: {:?}", app_name, err);
AppError::StartError {
err: format!("Failed to spawn app: {:?}", err),
}
})?;
let start_time = Utc::now();
info!(
"Starting {}. Config: {:?}, Args: {:?}",
app_name, config_path, args
);
// Record the freshly spawned process in the monitoring list
let entry = MonitorEntry {
start_time,
end_time: None,
name: app.name.clone(),
version: app.version.clone(),
running: true,
pid: Some(child.id() as i32),
last_rc: None,
last_signal: None,
args,
config: config_path,
};
if let Err(error) = start_entry(&self.monitoring, &entry) {
error!("Crashing service: {:?}", error);
panic!("{:?}", error);
}
// Give the application a moment to run, so that an immediate exit can be reported to the caller
thread::sleep(Duration::from_millis(300));
match child.try_wait() {
// The application has already exited: record the result and report how it went
Ok(Some(status)) => {
finish_entry(&self.monitoring, app_name, &app.version, status)?;
if !status.success() {
Err(AppError::StartError {
err: format!("App returned {}", status),
})
} else {
Ok(None)
}
}
// Still running: hand the child over to a background monitoring thread and return its PID
Ok(None) => {
let name = app_name.to_owned();
let pid = child.id() as i32;
let registry = self.monitoring.clone();
thread::spawn(move || {
let result = monitor_app(registry, child, &name, &app.version);
if let Err(error) = result {
error!("{:?}", error);
}
});
Ok(Some(pid))
}
Err(err) => Err(AppError::StartError {
err: format!(
"Started app, but failed to fetch status information: {:?}",
err
),
}),
}
}
/// Send a signal to a running application
///
/// # Arguments
///
/// * `name` - Name of the application to signal
/// * `signal` - Number of the signal to send. Signal 15 (SIGTERM) is used when `None`,
///   and SIGTERM is also sent when the number cannot be converted into a signal
///
/// # Errors
///
/// Returns `AppError::KillError` if the application has no monitoring entry, the entry
/// has no PID, or the signal cannot be delivered. Errors from looking up the monitoring
/// entry are passed through.
pub fn kill_app(&self, name: &str, signal: Option<i32>) -> Result<(), AppError> {
    let entry = match find_running(&self.monitoring, name)? {
        Some(entry) => entry,
        None => {
            return Err(AppError::KillError {
                err: "No matching monitoring entry found".to_owned(),
            });
        }
    };

    let raw_pid = match entry.pid {
        Some(raw_pid) => raw_pid,
        None => {
            return Err(AppError::KillError {
                err: "No active PID found in registry".to_owned(),
            });
        }
    };

    let signal_number = signal.unwrap_or(15);
    let sig = signal::Signal::from_c_int(signal_number).unwrap_or(signal::Signal::SIGTERM);

    signal::kill(Pid::from_raw(raw_pid), sig).map_err(|err| AppError::KillError {
        err: err.to_string(),
    })
}
}
/// Stop the process of an application which is being uninstalled
///
/// SIGTERM is sent right away. When that succeeds, a background thread sends SIGKILL to
/// the same PID two seconds later, ignoring the outcome of that second signal.
///
/// # Arguments
///
/// * `pid` - ID of the process to stop
///
/// # Errors
///
/// Returns the `nix` error if SIGTERM cannot be sent; no SIGKILL is scheduled in that case.
fn uninstall_kill(pid: i32) -> Result<(), nix::Error> {
    let target = Pid::from_raw(pid);

    signal::kill(target, Some(signal::Signal::SIGTERM)).map(|_| {
        // Follow up with SIGKILL after a grace period, without blocking the caller
        thread::spawn(move || {
            thread::sleep(Duration::from_secs(2));
            let _ = signal::kill(target, Some(signal::Signal::SIGKILL));
        });
    })
}
/// Expand a `.tgz` archive into a temporary directory and register its contents
///
/// The archive is unpacked with the system's `tar` (`/usr/bin/tar`, falling back to
/// `/bin/tar`). The archive's root must contain `manifest.toml`.
///
/// # Arguments
///
/// * `registry` - Registry with which the unpacked application is registered
/// * `path` - Path of the archive file
///
/// # Errors
///
/// Returns `AppError::RegisterError` if the temporary directory cannot be created, `tar`
/// is not found or fails, the manifest is missing from the archive's root, or the
/// temporary path is not valid UTF-8. Errors from the registration itself are passed
/// through.
fn extract_archive(registry: &AppRegistry, path: &str) -> Result<AppRegistryEntry, AppError> {
    let scratch = match TempDir::new() {
        Ok(dir) => dir,
        Err(error) => {
            return Err(AppError::RegisterError {
                err: format!(
                    "Error creating temporary directory to expand archive: {}",
                    error
                ),
            });
        }
    };

    let tar_binary = if Path::new("/usr/bin/tar").exists() {
        "/usr/bin/tar"
    } else if Path::new("/bin/tar").exists() {
        "/bin/tar"
    } else {
        return Err(AppError::RegisterError {
            err: String::from("Error expanding archive: tar command not found"),
        });
    };

    let unpack = Command::new(tar_binary)
        .arg("-zxf")
        .arg(path)
        .arg("--directory")
        .arg(scratch.path())
        .output()
        .map_err(|error| AppError::RegisterError {
            err: format!("Error expanding archive: {}", error),
        })?;

    if !unpack.status.success() {
        return Err(AppError::RegisterError {
            err: format!(
                "Non-successful status when expanding archive: {:?}",
                unpack.status.code()
            ),
        });
    }

    if !scratch.path().join("manifest.toml").exists() {
        return Err(AppError::RegisterError {
            err: String::from("Manifest file manifest.toml not found in root of archive. When you create the archive, do so in the application directory, with a command like: tar -czf archive.tgz *")
        });
    }

    // `scratch` stays alive until this function returns, so the unpacked files are still
    // present while they are being registered
    match scratch.path().to_str() {
        Some(unpacked_dir) => registry.register(unpacked_dir),
        None => Err(AppError::RegisterError {
            err: String::from("Error converting temp dir path to UTF8"),
        }),
    }
}