diff --git a/.idea/RoadSign.iml b/.idea/RoadSign.iml index 5e764c4..758ceaa 100644 --- a/.idea/RoadSign.iml +++ b/.idea/RoadSign.iml @@ -2,7 +2,10 @@ - + + + + diff --git a/Cargo.lock b/Cargo.lock index 36b0e29..70acf64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -133,9 +133,9 @@ checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" [[package]] name = "bitflags" -version = "1.3.2" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" [[package]] name = "bitflags" @@ -1452,7 +1452,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" dependencies = [ - "bitflags 1.3.2", + "bitflags 1.2.1", ] [[package]] @@ -1601,7 +1601,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "88073939a61e5b7680558e6be56b419e208420c2adb92be54921fa6b72283f1a" dependencies = [ "base64 0.13.1", - "bitflags 1.3.2", + "bitflags 1.2.1", "serde", ] @@ -1666,11 +1666,11 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "security-framework" -version = "2.9.2" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" +checksum = "23a2ac85147a3a11d77ecf1bc7166ec0b92febfa4461c37944e180f319ece467" dependencies = [ - "bitflags 1.3.2", + "bitflags 1.2.1", "core-foundation", "core-foundation-sys", "libc", @@ -1878,7 +1878,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" dependencies = [ - "bitflags 1.3.2", + "bitflags 1.2.1", "core-foundation", "system-configuration-sys", ] diff --git a/regions/index.toml b/regions/index.toml index db75202..9d87b77 100644 --- a/regions/index.toml +++ b/regions/index.toml @@ -7,3 +7,8 @@ paths = ["/"] [[locations.destinations]] id = "static" uri = "files://regions?index=index.html" + +[[applications]] +id = "script" +exe = "./script.sh" +workdir = "regions" diff --git a/regions/script.sh b/regions/script.sh new file mode 100755 index 0000000..eca748e --- /dev/null +++ b/regions/script.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +echo "Good morning!" diff --git a/src/main.rs b/src/main.rs index b5859be..a634ae5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,8 +2,7 @@ pub mod auth; mod config; mod proxies; mod sideload; - -use std::collections::VecDeque; +pub mod warden; use lazy_static::lazy_static; use poem::{listener::TcpListener, EndpointExt, Route, Server}; @@ -12,18 +11,10 @@ use proxies::RoadInstance; use tokio::sync::Mutex; use tracing::{error, info, Level}; -use crate::proxies::{metrics::RoadMetrics, route}; +use crate::proxies::route; lazy_static! { - static ref ROAD: Mutex = Mutex::new(RoadInstance { - regions: vec![], - metrics: RoadMetrics { - requests_count: 0, - failures_count: 0, - recent_successes: VecDeque::new(), - recent_errors: VecDeque::new(), - } - }); + static ref ROAD: Mutex = Mutex::new(RoadInstance::new()); } #[tokio::main] @@ -84,6 +75,16 @@ async fn main() -> Result<(), std::io::Error> { }), ); + // Process manager + { + let mut app = ROAD.lock().await; + { + let reg = app.regions.clone(); + app.warden.scan(reg); + } + app.warden.start().await; + } + tokio::try_join!(proxies_server, sideload_server)?; Ok(()) diff --git a/src/proxies/config.rs b/src/proxies/config.rs index ae4f2d5..108e6f8 100644 --- a/src/proxies/config.rs +++ b/src/proxies/config.rs @@ -5,12 +5,15 @@ use queryst::parse; use serde::{Deserialize, Serialize}; use serde_json::json; +use crate::warden::Application; + use super::responder::StaticResponderConfig; #[derive(Debug, Object, Clone, Serialize, Deserialize)] pub struct Region { pub id: String, pub locations: Vec, + pub applications: Vec, } #[derive(Debug, Object, Clone, Serialize, Deserialize)] diff --git a/src/proxies/metrics.rs b/src/proxies/metrics.rs index 4ecd101..5d06a0e 100644 --- a/src/proxies/metrics.rs +++ b/src/proxies/metrics.rs @@ -50,6 +50,15 @@ pub struct RoadMetrics { const MAX_TRACE_COUNT: usize = 10; impl RoadMetrics { + pub fn new() -> RoadMetrics { + RoadMetrics { + requests_count: 0, + failures_count: 0, + recent_successes: VecDeque::new(), + recent_errors: VecDeque::new(), + } + } + pub fn get_success_rate(&self) -> f64 { if self.requests_count > 0 { (self.requests_count - self.failures_count) as f64 / self.requests_count as f64 diff --git a/src/proxies/mod.rs b/src/proxies/mod.rs index 2433bb8..c62bfe0 100644 --- a/src/proxies/mod.rs +++ b/src/proxies/mod.rs @@ -3,6 +3,8 @@ use poem::http::{HeaderMap, Uri}; use regex::Regex; use wildmatch::WildMatch; +use crate::warden::WardenInstance; + use self::{ config::{Location, Region}, metrics::RoadMetrics, @@ -19,9 +21,20 @@ pub mod route; pub struct RoadInstance { pub regions: Vec, pub metrics: RoadMetrics, + pub warden: WardenInstance, } impl RoadInstance { + pub fn new() -> RoadInstance { + RoadInstance { + regions: vec![], + warden: WardenInstance { + applications: vec![], + }, + metrics: RoadMetrics::new(), + } + } + pub fn filter( &self, uri: &Uri, diff --git a/src/warden/mod.rs b/src/warden/mod.rs new file mode 100644 index 0000000..e7265b4 --- /dev/null +++ b/src/warden/mod.rs @@ -0,0 +1,73 @@ +pub mod runner; + +use std::collections::HashMap; + +use futures_util::lock::Mutex; +use lazy_static::lazy_static; +use poem_openapi::Object; +use serde::{Deserialize, Serialize}; +use tracing::{debug, warn}; + +use crate::proxies::config::Region; + +use self::runner::AppInstance; + +lazy_static! { + static ref INSTANCES: Mutex> = Mutex::new(HashMap::new()); +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WardenInstance { + pub applications: Vec, +} + +impl WardenInstance { + pub fn new() -> WardenInstance { + WardenInstance { + applications: vec![], + } + } + + pub fn scan(&mut self, regions: Vec) { + self.applications = regions + .iter() + .flat_map(|item| item.applications.clone()) + .collect::>(); + debug!( + applications = format!("{:?}", self.applications), + "Warden scan accomplished." + ) + } + + pub async fn start(&self) { + for item in self.applications.iter() { + let mut instance = AppInstance::new(); + match instance.start(item.clone()).await { + Ok(_) => { + debug!(id = item.id, "Warden successfully created instance for"); + INSTANCES.lock().await.insert(item.clone().id, instance); + } + Err(err) => warn!( + id = item.id, + err = format!("{:?}", err), + "Warden failed to create an instance for" + ), + }; + } + } +} + +impl Default for WardenInstance { + fn default() -> Self { + Self::new() + } +} + +#[derive(Debug, Object, Clone, Serialize, Deserialize)] +pub struct Application { + pub id: String, + pub exe: String, + pub args: Option>, + pub env: Option>, + pub workdir: String, +} diff --git a/src/warden/runner.rs b/src/warden/runner.rs new file mode 100644 index 0000000..080388f --- /dev/null +++ b/src/warden/runner.rs @@ -0,0 +1,104 @@ +use std::{borrow::BorrowMut, collections::HashMap, io}; + +use super::Application; +use futures_util::lock::Mutex; +use lazy_static::lazy_static; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + process::{Child, Command}, +}; + +lazy_static! { + static ref STDOUT: Mutex> = Mutex::new(HashMap::new()); + static ref STDERR: Mutex> = Mutex::new(HashMap::new()); +} + +#[derive(Debug)] +pub struct AppInstance { + pub app: Option, + pub program: Option, +} + +impl AppInstance { + pub fn new() -> AppInstance { + AppInstance { + app: None, + program: None, + } + } + + pub async fn start(&mut self, app: Application) -> io::Result<()> { + return match Command::new(app.exe.clone()) + .args(app.args.clone().unwrap_or_default()) + .envs(app.env.clone().unwrap_or_default()) + .current_dir(app.workdir.clone()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + { + Ok(mut child) => { + let stderr_reader = BufReader::new(child.stderr.take().unwrap()); + let stdout_reader = BufReader::new(child.stdout.take().unwrap()); + + tokio::spawn(read_stream_and_capture(stderr_reader, app.id.clone(), true)); + tokio::spawn(read_stream_and_capture( + stdout_reader, + app.id.clone(), + false, + )); + + self.app = Some(app.clone()); + self.program = Some(child); + + Ok(()) + } + Err(err) => Err(err), + }; + } + + pub async fn stop(&mut self) -> Result<(), io::Error> { + if let Some(child) = self.program.borrow_mut() { + return child.kill().await; + } + Ok(()) + } + + pub async fn get_stdout(&self) -> Option { + if let Some(app) = self.app.clone() { + STDOUT.lock().await.get(&app.id).cloned() + } else { + None + } + } + + pub async fn get_stderr(&self) -> Option { + if let Some(app) = self.app.clone() { + STDERR.lock().await.get(&app.id).cloned() + } else { + None + } + } +} + +impl Default for AppInstance { + fn default() -> Self { + Self::new() + } +} + +async fn read_stream_and_capture(reader: R, id: String, is_err: bool) -> io::Result<()> +where + R: tokio::io::AsyncBufRead + Unpin, +{ + let mut lines = reader.lines(); + while let Some(line) = lines.next_line().await? { + if !is_err { + if let Some(out) = STDOUT.lock().await.get_mut(&id) { + out.push_str(&line); + } + } else if let Some(out) = STDERR.lock().await.get_mut(&id) { + out.push_str(&line); + } + } + Ok(()) +}