WebSocket Support

This commit is contained in:
LittleSheep 2024-02-14 13:53:43 +08:00
parent e40fe6049f
commit 7eee10c4ff
11 changed files with 381 additions and 112 deletions

179
Cargo.lock generated
View File

@ -2,6 +2,31 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "actix"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb72882332b6d6282f428b77ba0358cb2687e61a6f6df6a6d3871e8a177c2d4f"
dependencies = [
"actix-macros",
"actix-rt",
"actix_derive",
"bitflags 2.4.1",
"bytes",
"crossbeam-channel",
"futures-core",
"futures-sink",
"futures-task",
"futures-util",
"log",
"once_cell",
"parking_lot",
"pin-project-lite",
"smallvec",
"tokio",
"tokio-util",
]
[[package]]
name = "actix-codec"
version = "0.5.2"
@ -228,6 +253,24 @@ dependencies = [
"url",
]
[[package]]
name = "actix-web-actors"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "420b001bb709d8510c3e2659dae046e54509ff9528018d09c78381e765a1f9fa"
dependencies = [
"actix",
"actix-codec",
"actix-http",
"actix-web",
"bytes",
"bytestring",
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "actix-web-codegen"
version = "4.2.2"
@ -255,6 +298,17 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "actix_derive"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c7db3d5a9718568e4cf4a537cfd7070e6e6ff7481510d0237fb529ac850f6d3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
]
[[package]]
name = "addr2line"
version = "0.21.0"
@ -425,12 +479,6 @@ dependencies = [
"alloc-stdlib",
]
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.5.0"
@ -546,6 +594,21 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "176dc175b78f56c0f321911d9c8eb2b77a78a4860b9c19db83835fea1a46649b"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
[[package]]
name = "crunchy"
version = "0.2.2"
@ -562,12 +625,6 @@ dependencies = [
"typenum",
]
[[package]]
name = "data-encoding"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5"
[[package]]
name = "deranged"
version = "0.3.11"
@ -664,12 +721,65 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
]
[[package]]
name = "futures-sink"
version = "0.3.30"
@ -688,9 +798,13 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
@ -1290,13 +1404,16 @@ dependencies = [
name = "roadsign"
version = "0.1.0"
dependencies = [
"actix",
"actix-files",
"actix-proxy",
"actix-web",
"actix-web-actors",
"actix-web-httpauth",
"awc",
"config",
"derive_more",
"futures",
"lazy_static",
"mime",
"percent-encoding",
@ -1308,7 +1425,6 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"tokio-tungstenite",
"toml",
"tracing",
"tracing-subscriber",
@ -1698,18 +1814,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite",
]
[[package]]
name = "tokio-util"
version = "0.7.10"
@ -1816,25 +1920,6 @@ dependencies = [
"tracing-log",
]
[[package]]
name = "tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http 1.0.0",
"httparse",
"log",
"rand",
"sha1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.17.0"
@ -1900,12 +1985,6 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "v_htmlescape"
version = "0.15.8"

View File

@ -26,7 +26,6 @@ tokio = { version = "1.35.1", features = [
"time",
"full",
] }
tokio-tungstenite = "0.21.0"
toml = "0.8.8"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
@ -34,3 +33,6 @@ wildmatch = "2.3.0"
derive_more = "0.99.17"
rustls = "0.22.2"
rustls-pemfile = "2.0.0"
futures = "0.3.30"
actix-web-actors = "4.3.0"
actix = "0.13.3"

View File

@ -74,9 +74,9 @@ rds cli with this command.
```shell
rds connect <id> <url> <password>
# ID will allow you find this server.rs in after commands.
# URL is to your roadsign server.rs sideload api.
# Password is your roadsign server.rs credential.
# ID will allow you find this server.py.rs in after commands.
# URL is to your roadsign server.py.rs sideload api.
# Password is your roadsign server.py.rs credential.
# ======================================================================
# !WARNING! All these things will storage in your $HOME/.roadsignrc.yaml
# ======================================================================
@ -85,8 +85,8 @@ rds connect <id> <url> <password>
Then, sync your local config to remote.
```shell
rds sync <server.rs id> <site id> <config file>
# Server ID is your server.rs added by last command.
rds sync <server.py.rs id> <site id> <config file>
# Server ID is your server.py.rs added by last command.
# Site ID is your new site id or old site id if you need update it.
# Config File is your local config file path.
```

View File

@ -5,8 +5,11 @@ id = "root"
hosts = ["localhost"]
paths = ["/"]
[[locations.destinations]]
id = "hypertext"
uri = "https://postman-echo.com/get"
id = "websocket"
uri = "http://localhost:8765"
# [[locations.destinations]]
# id = "hypertext"
# uri = "https://example.com"
# [[locations.destinations]]
# id = "static"
# uri = "files://regions?index=index.html"

View File

@ -21,6 +21,9 @@ pub mod server;
#[derive(Debug, Display)]
pub enum ProxyError {
#[display(fmt = "Upgrade required for this connection")]
UpgradeRequired,
#[display(fmt = "Remote gateway issue")]
BadGateway,
@ -43,6 +46,7 @@ pub enum ProxyError {
impl error::ResponseError for ProxyError {
fn status_code(&self) -> StatusCode {
match *self {
ProxyError::UpgradeRequired => StatusCode::UPGRADE_REQUIRED,
ProxyError::BadGateway => StatusCode::BAD_GATEWAY,
ProxyError::NoGateway => StatusCode::NOT_FOUND,
ProxyError::NotFound => StatusCode::NOT_FOUND,

View File

@ -1,55 +1,206 @@
use crate::proxies::ProxyError;
use crate::proxies::ProxyError::{BadGateway, UpgradeRequired};
use actix_files::NamedFile;
use actix_web::http::{header, Method};
use actix_web::web::BytesMut;
use actix_web::{web, Error, HttpRequest, HttpResponse};
use awc::error::HeaderValue;
use awc::http::Uri;
use awc::Client;
use futures::{channel::mpsc::unbounded, Sink, sink::SinkExt, stream::StreamExt};
use std::str::FromStr;
use std::time::Duration;
use std::{
ffi::OsStr,
path::{Path, PathBuf},
};
use actix_files::{NamedFile};
use actix_proxy::IntoHttpResponse;
use actix_web::{HttpRequest, HttpResponse, web};
use actix_web::http::{header, Method};
use actix_web::http::header::HeaderValue;
use awc::Client;
use actix::io::{SinkWrite, WriteHandler};
use actix::{Actor, ActorContext, AsyncContext, StreamHandler};
use actix_web_actors::ws;
use actix_web_actors::ws::{CloseReason, handshake, ProtocolError, WebsocketContext};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::log::warn;
use crate::proxies::ProxyError;
pub async fn respond_hypertext(
uri: String,
req: HttpRequest,
payload: web::Payload,
client: web::Data<Client>,
) -> Result<HttpResponse, ProxyError> {
let conn = req.connection_info();
let ip = conn.realip_remote_addr().unwrap_or("0.0.0.0");
let proto = conn.scheme();
let host = conn.host();
let mut append_part = req.uri().to_string();
if let Some(stripped_uri) = append_part.strip_prefix('/') {
append_part = stripped_uri.to_string();
}
let mut headers = req.headers().clone();
headers.insert(header::X_FORWARDED_FOR, ip.parse().unwrap());
headers.insert(header::X_FORWARDED_PROTO, proto.parse().unwrap());
headers.insert(header::X_FORWARDED_HOST, host.parse().unwrap());
headers.insert(
header::FORWARDED,
format!("by={};for={};host={};proto={}", ip, ip, host, proto)
.parse()
.unwrap(),
);
let append_part = req.uri().to_string().chars().skip(1).collect::<String>();
let uri = Uri::from_str(uri.as_str()).expect("Invalid upstream");
let target_url = format!("{}{}", uri, append_part);
let res = client.request(req.method().clone(), target_url).send().await;
let forwarded_req = client
.request_from(target_url.as_str(), req.head())
.insert_header((header::HOST, uri.host().expect("Invalid upstream")));
return match res {
Ok(result) => {
let mut res = result.into_http_response();
res.headers_mut().insert(header::SERVER, HeaderValue::from_static("RoadSign"));
res.headers_mut().remove(header::CONTENT_ENCODING);
Ok(res)
}
Err(error) => {
warn!("Proxy got a upstream issue... {:?}", error);
Err(ProxyError::BadGateway)
}
let forwarded_req = match req.connection_info().realip_remote_addr() {
Some(addr) => forwarded_req
.insert_header((header::X_FORWARDED_FOR, addr))
.insert_header((header::X_FORWARDED_PROTO, req.connection_info().scheme()))
.insert_header((header::X_FORWARDED_HOST, req.connection_info().host()))
.insert_header((
header::FORWARDED,
format!(
"by={};for={};host={};proto={}",
addr,
addr,
req.connection_info().host(),
req.connection_info().scheme()
),
)),
None => forwarded_req,
};
if req
.headers()
.get(header::UPGRADE)
.unwrap_or(&HeaderValue::from_static(""))
.to_str()
.unwrap_or("")
.to_lowercase()
== "websocket"
{
let uri = uri.to_string().replacen("http", "ws", 1);
return respond_websocket(uri, req, payload).await;
}
let res = forwarded_req
.timeout(Duration::from_secs(1800))
.send_stream(payload)
.await
.map_err(|err| {
warn!("Remote gateway issue... {}", err);
BadGateway
})?;
let mut client_resp = HttpResponse::build(res.status());
for (header_name, header_value) in res
.headers()
.iter()
.filter(|(h, _)| *h != header::CONNECTION && *h != header::CONTENT_ENCODING)
{
client_resp.insert_header((header_name.clone(), header_value.clone()));
}
Ok(client_resp.streaming(res))
}
pub struct WebsocketProxy<S>
where
S: Unpin + Sink<ws::Message>,
{
send: SinkWrite<ws::Message, S>,
}
impl<S> WriteHandler<ProtocolError> for WebsocketProxy<S>
where
S: Unpin + 'static + Sink<ws::Message>,
{
fn error(&mut self, err: ProtocolError, ctx: &mut Self::Context) -> actix::Running {
self.error(err, ctx);
actix::Running::Stop
}
}
impl<S> Actor for WebsocketProxy<S>
where
S: Unpin + 'static + Sink<ws::Message>,
{
type Context = WebsocketContext<Self>;
}
impl<S> StreamHandler<Result<ws::Frame, ProtocolError>> for WebsocketProxy<S>
where
S: Unpin + Sink<ws::Message> + 'static,
{
fn handle(&mut self, item: Result<ws::Frame, ProtocolError>, ctx: &mut Self::Context) {
let frame = match item {
Ok(frame) => frame,
Err(err) => return self.error(err, ctx),
};
let msg = match frame {
ws::Frame::Text(t) => match t.try_into() {
Ok(t) => ws::Message::Text(t),
Err(e) => {
self.error(e, ctx);
return;
}
},
ws::Frame::Binary(b) => ws::Message::Binary(b),
ws::Frame::Continuation(c) => ws::Message::Continuation(c),
ws::Frame::Ping(p) => ws::Message::Ping(p),
ws::Frame::Pong(p) => ws::Message::Pong(p),
ws::Frame::Close(r) => ws::Message::Close(r),
};
ctx.write_raw(msg)
}
}
impl<S> StreamHandler<Result<ws::Message, ProtocolError>> for WebsocketProxy<S>
where
S: Unpin + Sink<ws::Message> + 'static,
{
fn handle(&mut self, item: Result<ws::Message, ProtocolError>, ctx: &mut Self::Context) {
let msg = match item {
Ok(msg) => msg,
Err(err) => return self.error(err, ctx),
};
let _ = self.send.write(msg);
}
}
impl<S> WebsocketProxy<S>
where
S: Unpin + Sink<ws::Message> + 'static,
{
fn error<E>(&mut self, err: E, ctx: &mut <Self as Actor>::Context)
where
E: std::error::Error,
{
let reason = Some(CloseReason {
code: ws::CloseCode::Error,
description: Some(err.to_string()),
});
ctx.close(reason.clone());
let _ = self.send.write(ws::Message::Close(reason));
self.send.close();
ctx.stop();
}
}
pub async fn respond_websocket(
uri: String,
req: HttpRequest,
payload: web::Payload,
) -> Result<HttpResponse, ProxyError> {
let mut res = handshake(&req).map_err(|_| UpgradeRequired)?;
let (_, conn) = awc::Client::new()
.ws(uri)
.connect()
.await
.map_err(|_| BadGateway)?;
let (send, recv) = conn.split();
let out = WebsocketContext::with_factory(payload, |ctx| {
ctx.add_stream(recv);
WebsocketProxy {
send: SinkWrite::new(send, ctx),
}
});
Ok(res.streaming(out))
}
pub struct StaticResponderConfig {
@ -123,7 +274,7 @@ pub async fn respond_static(
return Err(ProxyError::NotFound);
}
return if file_path.is_file() {
if file_path.is_file() {
Ok(NamedFile::open(file_path).unwrap().into_response(&req))
} else {
if let Some(index_file) = &cfg.index {
@ -133,6 +284,6 @@ pub async fn respond_static(
}
}
return Err(ProxyError::NotFound);
};
Err(ProxyError::NotFound)
}
}

View File

@ -12,7 +12,7 @@ use crate::{
};
use crate::proxies::ProxyError;
pub async fn handle(req: HttpRequest, client: web::Data<Client>) -> HttpResponse {
pub async fn handle(req: HttpRequest, payload: web::Payload, client: web::Data<Client>) -> HttpResponse {
let readable_app = ROAD.lock().await;
let (region, location) = match readable_app.filter(req.uri(), req.method(), req.headers()) {
Some(val) => val,
@ -29,6 +29,7 @@ pub async fn handle(req: HttpRequest, client: web::Data<Client>) -> HttpResponse
async fn forward(
end: &Destination,
req: HttpRequest,
payload: web::Payload,
client: web::Data<Client>,
) -> Result<HttpResponse, ProxyError> {
// Handle normal web request
@ -38,7 +39,7 @@ pub async fn handle(req: HttpRequest, client: web::Data<Client>) -> HttpResponse
return Err(ProxyError::NotImplemented);
};
responder::respond_hypertext(uri, req, client).await
responder::respond_hypertext(uri, req, payload, client).await
}
DestinationType::StaticFiles => {
let Ok(cfg) = end.get_static_config() else {
@ -64,7 +65,7 @@ pub async fn handle(req: HttpRequest, client: web::Data<Client>) -> HttpResponse
Some(val) => val.to_str().unwrap().to_string(),
};
match forward(&end, req, client).await {
match forward(&end, req, payload, client).await {
Ok(resp) => {
tokio::spawn(async move {
let writable_app = &mut ROAD.lock().await;

View File

@ -10,7 +10,7 @@ const ClientAddressNotAvailable = {
const StaticClientAddressNotAvailable = {
name: "StaticClientAddressNotAvailable",
title: "`Astro.clientAddress` is not available in static mode.",
message: "`Astro.clientAddress` is only available when using `output: 'server.rs'` or `output: 'hybrid'`. Update your Astro config if you need SSR features.",
message: "`Astro.clientAddress` is only available when using `output: 'server.py.rs'` or `output: 'hybrid'`. Update your Astro config if you need SSR features.",
hint: "See https://docs.astro.build/en/guides/server-side-rendering/ for more information on how to enable SSR."
};
const NoMatchingStaticPathFound = {
@ -212,7 +212,7 @@ const CantRenderPage = {
name: "CantRenderPage",
title: "Astro can't render the route.",
message: "Astro cannot find any content to render for this route. There is no file or redirect associated with this route.",
hint: "If you expect to find a route here, this may be an Astro bug. Please file an issue/restart the dev server.rs"
hint: "If you expect to find a route here, this may be an Astro bug. Please file an issue/restart the dev server.py.rs"
};
function normalizeLF(code) {
@ -524,7 +524,7 @@ function extractDirectives(inputProps, clientDirectives) {
propsWithoutTransitionAttributes: {}
};
for (const [key, value] of Object.entries(inputProps)) {
if (key.startsWith("server.rs:")) {
if (key.startsWith("server.py.rs:")) {
if (key === "server:root") {
extracted.isPage = true;
}

View File

@ -10,7 +10,7 @@ import buffer from 'node:buffer';
import crypto from 'node:crypto';
import http from 'node:http';
import https$1 from 'https';
import enableDestroy from 'server.rs-destroy';
import enableDestroy from 'server.py.rs-destroy';
import path from 'node:path';
import url from 'node:url';
import send from 'send';
@ -1972,7 +1972,7 @@ class NodeApp extends App {
* import { NodeApp } from 'astro/app/node';
* import { createServer } from 'node:http';
*
* const server.rs = createServer(async (req, res) => {
* const server.py.rs = createServer(async (req, res) => {
* const request = NodeApp.createRequest(req);
* const response = await app.render(request);
* await NodeApp.writeResponse(response, res);
@ -2003,7 +2003,7 @@ class NodeApp extends App {
* import { NodeApp } from 'astro/app/node';
* import { createServer } from 'node:http';
*
* const server.rs = createServer(async (req, res) => {
* const server.py.rs = createServer(async (req, res) => {
* const request = NodeApp.createRequest(req);
* const response = await app.render(request);
* await NodeApp.writeResponse(response, res);
@ -2032,7 +2032,7 @@ class NodeApp extends App {
result = await reader.read();
}
} catch {
destination.write("Internal server.rs error");
destination.write("Internal server.py.rs error");
}
}
destination.end();
@ -2116,7 +2116,7 @@ function createStaticHandler(app, options) {
if (forwardError) {
console.error(err.toString());
res.writeHead(500);
res.end("Internal server.rs error");
res.end("Internal server.py.rs error");
return;
}
ssr();
@ -2340,7 +2340,7 @@ const _manifest = Object.assign(manifest, {
pageMap,
renderers,
});
const _args = {"mode":"standalone","client":"file:///Users/littlesheep/Documents/Projects/Capital/dist/client/","server":"file:///Users/littlesheep/Documents/Projects/Capital/dist/server.rs/","host":false,"port":4321,"assets":"_astro"};
const _args = {"mode":"standalone","client":"file:///Users/littlesheep/Documents/Projects/Capital/dist/client/","server":"file:///Users/littlesheep/Documents/Projects/Capital/dist/server.py.rs/","host":false,"port":4321,"assets":"_astro"};
const _exports = createExports(_manifest, _args);
const handler = _exports['handler'];

View File

@ -1,5 +1,5 @@
import React, { createElement } from 'react';
import ReactDOM from 'react-dom/server.rs';
import ReactDOM from 'react-dom/server.py.rs';
/**
* Astro passes `children` as a string of HTML, so we need
@ -258,6 +258,6 @@ const _renderer0 = {
supportsAstroStaticSlot: true,
};
const renderers = [Object.assign({"name":"@astrojs/react","clientEntrypoint":"@astrojs/react/client.js","serverEntrypoint":"@astrojs/react/server.rs.js"}, { ssr: _renderer0 }),];
const renderers = [Object.assign({"name":"@astrojs/react","clientEntrypoint":"@astrojs/react/client.js","serverEntrypoint":"@astrojs/react/server.py.rs.js"}, { ssr: _renderer0 }),];
export { renderers };

29
test/websocket/server.py Normal file
View File

@ -0,0 +1,29 @@
import asyncio
import websockets
async def handle_websocket(websocket, path):
# This function will be called whenever a new WebSocket connection is established
# Send a welcome message to the client
await websocket.send("Welcome to the WebSocket server!")
try:
# Enter the main loop to handle incoming messages
async for message in websocket:
# Print the received message
print(f"Received message: {message}")
# Send a response back to the client
response = f"Server received: {message}"
await websocket.send(response)
except websockets.exceptions.ConnectionClosedError:
print("Connection closed by the client.")
# Create the WebSocket server
start_server = websockets.serve(handle_websocket, "localhost", 8765)
print("WebSocket server started at ws://localhost:8765")
# Run the server indefinitely
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()