Warden process manager

This commit is contained in:
LittleSheep 2024-01-15 23:22:53 +08:00
parent 4c08d78bed
commit ead748a508
10 changed files with 235 additions and 21 deletions

View File

@ -2,7 +2,10 @@
<module type="WEB_MODULE" version="4"> <module type="WEB_MODULE" version="4">
<component name="Go" enabled="true" /> <component name="Go" enabled="true" />
<component name="NewModuleRootManager"> <component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" /> <content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/target" />
</content>
<orderEntry type="inheritedJdk" /> <orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
</component> </component>

16
Cargo.lock generated
View File

@ -133,9 +133,9 @@ checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "1.3.2" version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]] [[package]]
name = "bitflags" name = "bitflags"
@ -1452,7 +1452,7 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa"
dependencies = [ dependencies = [
"bitflags 1.3.2", "bitflags 1.2.1",
] ]
[[package]] [[package]]
@ -1601,7 +1601,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88073939a61e5b7680558e6be56b419e208420c2adb92be54921fa6b72283f1a" checksum = "88073939a61e5b7680558e6be56b419e208420c2adb92be54921fa6b72283f1a"
dependencies = [ dependencies = [
"base64 0.13.1", "base64 0.13.1",
"bitflags 1.3.2", "bitflags 1.2.1",
"serde", "serde",
] ]
@ -1666,11 +1666,11 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]] [[package]]
name = "security-framework" name = "security-framework"
version = "2.9.2" version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" checksum = "23a2ac85147a3a11d77ecf1bc7166ec0b92febfa4461c37944e180f319ece467"
dependencies = [ dependencies = [
"bitflags 1.3.2", "bitflags 1.2.1",
"core-foundation", "core-foundation",
"core-foundation-sys", "core-foundation-sys",
"libc", "libc",
@ -1878,7 +1878,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7"
dependencies = [ dependencies = [
"bitflags 1.3.2", "bitflags 1.2.1",
"core-foundation", "core-foundation",
"system-configuration-sys", "system-configuration-sys",
] ]

View File

@ -7,3 +7,8 @@ paths = ["/"]
[[locations.destinations]] [[locations.destinations]]
id = "static" id = "static"
uri = "files://regions?index=index.html" uri = "files://regions?index=index.html"
[[applications]]
id = "script"
exe = "./script.sh"
workdir = "regions"

3
regions/script.sh Executable file
View File

@ -0,0 +1,3 @@
#!/bin/bash
echo "Good morning!"

View File

@ -2,8 +2,7 @@ pub mod auth;
mod config; mod config;
mod proxies; mod proxies;
mod sideload; mod sideload;
pub mod warden;
use std::collections::VecDeque;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use poem::{listener::TcpListener, EndpointExt, Route, Server}; use poem::{listener::TcpListener, EndpointExt, Route, Server};
@ -12,18 +11,10 @@ use proxies::RoadInstance;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tracing::{error, info, Level}; use tracing::{error, info, Level};
use crate::proxies::{metrics::RoadMetrics, route}; use crate::proxies::route;
lazy_static! { lazy_static! {
static ref ROAD: Mutex<RoadInstance> = Mutex::new(RoadInstance { static ref ROAD: Mutex<RoadInstance> = Mutex::new(RoadInstance::new());
regions: vec![],
metrics: RoadMetrics {
requests_count: 0,
failures_count: 0,
recent_successes: VecDeque::new(),
recent_errors: VecDeque::new(),
}
});
} }
#[tokio::main] #[tokio::main]
@ -84,6 +75,16 @@ async fn main() -> Result<(), std::io::Error> {
}), }),
); );
// Process manager
{
let mut app = ROAD.lock().await;
{
let reg = app.regions.clone();
app.warden.scan(reg);
}
app.warden.start().await;
}
tokio::try_join!(proxies_server, sideload_server)?; tokio::try_join!(proxies_server, sideload_server)?;
Ok(()) Ok(())

View File

@ -5,12 +5,15 @@ use queryst::parse;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use crate::warden::Application;
use super::responder::StaticResponderConfig; use super::responder::StaticResponderConfig;
#[derive(Debug, Object, Clone, Serialize, Deserialize)] #[derive(Debug, Object, Clone, Serialize, Deserialize)]
pub struct Region { pub struct Region {
pub id: String, pub id: String,
pub locations: Vec<Location>, pub locations: Vec<Location>,
pub applications: Vec<Application>,
} }
#[derive(Debug, Object, Clone, Serialize, Deserialize)] #[derive(Debug, Object, Clone, Serialize, Deserialize)]

View File

