Compare commits

17 Commits

Author SHA1 Message Date
LittleSheep
0b38c4a470 Expand config search area 2024-02-14 15:06:51 +08:00
LittleSheep
515f086f19 Compress 2024-02-14 14:51:00 +08:00
LittleSheep
d75ac2999b 🗑️ Remove test data 2024-02-14 14:31:29 +08:00
LittleSheep
7eee10c4ff WebSocket Support 2024-02-14 13:53:43 +08:00
LittleSheep
e40fe6049f 🐛 Fix http proxy 2024-02-13 22:56:22 +08:00
LittleSheep
ed9434b85a 💚 Fix dockerfile missing build-base 2024-02-13 21:03:33 +08:00
LittleSheep
804108a209 💚 Fix dockerfile missing openssl during build 2024-02-13 20:51:39 +08:00
LittleSheep
46736c12b9 🔨 Update dockerfile 2024-02-13 20:48:40 +08:00
LittleSheep
1a562fbee8 🔨 Add rust version as a pre-release version 2024-02-13 20:44:52 +08:00
7796ee3554 🔀 Merge pull request '♻️ 使用 Actix RS 重构' (#8) from refactor/actix-rs into refactor/rust
Reviewed-on: https://code.smartsheep.studio/Goatworks/RoadSign/pulls/8
2024-02-13 12:39:08 +00:00
LittleSheep
12add73ecb Multiple listeners 2024-02-13 20:32:13 +08:00
LittleSheep
3fbe1db1ef TLS 2024-02-13 18:27:01 +08:00
LittleSheep
e27023c130 More detailed error 2024-02-13 01:42:03 +08:00
LittleSheep
2478a05c89 More details trace 2024-02-13 01:20:27 +08:00
LittleSheep
cb8eab6c1b 🛂 Secured sideload 2024-02-13 01:04:43 +08:00
LittleSheep
ae3894bea6 Sideload 2024-02-13 00:34:54 +08:00
LittleSheep
b7d4a54d62 ♻️ Migrated to actix rs 2024-02-13 00:01:39 +08:00
31 changed files with 1343 additions and 1702 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

View File

@@ -2,27 +2,27 @@ name: release-nightly
on:
push:
branches: [ master ]
branches: [ refactor/rust ]
jobs:
build-docker:
runs-on: edge
build-image:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
uses: docker/setup-buildx-action@v3
- name: Login to Docker Hub
uses: docker/login-action@v2
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_REGISTRY_USERNAME }}
password: ${{ secrets.DOCKER_REGISTRY_TOKEN }}
- name: Build and push
uses: docker/build-push-action@v4
uses: docker/build-push-action@v5
with:
context: .
file: ./Dockerfile
push: true
tags: xsheep2010/roadsign:nightly
file: ./Dockerfile
tags: xsheep2010/roadsign:sigma

2
.gitignore vendored
View File

@@ -1,4 +1,6 @@
/config
/certs
/test/data
/letsencrypt
# Added by cargo

6
.idea/RoadSign.iml generated
View File

@@ -1,5 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="FacetManager">
<facet type="Python" name="Python facet">
<configuration sdkName="Python 3.9" />
</facet>
</component>
<component name="Go" enabled="true" />
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
@@ -8,5 +13,6 @@
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Python 3.9 interpreter library" level="application" />
</component>
</module>