@ -50,6 +50,15 @@ pub struct RoadMetrics {
const MAX_TRACE_COUNT: usize = 10; const MAX_TRACE_COUNT: usize = 10;
impl RoadMetrics { impl RoadMetrics {
pub fn new() -> RoadMetrics {
RoadMetrics {
requests_count: 0,
failures_count: 0,
recent_successes: VecDeque::new(),
recent_errors: VecDeque::new(),
}
}
pub fn get_success_rate(&self) -> f64 { pub fn get_success_rate(&self) -> f64 {
if self.requests_count > 0 { if self.requests_count > 0 {
(self.requests_count - self.failures_count) as f64 / self.requests_count as f64 (self.requests_count - self.failures_count) as f64 / self.requests_count as f64

View File

@ -3,6 +3,8 @@ use poem::http::{HeaderMap, Uri};
use regex::Regex; use regex::Regex;
use wildmatch::WildMatch; use wildmatch::WildMatch;
use crate::warden::WardenInstance;
use self::{ use self::{
config::{Location, Region}, config::{Location, Region},
metrics::RoadMetrics, metrics::RoadMetrics,
@ -19,9 +21,20 @@ pub mod route;
pub struct RoadInstance { pub struct RoadInstance {
pub regions: Vec<Region>, pub regions: Vec<Region>,
pub metrics: RoadMetrics, pub metrics: RoadMetrics,
pub warden: WardenInstance,
} }
impl RoadInstance { impl RoadInstance {
pub fn new() -> RoadInstance {
RoadInstance {
regions: vec![],
warden: WardenInstance {
applications: vec![],
},
metrics: RoadMetrics::new(),
}
}
pub fn filter( pub fn filter(
&self, &self,
uri: &Uri, uri: &Uri,

73
src/warden/mod.rs Normal file
View File

@ -0,0 +1,73 @@
pub mod runner;
use std::collections::HashMap;
use futures_util::lock::Mutex;
use lazy_static::lazy_static;
use poem_openapi::Object;
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
use crate::proxies::config::Region;
use self::runner::AppInstance;
lazy_static! {
static ref INSTANCES: Mutex<HashMap<String, AppInstance>> = Mutex::new(HashMap::new());
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WardenInstance {
pub applications: Vec<Application>,
}
impl WardenInstance {
pub fn new() -> WardenInstance {
WardenInstance {
applications: vec![],
}
}
pub fn scan(&mut self, regions: Vec<Region>) {
self.applications = regions
.iter()
.flat_map(|item| item.applications.clone())
.collect::<Vec<Application>>();
debug!(
applications = format!("{:?}", self.applications),
"Warden scan accomplished."
)
}
pub async fn start(&self) {
for item in self.applications.iter() {
let mut instance = AppInstance::new();
match instance.start(item.clone()).await {
Ok(_) => {
debug!(id = item.id, "Warden successfully created instance for");
INSTANCES.lock().await.insert(item.clone().id, instance);
}
Err(err) => warn!(
id = item.id,
err = format!("{:?}", err),
"Warden failed to create an instance for"
),
};
}
}
}
impl Default for WardenInstance {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Object, Clone, Serialize, Deserialize)]
pub struct Application {
pub id: String,
pub exe: String,
pub args: Option<Vec<String>>,
pub env: Option<HashMap<String, String>>,
pub workdir: String,
}

104
src/warden/runner.rs Normal file
View File

@ -0,0 +1,104 @@
use std::{borrow::BorrowMut, collections::HashMap, io};
use super::Application;
use futures_util::lock::Mutex;
use lazy_static::lazy_static;
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{Child, Command},
};
lazy_static! {
static ref STDOUT: Mutex<HashMap<String, String>> = Mutex::new(HashMap::new());
static ref STDERR: Mutex<HashMap<String, String>> = Mutex::new(HashMap::new());
}
#[derive(Debug)]
pub struct AppInstance {
pub app: Option<Application>,
pub program: Option<Child>,
}
impl AppInstance {
pub fn new() -> AppInstance {
AppInstance {
app: None,
program: None,
}
}
pub async fn start(&mut self, app: Application) -> io::Result<()> {
return match Command::new(app.exe.clone())
.args(app.args.clone().unwrap_or_default())
.envs(app.env.clone().unwrap_or_default())
.current_dir(app.workdir.clone())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
{
Ok(mut child) => {
let stderr_reader = BufReader::new(child.stderr.take().unwrap());
let stdout_reader = BufReader::new(child.stdout.take().unwrap());
tokio::spawn(read_stream_and_capture(stderr_reader, app.id.clone(), true));
tokio::spawn(read_stream_and_capture(
stdout_reader,
app.id.clone(),
false,
));
self.app = Some(app.clone());
self.program = Some(child);
Ok(())
}
Err(err) => Err(err),
};
}
pub async fn stop(&mut self) -> Result<(), io::Error> {
if let Some(child) = self.program.borrow_mut() {
return child.kill().await;
}
Ok(())
}
pub async fn get_stdout(&self) -> Option<String> {
if let Some(app) = self.app.clone() {
STDOUT.lock().await.get(&app.id).cloned()
} else {
None
}
}
pub async fn get_stderr(&self) -> Option<String> {
if let Some(app) = self.app.clone() {
STDERR.lock().await.get(&app.id).cloned()
} else {
None
}
}
}
impl Default for AppInstance {
fn default() -> Self {
Self::new()
}
}
async fn read_stream_and_capture<R>(reader: R, id: String, is_err: bool) -> io::Result<()>
where
R: tokio::io::AsyncBufRead + Unpin,
{
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await? {
if !is_err {
if let Some(out) = STDOUT.lock().await.get_mut(&id) {
out.push_str(&line);
}
} else if let Some(out) = STDERR.lock().await.get_mut(&id) {
out.push_str(&line);
}
}
Ok(())
}