1843
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -6,24 +6,18 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
config = { version = "0.13.4", features = ["toml"] }
futures-util = "0.3.30"
http = "1.0.0"
hyper-util = { version = "0.1.2", features = ["full"] }
actix-files = "0.6.5"
actix-proxy = "0.2.0"
actix-web = { version = "4.5.1", features = ["rustls-0_22"] }
actix-web-httpauth = "0.8.1"
awc = { version = "3.4.0", features = ["tls-rustls-0_22"] }
config = { version = "0.14.0", features = ["toml"] }
lazy_static = "1.4.0"
mime = "0.3.17"
percent-encoding = "2.3.1"
poem = { version = "2.0.0", features = [
"tokio-metrics",
"websocket",
"static-files",
"reqwest",
] }
poem-openapi = { version = "4.0.0" }
queryst = "3.0.0"
rand = "0.8.5"
regex = "1.10.2"
reqwest = { git = "https://github.com/seanmonstar/reqwest.git", branch = "hyper-v1", version = "0.11.23" }
serde = "1.0.195"
serde_json = "1.0.111"
tokio = { version = "1.35.1", features = [
@@ -32,8 +26,13 @@ tokio = { version = "1.35.1", features = [
"time",
"full",
] }
tokio-tungstenite = "0.21.0"
toml = "0.8.8"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
wildmatch = "2.3.0"
derive_more = "0.99.17"
rustls = "0.22.2"
rustls-pemfile = "2.0.0"
futures = "0.3.30"
actix-web-actors = "4.3.0"
actix = "0.13.3"

View File

@@ -1,21 +1,13 @@
# Building Backend
FROM golang:alpine as roadsign-server
FROM rust:alpine as roadsign-server
RUN apk add nodejs npm
RUN apk add libressl-dev build-base
WORKDIR /source
COPY . .
WORKDIR /source/pkg/sideload/view
RUN npm install
RUN npm run build
WORKDIR /source
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -buildvcs -o /dist ./pkg/cmd/server/main.go
# Runtime
FROM golang:alpine
COPY --from=roadsign-server /dist /roadsign/server
ENV RUSTFLAGS="-C target-feature=-crt-static"
RUN cargo build --release
EXPOSE 81
CMD ["/roadsign/server"]
CMD ["/source/target/release/roadsign"]

View File

@@ -74,9 +74,9 @@ rds cli with this command.
```shell
rds connect <id> <url> <password>
# ID will allow you find this server in after commands.
# URL is to your roadsign server sideload api.
# Password is your roadsign server credential.
# ID will allow you find this server.py.rs in after commands.
# URL is to your roadsign server.py.rs sideload api.
# Password is your roadsign server.py.rs credential.
# ======================================================================
# !WARNING! All these things will storage in your $HOME/.roadsignrc.yaml
# ======================================================================
@@ -85,8 +85,8 @@ rds connect <id> <url> <password>
Then, sync your local config to remote.
```shell
rds sync <server id> <site id> <config file>
# Server ID is your server added by last command.
rds sync <server.py.rs id> <site id> <config file>
# Server ID is your server.py.rs added by last command.
# Site ID is your new site id or old site id if you need update it.
# Config File is your local config file path.
```

View File

@@ -1,7 +1,17 @@
regions = "./regions"
secret = "aEXcED5xJ3"
[listen]
proxies = "0.0.0.0:80"
proxies_tls = "0.0.0.0:443"
sideload = "0.0.0.0:81"
[sideload]
bind_addr = "0.0.0.0:81"
[[proxies.bind]]
addr = "0.0.0.0:80"
tls = false
[[proxies.bind]]
addr = "0.0.0.0:443"
tls = false
[[certificates]]
domain = "localhost"
certs = "certs/fullchain.pem"
key = "certs/privkey.pem"

View File

@@ -5,10 +5,17 @@ id = "root"
hosts = ["localhost"]
paths = ["/"]
[[locations.destinations]]
id = "static"
uri = "files://regions?index=index.html"
id = "websocket"
uri = "http://localhost:8765"
# [[locations.destinations]]
# id = "hypertext"
# uri = "https://example.com"
# [[locations.destinations]]
# id = "static"
# uri = "files://regions?index=index.html"
[[applications]]
id = "script"
exe = "./script.sh"
workdir = "regions"
# [[applications]]
# id = "script"
# exe = "./script.sh"
# workdir = "regions"

View File

@@ -1,3 +1,3 @@
#!/bin/bash
echo "Good morning!"
echo "Good morning!" > ./kokodayo.txt

View File

@@ -1,50 +0,0 @@
use http::StatusCode;
use poem::{
web::headers::{self, authorization::Basic, HeaderMapExt},
Endpoint, Error, Middleware, Request, Response, Result,
};
pub struct BasicAuth {
pub username: String,
pub password: String,
}
impl<E: Endpoint> Middleware<E> for BasicAuth {
type Output = BasicAuthEndpoint<E>;
fn transform(&self, ep: E) -> Self::Output {
BasicAuthEndpoint {
ep,
username: self.username.clone(),
password: self.password.clone(),
}
}
}
pub struct BasicAuthEndpoint<E> {
ep: E,
username: String,
password: String,
}
#[poem::async_trait]
impl<E: Endpoint> Endpoint for BasicAuthEndpoint<E> {
type Output = E::Output;
async fn call(&self, req: Request) -> Result<Self::Output> {
if let Some(auth) = req.headers().typed_get::<headers::Authorization<Basic>>() {
if auth.0.username() == self.username && auth.0.password() == self.password {
return self.ep.call(req).await;
}
}
Err(Error::from_response(
Response::builder()
.header(
"WWW-Authenticate",
"Basic realm=\"RoadSig\", charset=\"UTF-8\"",
)
.status(StatusCode::UNAUTHORIZED)
.finish(),
))
}
}

View File

@@ -3,6 +3,7 @@ use config::Config;
pub fn load_settings() -> Config {
Config::builder()
.add_source(config::File::with_name("Settings"))
.add_source(config::File::with_name("/Settings"))
.add_source(config::Environment::with_prefix("ROADSIGN"))
.build()
.unwrap()

View File

@@ -7,5 +7,5 @@ use crate::config::loader::load_settings;
pub mod loader;
lazy_static! {
pub static ref C: RwLock<Config> = RwLock::new(load_settings());
pub static ref CFG: RwLock<Config> = RwLock::new(load_settings());
}

View File

@@ -1,28 +1,28 @@
pub mod auth;
extern crate core;
mod config;
mod proxies;
mod sideload;
pub mod warden;
mod warden;
mod server;
pub mod tls;
use std::error;
use lazy_static::lazy_static;
use poem::{listener::TcpListener, EndpointExt, Route, Server};
use poem_openapi::OpenApiService;
use proxies::RoadInstance;
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tracing::{error, info, Level};
use crate::proxies::route;
use crate::proxies::server::build_proxies;
use crate::sideload::server::build_sideload;
lazy_static! {
static ref ROAD: Mutex<RoadInstance> = Mutex::new(RoadInstance::new());
}
#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
async fn main() -> Result<(), Box<dyn error::Error>> {
// Setting up logging
if std::env::var_os("RUST_LOG").is_none() {
std::env::set_var("RUST_LOG", "poem=debug");
}
tracing_subscriber::fmt()
.with_max_level(Level::DEBUG)
.init();
@@ -30,11 +30,10 @@ async fn main() -> Result<(), std::io::Error> {
// Prepare all the stuff
info!("Loading proxy regions...");
match proxies::loader::scan_regions(
config::C
config::CFG
.read()
.await
.get_string("regions")
.unwrap_or("./regions".to_string()),
.get_string("regions")?
) {
Err(_) => error!("Loading proxy regions... failed"),
Ok((regions, count)) => {
@@ -43,37 +42,15 @@ async fn main() -> Result<(), std::io::Error> {
}
};
let mut server_set = JoinSet::new();
// Proxies
let proxies_server = Server::new(TcpListener::bind(
config::C
.read()
.await
.get_string("listen.proxies")
.unwrap_or("0.0.0.0:80".to_string()),
))
.run(route::handle);
for server in build_proxies().await? {
server_set.spawn(server);
}
// Sideload
let sideload = OpenApiService::new(sideload::SideloadApi, "Sideload API", "1.0")
.server("http://localhost:3000/cgi");
let sideload_server = Server::new(TcpListener::bind(
config::C
.read()
.await
.get_string("listen.sideload")
.unwrap_or("0.0.0.0:81".to_string()),
))
.run(
Route::new().nest("/cgi", sideload).with(auth::BasicAuth {
username: "RoadSign".to_string(),
password: config::C
.read()
.await
.get_string("secret")
.unwrap_or("password".to_string()),
}),
);
server_set.spawn(build_sideload().await?);
// Process manager
{
@@ -85,7 +62,8 @@ async fn main() -> Result<(), std::io::Error> {
app.warden.start().await;
}
tokio::try_join!(proxies_server, sideload_server)?;
// Wait for web servers
server_set.join_next().await;
Ok(())
}

View File

@@ -1,52 +0,0 @@
use std::fmt::Write;
pub struct DirectoryTemplate<'a> {
pub path: &'a str,
pub files: Vec<FileRef>,
}
impl<'a> DirectoryTemplate<'a> {
pub fn render(&self) -> String {
let mut s = format!(
r#"
<html>
<head>
<title>Index of {}</title>
</head>
<body>
<h1>Index of /{}</h1>
<ul>"#,
self.path, self.path
);
for file in &self.files {
if file.is_dir {
let _ = write!(
s,
r#"<li><a href="{}">{}/</a></li>"#,
file.url, file.filename
);
} else {
let _ = write!(
s,
r#"<li><a href="{}">{}</a></li>"#,
file.url, file.filename
);
}
}
s.push_str(
r#"</ul>
</body>
</html>"#,
);
s
}
}
pub struct FileRef {
pub url: String,
pub filename: String,
pub is_dir: bool,
}

View File

@@ -1,6 +1,5 @@
use std::collections::HashMap;
use poem_openapi::Object;
use queryst::parse;
use serde::{Deserialize, Serialize};
use serde_json::json;
@@ -9,14 +8,14 @@ use crate::warden::Application;
use super::responder::StaticResponderConfig;
#[derive(Debug, Object, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Region {
pub id: String,
pub locations: Vec<Location>,
pub applications: Vec<Application>,
pub applications: Option<Vec<Application>>,
}
#[derive(Debug, Object, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Location {
pub id: String,
pub hosts: Vec<String>,
@@ -27,7 +26,7 @@ pub struct Location {
pub destinations: Vec<Destination>,
}
#[derive(Debug, Object, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Destination {
pub id: String,
pub uri: String,
@@ -64,26 +63,17 @@ impl Destination {
}
pub fn get_host(&self) -> &str {
(self
self
.uri
.as_str()
.splitn(2, "://")
.collect::<Vec<_>>()
.get(1)
.unwrap_or(&""))
.unwrap_or(&"")
.splitn(2, '?')
.collect::<Vec<_>>()[0]
}
pub fn get_websocket_uri(&self) -> Result<String, ()> {
let parts = self.uri.as_str().splitn(2, "://").collect::<Vec<_>>();
let url = parts.get(1).unwrap_or(&"");
match self.get_protocol() {
"http" | "https" => Ok(url.replace("http", "ws")),
_ => Err(()),
}
}
pub fn get_hypertext_uri(&self) -> Result<String, ()> {
match self.get_protocol() {
"http" => Ok("http://".to_string() + self.get_host()),

View File

@@ -1,21 +1,30 @@
use std::collections::VecDeque;
use poem_openapi::Object;
use serde::{Deserialize, Serialize};
use super::config::{Destination, Location, Region};
#[derive(Debug, Object, Clone, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RoadTrace {
pub region: String,
pub location: String,
pub destination: String,
pub ip_address: String,
pub user_agent: String,
pub error: Option<String>,
}
impl RoadTrace {
pub fn from_structs(reg: Region, loc: Location, end: Destination) -> RoadTrace {
pub fn from_structs(
ip: String,
ua: String,
reg: Region,
loc: Location,
end: Destination,
) -> RoadTrace {
RoadTrace {
ip_address: ip,
user_agent: ua,
region: reg.id,
location: loc.id,
destination: end.id,
@@ -24,17 +33,16 @@ impl RoadTrace {
}
pub fn from_structs_with_error(
ip: String,
ua: String,
reg: Region,
loc: Location,
end: Destination,
err: String,
) -> RoadTrace {
RoadTrace {
region: reg.id,
location: loc.id,
destination: end.id,
error: Some(err),
}
let mut trace = Self::from_structs(ip, ua, reg, loc, end);
trace.error = Some(err);
trace
}
}
@@ -47,7 +55,7 @@ pub struct RoadMetrics {
pub recent_errors: VecDeque<RoadTrace>,
}
const MAX_TRACE_COUNT: usize = 10;
const MAX_TRACE_COUNT: usize = 32;
impl RoadMetrics {
pub fn new() -> RoadMetrics {
@@ -67,26 +75,35 @@ impl RoadMetrics {
}
}
pub fn add_success_request(&mut self, reg: Region, loc: Location, end: Destination) {
pub fn add_success_request(
&mut self,
ip: String,
ua: String,
reg: Region,
loc: Location,
end: Destination,
) {
self.requests_count += 1;
self.recent_successes
.push_back(RoadTrace::from_structs(reg, loc, end));
.push_back(RoadTrace::from_structs(ip, ua, reg, loc, end));
if self.recent_successes.len() > MAX_TRACE_COUNT {
self.recent_successes.pop_front();
}
}
pub fn add_faliure_request(
pub fn add_failure_request(
&mut self,
ip: String,
ua: String,
reg: Region,
loc: Location,
end: Destination,
err: String, // For some reason error is rarely clonable, so we use preformatted message
err: String, // For some reason error is rarely cloneable, so we use preformatted message
) {
self.requests_count += 1;
self.failures_count += 1;
self.recent_errors
.push_back(RoadTrace::from_structs_with_error(reg, loc, end, err));
.push_back(RoadTrace::from_structs_with_error(ip, ua, reg, loc, end, err));
if self.recent_errors.len() > MAX_TRACE_COUNT {
self.recent_errors.pop_front();
}

View File

@@ -1,7 +1,9 @@
use http::Method;
use poem::http::{HeaderMap, Uri};
use actix_web::http::header::{ContentType, HeaderMap};
use actix_web::http::{Method, StatusCode, Uri};
use regex::Regex;
use wildmatch::WildMatch;
use actix_web::{error, HttpResponse};
use derive_more::{Display};
use crate::warden::WardenInstance;
@@ -10,12 +12,56 @@ use self::{
metrics::RoadMetrics,
};
pub mod browser;
pub mod config;
pub mod loader;
pub mod metrics;
pub mod responder;
pub mod route;
pub mod server;
#[derive(Debug, Display)]
pub enum ProxyError {
#[display(fmt = "Upgrade required for this connection")]
UpgradeRequired,
#[display(fmt = "Remote gateway issue")]
BadGateway,
#[display(fmt = "No configured able to process this request")]
NoGateway,
#[display(fmt = "Not found")]
NotFound,
#[display(fmt = "Only accepts method GET")]
MethodGetOnly,
#[display(fmt = "Invalid request path")]
InvalidRequestPath,
#[display(fmt = "Upstream does not support protocol you used")]
NotImplemented,
}
impl error::ResponseError for ProxyError {
fn status_code(&self) -> StatusCode {
match *self {
ProxyError::UpgradeRequired => StatusCode::UPGRADE_REQUIRED,
ProxyError::BadGateway => StatusCode::BAD_GATEWAY,
ProxyError::NoGateway => StatusCode::NOT_FOUND,
ProxyError::NotFound => StatusCode::NOT_FOUND,
ProxyError::MethodGetOnly => StatusCode::METHOD_NOT_ALLOWED,
ProxyError::InvalidRequestPath => StatusCode::BAD_REQUEST,
ProxyError::NotImplemented => StatusCode::NOT_IMPLEMENTED,
}
}
fn error_response(&self) -> HttpResponse {
HttpResponse::build(self.status_code())
.insert_header(ContentType::html())
.body(self.to_string())
}
}
#[derive(Debug, Clone)]
pub struct RoadInstance {
@@ -38,7 +84,7 @@ impl RoadInstance {
pub fn filter(
&self,
uri: &Uri,
method: Method,
method: &Method,
headers: &HeaderMap,
) -> Option<(&Region, &Location)> {
self.regions.iter().find_map(|region| {

View File

@@ -1,117 +1,205 @@
use futures_util::{SinkExt, StreamExt};
use http::{header, request::Builder, HeaderMap, Method, StatusCode, Uri};
use lazy_static::lazy_static;
use poem::{
web::{websocket::WebSocket, StaticFileRequest},
Body, Error, FromRequest, IntoResponse, Request, Response,
};
use crate::proxies::ProxyError;
use crate::proxies::ProxyError::{BadGateway, UpgradeRequired};
use actix_files::NamedFile;
use actix_web::http::{header, Method};
use actix_web::{web, HttpRequest, HttpResponse};
use awc::error::HeaderValue;
use awc::http::Uri;
use awc::Client;
use futures::Sink;
use futures::stream::StreamExt;
use std::str::FromStr;
use std::time::Duration;
use std::{
ffi::OsStr,
path::{Path, PathBuf},
sync::Arc,
};
use tokio::sync::RwLock;
use tokio_tungstenite::connect_async;
use super::browser::{DirectoryTemplate, FileRef};
lazy_static! {
pub static ref CLIENT: reqwest::Client = reqwest::Client::new();
}
pub async fn repond_websocket(req: Builder, ws: WebSocket) -> Response {
ws.on_upgrade(move |socket| async move {
let (mut clientsink, mut clientstream) = socket.split();
// Start connection to server
let (serversocket, _) = connect_async(req.body(()).unwrap()).await.unwrap();
let (mut serversink, mut serverstream) = serversocket.split();
let client_live = Arc::new(RwLock::new(true));
let server_live = client_live.clone();
tokio::spawn(async move {
while let Some(Ok(msg)) = clientstream.next().await {
if (serversink.send(msg.into()).await).is_err() {
break;
};
if !*client_live.read().await {
break;
};
}
*client_live.write().await = false;
});
// Relay server messages to the client
tokio::spawn(async move {
while let Some(Ok(msg)) = serverstream.next().await {
if (clientsink.send(msg.into()).await).is_err() {
break;
};
if !*server_live.read().await {
break;
};
}
*server_live.write().await = false;
});
})
.into_response()
}
use actix::io::{SinkWrite, WriteHandler};
use actix::{Actor, ActorContext, AsyncContext, StreamHandler};
use actix_web_actors::ws;
use actix_web_actors::ws::{CloseReason, handshake, ProtocolError, WebsocketContext};
use tracing::log::warn;
pub async fn respond_hypertext(
uri: String,
ori: &Uri,
req: &Request,
method: Method,
body: Body,
headers: &HeaderMap,
) -> Result<Response, Error> {
let ip = req.remote_addr().to_string();
let proto = req.uri().scheme_str().unwrap();
let host = req.uri().host().unwrap();
let mut headers = headers.clone();
headers.insert("Server", "RoadSign".parse().unwrap());
headers.insert("X-Forward-For", ip.parse().unwrap());
headers.insert("X-Forwarded-Proto", proto.parse().unwrap());
headers.insert("X-Forwarded-Host", host.parse().unwrap());
headers.insert("X-Real-IP", ip.parse().unwrap());
headers.insert(
"Forwarded",
format!("by={};for={};host={};proto={}", ip, ip, host, proto)
.parse()
.unwrap(),
);
let res = CLIENT
.request(method, uri + ori.path() + ori.query().unwrap_or(""))
.headers(headers.clone())
.body(body.into_bytes().await.unwrap())
.send()
.await;
match res {
Ok(result) => {
let mut res = Response::default();
res.extensions().clone_from(&result.extensions());
result.headers().iter().for_each(|(key, val)| {
res.headers_mut().insert(key, val.to_owned());
});
res.headers_mut()
.insert("Server", "RoadSign".parse().unwrap());
res.set_status(result.status());
res.set_version(result.version());
res.set_body(result.bytes().await.unwrap());
Ok(res)
}
Err(error) => Err(Error::from_string(
error.to_string(),
error.status().unwrap_or(StatusCode::BAD_GATEWAY),
)),
req: HttpRequest,
payload: web::Payload,
client: web::Data<Client>,
) -> Result<HttpResponse, ProxyError> {
let mut append_part = req.uri().to_string();
if let Some(stripped_uri) = append_part.strip_prefix('/') {
append_part = stripped_uri.to_string();
}
let uri = Uri::from_str(uri.as_str()).expect("Invalid upstream");
let target_url = format!("{}{}", uri, append_part);
let forwarded_req = client
.request_from(target_url.as_str(), req.head())
.insert_header((header::HOST, uri.host().expect("Invalid upstream")));
let forwarded_req = match req.connection_info().realip_remote_addr() {
Some(addr) => forwarded_req
.insert_header((header::X_FORWARDED_FOR, addr))
.insert_header((header::X_FORWARDED_PROTO, req.connection_info().scheme()))
.insert_header((header::X_FORWARDED_HOST, req.connection_info().host()))
.insert_header((
header::FORWARDED,
format!(
"by={};for={};host={};proto={}",
addr,
addr,
req.connection_info().host(),
req.connection_info().scheme()
),
)),
None => forwarded_req,
};
if req
.headers()
.get(header::UPGRADE)
.unwrap_or(&HeaderValue::from_static(""))
.to_str()
.unwrap_or("")
.to_lowercase()
== "websocket"
{
let uri = uri.to_string().replacen("http", "ws", 1);
return respond_websocket(uri, req, payload).await;
}
let res = forwarded_req
.timeout(Duration::from_secs(1800))
.send_stream(payload)
.await
.map_err(|err| {
warn!("Remote gateway issue... {}", err);
BadGateway
})?;
let mut client_resp = HttpResponse::build(res.status());
for (header_name, header_value) in res
.headers()
.iter()
.filter(|(h, _)| *h != header::CONNECTION && *h != header::CONTENT_ENCODING)
{
client_resp.insert_header((header_name.clone(), header_value.clone()));
}
Ok(client_resp.streaming(res))
}
pub struct WebsocketProxy<S>
where
S: Unpin + Sink<ws::Message>,
{
send: SinkWrite<ws::Message, S>,
}
impl<S> WriteHandler<ProtocolError> for WebsocketProxy<S>
where
S: Unpin + 'static + Sink<ws::Message>,
{
fn error(&mut self, err: ProtocolError, ctx: &mut Self::Context) -> actix::Running {
self.error(err, ctx);
actix::Running::Stop
}
}
impl<S> Actor for WebsocketProxy<S>
where
S: Unpin + 'static + Sink<ws::Message>,
{
type Context = WebsocketContext<Self>;
}
impl<S> StreamHandler<Result<ws::Frame, ProtocolError>> for WebsocketProxy<S>
where
S: Unpin + Sink<ws::Message> + 'static,
{
fn handle(&mut self, item: Result<ws::Frame, ProtocolError>, ctx: &mut Self::Context) {
let frame = match item {
Ok(frame) => frame,
Err(err) => return self.error(err, ctx),
};
let msg = match frame {
ws::Frame::Text(t) => match t.try_into() {
Ok(t) => ws::Message::Text(t),
Err(e) => {
self.error(e, ctx);
return;
}
},
ws::Frame::Binary(b) => ws::Message::Binary(b),
ws::Frame::Continuation(c) => ws::Message::Continuation(c),
ws::Frame::Ping(p) => ws::Message::Ping(p),
ws::Frame::Pong(p) => ws::Message::Pong(p),
ws::Frame::Close(r) => ws::Message::Close(r),
};
ctx.write_raw(msg)
}
}
impl<S> StreamHandler<Result<ws::Message, ProtocolError>> for WebsocketProxy<S>
where
S: Unpin + Sink<ws::Message> + 'static,
{
fn handle(&mut self, item: Result<ws::Message, ProtocolError>, ctx: &mut Self::Context) {
let msg = match item {
Ok(msg) => msg,
Err(err) => return self.error(err, ctx),
};
let _ = self.send.write(msg);
}
}
impl<S> WebsocketProxy<S>
where
S: Unpin + Sink<ws::Message> + 'static,
{
fn error<E>(&mut self, err: E, ctx: &mut <Self as Actor>::Context)
where
E: std::error::Error,
{
let reason = Some(CloseReason {
code: ws::CloseCode::Error,
description: Some(err.to_string()),
});
ctx.close(reason.clone());
let _ = self.send.write(ws::Message::Close(reason));
self.send.close();
ctx.stop();
}
}
pub async fn respond_websocket(
uri: String,
req: HttpRequest,
payload: web::Payload,
) -> Result<HttpResponse, ProxyError> {
let mut res = handshake(&req).map_err(|_| UpgradeRequired)?;
let (_, conn) = awc::Client::new()
.ws(uri)
.connect()
.await
.map_err(|_| BadGateway)?;
let (send, recv) = conn.split();
let out = WebsocketContext::with_factory(payload, |ctx| {
ctx.add_stream(recv);
WebsocketProxy {
send: SinkWrite::new(send, ctx),
}
});
Ok(res.streaming(out))
}
pub struct StaticResponderConfig {
@@ -126,14 +214,10 @@ pub struct StaticResponderConfig {
pub async fn respond_static(
cfg: StaticResponderConfig,
method: Method,
req: &Request,
) -> Result<Response, Error> {
if method != Method::GET {
return Err(Error::from_string(
"This destination only support GET request.",
StatusCode::METHOD_NOT_ALLOWED,
));
req: HttpRequest,
) -> Result<HttpResponse, ProxyError> {
if req.method() != Method::GET {
return Err(ProxyError::MethodGetOnly);
}
let path = req
@@ -142,9 +226,12 @@ pub async fn respond_static(
.trim_start_matches('/')
.trim_end_matches('/');
let path = percent_encoding::percent_decode_str(path)
.decode_utf8()
.map_err(|_| Error::from_status(StatusCode::NOT_FOUND))?;
let path = match percent_encoding::percent_decode_str(path).decode_utf8() {
Ok(val) => val,
Err(_) => {
return Err(ProxyError::NotFound);
}
};
let base_path = cfg.uri.parse::<PathBuf>().unwrap();
let mut file_path = base_path.clone();
@@ -159,7 +246,7 @@ pub async fn respond_static(
}
if !file_path.starts_with(cfg.uri) {
return Err(Error::from_status(StatusCode::FORBIDDEN));
return Err(ProxyError::InvalidRequestPath);
}
if !file_path.exists() {
@@ -172,87 +259,30 @@ pub async fn respond_static(
file_path.pop();
file_path.push((file_name + &suffix).as_str());
if file_path.is_file() {
return Ok(StaticFileRequest::from_request_without_body(req)
.await?
.create_response(&file_path, cfg.utf8)?
.into_response());
return Ok(NamedFile::open(file_path).unwrap().into_response(&req));
}
}
if let Some(file) = cfg.fallback {
let fallback_path = base_path.join(file);
if fallback_path.is_file() {
return Ok(StaticFileRequest::from_request_without_body(req)
.await?
.create_response(&fallback_path, cfg.utf8)?
.into_response());
return Ok(NamedFile::open(fallback_path).unwrap().into_response(&req));
}
}
return Err(Error::from_status(StatusCode::NOT_FOUND));
return Err(ProxyError::NotFound);
}
if file_path.is_file() {
Ok(StaticFileRequest::from_request_without_body(req)
.await?
.create_response(&file_path, cfg.utf8)?
.into_response())
Ok(NamedFile::open(file_path).unwrap().into_response(&req))
} else {
if cfg.with_slash
&& !req.original_uri().path().ends_with('/')
&& (cfg.index.is_some() || cfg.browse)
{
let redirect_to = format!("{}/", req.original_uri().path());
return Ok(Response::builder()
.status(StatusCode::FOUND)
.header(header::LOCATION, redirect_to)
.finish());
}
if let Some(index_file) = &cfg.index {
let index_path = file_path.join(index_file);
if index_path.is_file() {
return Ok(StaticFileRequest::from_request_without_body(req)
.await?
.create_response(&index_path, cfg.utf8)?
.into_response());
return Ok(NamedFile::open(index_path).unwrap().into_response(&req));
}
}
if cfg.browse {
let read_dir = file_path
.read_dir()
.map_err(|_| Error::from_status(StatusCode::FORBIDDEN))?;
let mut template = DirectoryTemplate {
path: &path,
files: Vec::new(),
};
for res in read_dir {
let entry = res.map_err(|_| Error::from_status(StatusCode::FORBIDDEN))?;
if let Some(filename) = entry.file_name().to_str() {
let mut base_url = req.original_uri().path().to_string();
if !base_url.ends_with('/') {
base_url.push('/');
}
let filename_url = percent_encoding::percent_encode(
filename.as_bytes(),
percent_encoding::NON_ALPHANUMERIC,
);
template.files.push(FileRef {
url: format!("{base_url}{filename_url}"),
filename: filename.to_string(),
is_dir: entry.path().is_dir(),
});
}
}
let html = template.render();
Ok(Response::builder()
.header(header::CONTENT_TYPE, mime::TEXT_HTML_UTF_8.as_ref())
.body(Body::from_string(html)))
} else {
Err(Error::from_status(StatusCode::NOT_FOUND))
}
Err(ProxyError::NotFound)
}
}

View File

@@ -1,10 +1,6 @@
use http::Method;
use poem::{
handler,
http::{HeaderMap, StatusCode, Uri},
web::websocket::WebSocket,
Body, Error, FromRequest, IntoResponse, Request, Response, Result,
};
use actix_web::{HttpRequest, HttpResponse, ResponseError, web};
use actix_web::http::header;
use awc::Client;
use rand::seq::SliceRandom;
use crate::{
@@ -14,23 +10,14 @@ use crate::{
},
ROAD,
};
use crate::proxies::ProxyError;
#[handler]
pub async fn handle(
req: &Request,
uri: &Uri,
headers: &HeaderMap,
method: Method,
body: Body,
) -> Result<impl IntoResponse, Error> {
pub async fn handle(req: HttpRequest, payload: web::Payload, client: web::Data<Client>) -> HttpResponse {
let readable_app = ROAD.lock().await;
let (region, location) = match readable_app.filter(uri, method.clone(), headers) {
let (region, location) = match readable_app.filter(req.uri(), req.method(), req.headers()) {
Some(val) => val,
None => {
return Err(Error::from_string(
"There are no region be able to respone this request.",
StatusCode::NOT_FOUND,
))
return ProxyError::NoGateway.error_response();
}
};
@@ -41,58 +28,27 @@ pub async fn handle(
async fn forward(
end: &Destination,
req: &Request,
ori: &Uri,
headers: &HeaderMap,
method: Method,
body: Body,
) -> Result<Response, Error> {
// Handle websocket
if let Ok(ws) = WebSocket::from_request_without_body(req).await {
// Get uri
let Ok(uri) = end.get_websocket_uri() else {
return Err(Error::from_string(
"This destination was not support websockets.",
StatusCode::NOT_IMPLEMENTED,
));
};
// Build request
let mut ws_req = http::Request::builder().uri(&uri);
for (key, value) in headers.iter() {
ws_req = ws_req.header(key, value);
}
// Start the websocket connection
return Ok(responder::repond_websocket(ws_req, ws).await);
}
req: HttpRequest,
payload: web::Payload,
client: web::Data<Client>,
) -> Result<HttpResponse, ProxyError> {
// Handle normal web request
match end.get_type() {
DestinationType::Hypertext => {
let Ok(uri) = end.get_hypertext_uri() else {
return Err(Error::from_string(
"This destination was not support web requests.",
StatusCode::NOT_IMPLEMENTED,
));
return Err(ProxyError::NotImplemented);
};
responder::respond_hypertext(uri, ori, req, method, body, headers).await
responder::respond_hypertext(uri, req, payload, client).await
}
DestinationType::StaticFiles => {
let Ok(cfg) = end.get_static_config() else {
return Err(Error::from_string(
"This destination was not support static files.",
StatusCode::NOT_IMPLEMENTED,
));
return Err(ProxyError::NotImplemented);
};
responder::respond_static(cfg, method, req).await
responder::respond_static(cfg, req).await
}
_ => Err(Error::from_string(
"Unsupported destination protocol.",
StatusCode::NOT_IMPLEMENTED,
)),
_ => Err(ProxyError::NotImplemented)
}
}
@@ -100,23 +56,32 @@ pub async fn handle(
let loc = location.clone();
let end = destination.clone();
match forward(&end, req, uri, headers, method, body).await {
let ip = match req.connection_info().realip_remote_addr() {
None => "unknown".to_string(),
Some(val) => val.to_string(),
};
let ua = match req.headers().get(header::USER_AGENT) {
None => "unknown".to_string(),
Some(val) => val.to_str().unwrap().to_string(),
};
match forward(&end, req, payload, client).await {
Ok(resp) => {
tokio::spawn(async move {
let writable_app = &mut ROAD.lock().await;
writable_app.metrics.add_success_request(reg, loc, end);
writable_app.metrics.add_success_request(ip, ua, reg, loc, end);
});
Ok(resp)
resp
}
Err(err) => {
let message = format!("{:}", err);
Err(resp) => {
let message = resp.to_string();
tokio::spawn(async move {
let writable_app = &mut ROAD.lock().await;
writable_app
.metrics
.add_faliure_request(reg, loc, end, message);
.add_failure_request(ip, ua, reg, loc, end, message);
});
Err(err)
resp.error_response()
}
}
}

40
src/proxies/server.rs Normal file
View File

@@ -0,0 +1,40 @@
use std::error;
use actix_web::{App, HttpServer, web};
use actix_web::dev::Server;
use actix_web::middleware::{Compress, Logger};
use awc::Client;
use crate::config::CFG;
use crate::proxies::route;
use crate::server::ServerBindConfig;
use crate::tls::{load_certificates, use_rustls};
pub async fn build_proxies() -> Result<Vec<Server>, Box<dyn error::Error>> {
load_certificates().await?;
let cfg = CFG
.read()
.await
.get::<Vec<ServerBindConfig>>("proxies.bind")?;
let mut tasks = Vec::new();
for item in cfg {
tasks.push(build_single_proxy(item)?);
}
Ok(tasks)
}
pub fn build_single_proxy(cfg: ServerBindConfig) -> Result<Server, Box<dyn error::Error>> {
let server = HttpServer::new(|| {
App::new()
.wrap(Logger::default())
.wrap(Compress::default())
.app_data(web::Data::new(Client::default()))
.default_service(web::to(route::handle))
});
if cfg.tls {
Ok(server.bind_rustls_0_22(cfg.addr, use_rustls()?)?.run())
} else {
Ok(server.bind(cfg.addr)?.run())
}
}

7
src/server.rs Normal file
View File

@@ -0,0 +1,7 @@
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct ServerBindConfig {
pub addr: String,
pub tls: bool,
}

View File

@@ -1,19 +1,15 @@
use poem_openapi::OpenApi;
use actix_web::{Scope, web};
use crate::sideload::overview::get_overview;
use crate::sideload::regions::list_region;
pub mod overview;
pub mod regions;
mod overview;
mod regions;
pub mod server;
pub struct SideloadApi;
static ROOT: &str = "";
#[OpenApi]
impl SideloadApi {
#[oai(path = "/", method = "get")]
async fn index(&self) -> overview::OverviewResponse {
overview::index().await
}
#[oai(path = "/regions", method = "get")]
async fn regions_index(&self) -> regions::RegionResponse {
regions::index().await
}
}
pub fn service() -> Scope {
web::scope("/cgi")
.route(ROOT, web::get().to(get_overview))
.route("/regions", web::get().to(list_region))
}

View File

@@ -1,46 +1,23 @@
use poem_openapi::{payload::Json, ApiResponse, Object};
use actix_web::web;
use serde::Serialize;
use crate::proxies::config::{Destination, Location};
use crate::proxies::metrics::RoadTrace;
use crate::ROAD;
use crate::{
proxies::{
config::{Destination, Location},
metrics::RoadTrace,
},
ROAD,
};
#[derive(ApiResponse)]
pub enum OverviewResponse {
/// Return the overview data.
#[oai(status = 200)]
Ok(Json<OverviewData>),
}
#[derive(Debug, Object, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct OverviewData {
/// Loaded regions count
#[oai(read_only)]
regions: usize,
/// Loaded locations count
#[oai(read_only)]
locations: usize,
/// Loaded destnations count
#[oai(read_only)]
destinations: usize,
/// Recent requests count
requests_count: u64,
/// Recent requests success count
faliures_count: u64,
/// Recent requests falied count
failures_count: u64,
successes_count: u64,
/// Recent requests success rate
success_rate: f64,
/// Recent successes
recent_successes: Vec<RoadTrace>,
/// Recent errors
recent_errors: Vec<RoadTrace>,
}
pub async fn index() -> OverviewResponse {
pub async fn get_overview() -> web::Json<OverviewData> {
let locked_app = ROAD.lock().await;
let regions = locked_app.regions.clone();
let locations = regions
@@ -51,13 +28,13 @@ pub async fn index() -> OverviewResponse {
.iter()
.flat_map(|item| item.destinations.clone())
.collect::<Vec<Destination>>();
OverviewResponse::Ok(Json(OverviewData {
web::Json(OverviewData {
regions: regions.len(),
locations: locations.len(),
destinations: destinations.len(),
requests_count: locked_app.metrics.requests_count,
successes_count: locked_app.metrics.requests_count - locked_app.metrics.failures_count,
faliures_count: locked_app.metrics.failures_count,
failures_count: locked_app.metrics.failures_count,
success_rate: locked_app.metrics.get_success_rate(),
recent_successes: locked_app
.metrics
@@ -71,5 +48,5 @@ pub async fn index() -> OverviewResponse {
.clone()
.into_iter()
.collect::<Vec<_>>(),
}))
}
})
}

View File

@@ -1,25 +1,9 @@
use poem_openapi::{payload::Json, ApiResponse};
use actix_web::web;
use crate::proxies::config::Region;
use crate::ROAD;
use crate::{proxies::config::Region, ROAD};
#[derive(ApiResponse)]
pub enum RegionResponse {
/// Return the region data.
#[oai(status = 200)]
Ok(Json<Region>),
/// Return the list of region data.
#[oai(status = 200)]
OkMany(Json<Vec<Region>>),
/// Return the region data after created.
#[oai(status = 201)]
Created(Json<Region>),
/// Return was not found.
#[oai(status = 404)]
NotFound,
}
pub async fn index() -> RegionResponse {
pub async fn list_region() -> web::Json<Vec<Region>> {
let locked_app = ROAD.lock().await;
RegionResponse::OkMany(Json(locked_app.regions.clone()))
}
web::Json(locked_app.regions.clone())
}

35
src/sideload/server.rs Normal file
View File

@@ -0,0 +1,35 @@
use std::error;
use actix_web::dev::Server;
use actix_web::{App, HttpServer};
use actix_web_httpauth::extractors::AuthenticationError;
use actix_web_httpauth::headers::www_authenticate::basic::Basic;
use actix_web_httpauth::middleware::HttpAuthentication;
use crate::sideload;
pub async fn build_sideload() -> Result<Server, Box<dyn error::Error>> {
Ok(
HttpServer::new(|| {
App::new()
.wrap(HttpAuthentication::basic(|req, credentials| async move {
let password = match crate::config::CFG
.read()
.await
.get_string("secret") {
Ok(val) => val,
Err(_) => return Err((AuthenticationError::new(Basic::new()).into(), req))
};
if credentials.password().unwrap_or("") != password {
Err((AuthenticationError::new(Basic::new()).into(), req))
} else {
Ok(req)
}
}))
.service(sideload::service())
}).bind(
crate::config::CFG
.read()
.await
.get_string("sideload.bind_addr")?
)?.workers(1).run()
)
}

76
src/tls.rs Normal file
View File

@@ -0,0 +1,76 @@
use std::fs::File;
use std::{error};
use std::io::BufReader;
use std::sync::Arc;
use config::ConfigError;
use lazy_static::lazy_static;
use rustls::crypto::ring::sign::RsaSigningKey;
use rustls::server::{ClientHello, ResolvesServerCert};
use rustls::sign::CertifiedKey;
use serde::{Deserialize, Serialize};
use std::sync::Mutex;
use wildmatch::WildMatch;
lazy_static! {
static ref CERTS: Mutex<Vec<CertificateConfig>> = Mutex::new(Vec::new());
}
#[derive(Debug)]
struct ProxyCertResolver;
impl ResolvesServerCert for ProxyCertResolver {
fn resolve(&self, handshake: ClientHello) -> Option<Arc<CertifiedKey>> {
let domain = handshake.server_name()?;
let certs = CERTS.lock().unwrap();
for cert in certs.iter() {
if WildMatch::new(cert.domain.as_str()).matches(domain) {
return match cert.clone().load() {
Ok(val) => Some(val),
Err(_) => None
};
}
}
None
}
}
#[derive(Clone, Serialize, Deserialize)]
struct CertificateConfig {
pub domain: String,
pub certs: String,
pub key: String,
}
impl CertificateConfig {
pub fn load(self) -> Result<Arc<CertifiedKey>, Box<dyn error::Error>> {
let certs =
rustls_pemfile::certs(&mut BufReader::new(&mut File::open(self.certs)?))
.collect::<Result<Vec<_>, _>>()?;
let key =
rustls_pemfile::private_key(&mut BufReader::new(&mut File::open(self.key)?))?
.unwrap();
let sign = RsaSigningKey::new(&key)?;
Ok(Arc::new(CertifiedKey::new(certs, Arc::new(sign))))
}
}
pub async fn load_certificates() -> Result<(), ConfigError> {
let certs = crate::config::CFG
.read()
.await
.get::<Vec<CertificateConfig>>("certificates")?;
CERTS.lock().unwrap().clone_from(&certs);
Ok(())
}
pub fn use_rustls() -> Result<rustls::ServerConfig, ConfigError> {
Ok(
rustls::ServerConfig::builder()
.with_no_client_auth()
.with_cert_resolver(Arc::new(ProxyCertResolver))
)
}

View File

@@ -2,10 +2,9 @@ pub mod runner;
use std::collections::HashMap;
use futures_util::lock::Mutex;
use lazy_static::lazy_static;
use poem_openapi::Object;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tracing::{debug, warn};
use crate::proxies::config::Region;
@@ -31,7 +30,7 @@ impl WardenInstance {
pub fn scan(&mut self, regions: Vec<Region>) {
self.applications = regions
.iter()
.flat_map(|item| item.applications.clone())
.flat_map(|item| item.applications.clone().unwrap_or_default())
.collect::<Vec<Application>>();
debug!(
applications = format!("{:?}", self.applications),
@@ -63,7 +62,7 @@ impl Default for WardenInstance {
}
}
#[derive(Debug, Object, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Application {
pub id: String,
pub exe: String,

View File

@@ -1,12 +1,12 @@
use std::{borrow::BorrowMut, collections::HashMap, io};
use super::Application;
use futures_util::lock::Mutex;
use lazy_static::lazy_static;
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::{Child, Command},
};
use tokio::sync::Mutex;
lazy_static! {
static ref STDOUT: Mutex<HashMap<String, String>> = Mutex::new(HashMap::new());

29
test/websocket/server.py Normal file
View File

@@ -0,0 +1,29 @@
import asyncio
import websockets
async def handle_websocket(websocket, path):
# This function will be called whenever a new WebSocket connection is established
# Send a welcome message to the client
await websocket.send("Welcome to the WebSocket server!")
try:
# Enter the main loop to handle incoming messages
async for message in websocket:
# Print the received message
print(f"Received message: {message}")
# Send a response back to the client
response = f"Server received: {message}"
await websocket.send(response)
except websockets.exceptions.ConnectionClosedError:
print("Connection closed by the client.")
# Create the WebSocket server
start_server = websockets.serve(handle_websocket, "localhost", 8765)
print("WebSocket server started at ws://localhost:8765")
# Run the server indefinitely
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()