🔀 Merge branch 'refactor/seprate-auth'

# Conflicts:
#	DysonNetwork.Sphere/Chat/Realtime/LiveKitService.cs
#	DysonNetwork.Sphere/Chat/RealtimeCallController.cs
#	DysonNetwork.Sphere/Startup/ServiceCollectionExtensions.cs
#	DysonNetwork.sln.DotSettings.user
This commit is contained in:
2025-07-20 02:28:42 +08:00
388 changed files with 26638 additions and 105969 deletions

View File

@@ -0,0 +1,45 @@
namespace DysonNetwork.Pusher.Connection;
public class ClientTypeMiddleware(RequestDelegate next)
{
public async Task Invoke(HttpContext context)
{
var headers = context.Request.Headers;
bool isWebPage;
// Priority 1: Check for custom header
if (headers.TryGetValue("X-Client", out var clientType))
{
isWebPage = clientType.ToString().Length == 0;
}
else
{
var userAgent = headers.UserAgent.ToString();
var accept = headers.Accept.ToString();
// Priority 2: Check known app User-Agent (backward compatibility)
if (!string.IsNullOrEmpty(userAgent) && userAgent.Contains("Solian"))
isWebPage = false;
// Priority 3: Accept header can help infer intent
else if (!string.IsNullOrEmpty(accept) && accept.Contains("text/html"))
isWebPage = true;
else if (!string.IsNullOrEmpty(accept) && accept.Contains("application/json"))
isWebPage = false;
else
isWebPage = true;
}
context.Items["IsWebPage"] = isWebPage;
var redirectWhiteList = new[] { "/ws", "/.well-known", "/swagger" };
if(redirectWhiteList.Any(w => context.Request.Path.StartsWithSegments(w)))
await next(context);
else if (!isWebPage && !context.Request.Path.StartsWithSegments("/api"))
context.Response.Redirect(
$"/api{context.Request.Path.Value}{context.Request.QueryString.Value}",
permanent: false
);
else
await next(context);
}
}

View File

@@ -0,0 +1,17 @@
using System.Net.WebSockets;
using DysonNetwork.Shared.Proto;
namespace DysonNetwork.Pusher.Connection;
public interface IWebSocketPacketHandler
{
string PacketType { get; }
Task HandleAsync(
Account currentUser,
string deviceId,
WebSocketPacket packet,
WebSocket socket,
WebSocketService srv
);
}

View File

@@ -0,0 +1,107 @@
using System.Net.WebSockets;
using DysonNetwork.Shared.Proto;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Swashbuckle.AspNetCore.Annotations;
namespace DysonNetwork.Pusher.Connection;
[ApiController]
[Route("/ws")]
public class WebSocketController(WebSocketService ws, ILogger<WebSocketContext> logger) : ControllerBase
{
[Route("/ws")]
[Authorize]
[SwaggerIgnore]
public async Task TheGateway()
{
HttpContext.Items.TryGetValue("CurrentUser", out var currentUserValue);
HttpContext.Items.TryGetValue("CurrentSession", out var currentSessionValue);
if (currentUserValue is not Account currentUser ||
currentSessionValue is not AuthSession currentSession)
{
HttpContext.Response.StatusCode = StatusCodes.Status401Unauthorized;
return;
}
var accountId = currentUser.Id!;
var deviceId = currentSession.Challenge.DeviceId!;
if (string.IsNullOrEmpty(deviceId))
{
HttpContext.Response.StatusCode = StatusCodes.Status400BadRequest;
return;
}
using var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
var cts = new CancellationTokenSource();
var connectionKey = (accountId, deviceId);
if (!ws.TryAdd(connectionKey, webSocket, cts))
{
await webSocket.CloseAsync(
WebSocketCloseStatus.PolicyViolation,
"Too many connections from the same device and account.",
CancellationToken.None
);
return;
}
logger.LogInformation(
$"Connection established with user @{currentUser.Name}#{currentUser.Id} and device #{deviceId}");
try
{
await _ConnectionEventLoop(deviceId, currentUser, webSocket, cts.Token);
}
catch (Exception ex)
{
Console.WriteLine($"WebSocket Error: {ex.Message}");
}
finally
{
ws.Disconnect(connectionKey);
logger.LogInformation(
$"Connection disconnected with user @{currentUser.Name}#{currentUser.Id} and device #{deviceId}");
}
}
private async Task _ConnectionEventLoop(
string deviceId,
Account currentUser,
WebSocket webSocket,
CancellationToken cancellationToken
)
{
var connectionKey = (AccountId: currentUser.Id, DeviceId: deviceId);
var buffer = new byte[1024 * 4];
try
{
var receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer),
cancellationToken
);
while (!receiveResult.CloseStatus.HasValue)
{
receiveResult = await webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer),
cancellationToken
);
var packet = WebSocketPacket.FromBytes(buffer[..receiveResult.Count]);
_ = ws.HandlePacket(currentUser, connectionKey.DeviceId, packet, webSocket);
}
}
catch (OperationCanceledException)
{
if (
webSocket.State != WebSocketState.Closed
&& webSocket.State != WebSocketState.Aborted
)
{
ws.Disconnect(connectionKey);
}
}
}
}

View File

@@ -0,0 +1,87 @@
using System.Text.Json;
using DysonNetwork.Shared.Proto;
using NodaTime;
using NodaTime.Serialization.SystemTextJson;
namespace DysonNetwork.Pusher.Connection;
public class WebSocketPacket
{
public string Type { get; set; } = null!;
public object? Data { get; set; } = null!;
public string? Endpoint { get; set; }
public string? ErrorMessage { get; set; }
/// <summary>
/// Creates a WebSocketPacket from raw WebSocket message bytes
/// </summary>
/// <param name="bytes">Raw WebSocket message bytes</param>
/// <returns>Deserialized WebSocketPacket</returns>
public static WebSocketPacket FromBytes(byte[] bytes)
{
var json = System.Text.Encoding.UTF8.GetString(bytes);
var jsonOpts = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
DictionaryKeyPolicy = JsonNamingPolicy.SnakeCaseLower,
};
return JsonSerializer.Deserialize<WebSocketPacket>(json, jsonOpts) ??
throw new JsonException("Failed to deserialize WebSocketPacket");
}
/// <summary>
/// Deserializes the Data property to the specified type T
/// </summary>
/// <typeparam name="T">Target type to deserialize to</typeparam>
/// <returns>Deserialized data of type T</returns>
public T? GetData<T>()
{
if (Data is T typedData)
return typedData;
var jsonOpts = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
DictionaryKeyPolicy = JsonNamingPolicy.SnakeCaseLower,
};
return JsonSerializer.Deserialize<T>(
JsonSerializer.Serialize(Data, jsonOpts),
jsonOpts
);
}
/// <summary>
/// Serializes this WebSocketPacket to a byte array for sending over WebSocket
/// </summary>
/// <returns>Byte array representation of the packet</returns>
public byte[] ToBytes()
{
var jsonOpts = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
DictionaryKeyPolicy = JsonNamingPolicy.SnakeCaseLower,
}.ConfigureForNodaTime(DateTimeZoneProviders.Tzdb);
var json = JsonSerializer.Serialize(this, jsonOpts);
return System.Text.Encoding.UTF8.GetBytes(json);
}
public Shared.Proto.WebSocketPacket ToProtoValue()
{
return new Shared.Proto.WebSocketPacket
{
Type = Type,
Data = GrpcTypeHelper.ConvertClassToValue(Data),
ErrorMessage = ErrorMessage
};
}
public static WebSocketPacket FromProtoValue(Shared.Proto.WebSocketPacket packet)
{
return new WebSocketPacket
{
Type = packet.Type,
Data = GrpcTypeHelper.ConvertValueToObject(packet.Data),
ErrorMessage = packet.ErrorMessage
};
}
}

View File

@@ -0,0 +1,179 @@
using System.Collections.Concurrent;
using System.Net.WebSockets;
using dotnet_etcd.interfaces;
using DysonNetwork.Shared.Data;
using DysonNetwork.Shared.Proto;
using Grpc.Core;
namespace DysonNetwork.Pusher.Connection;
public class WebSocketService
{
private readonly IConfiguration _configuration;
private readonly ILogger<WebSocketService> _logger;
private readonly IEtcdClient _etcdClient;
private readonly IDictionary<string, IWebSocketPacketHandler> _handlerMap;
public WebSocketService(
IEnumerable<IWebSocketPacketHandler> handlers,
IEtcdClient etcdClient,
ILogger<WebSocketService> logger,
IConfiguration configuration
)
{
_etcdClient = etcdClient;
_logger = logger;
_configuration = configuration;
_handlerMap = handlers.ToDictionary(h => h.PacketType);
}
private static readonly ConcurrentDictionary<
(string AccountId, string DeviceId),
(WebSocket Socket, CancellationTokenSource Cts)
> ActiveConnections = new();
private static readonly ConcurrentDictionary<string, string> ActiveSubscriptions = new(); // deviceId -> chatRoomId
public bool TryAdd(
(string AccountId, string DeviceId) key,
WebSocket socket,
CancellationTokenSource cts
)
{
if (ActiveConnections.TryGetValue(key, out _))
Disconnect(key,
"Just connected somewhere else with the same identifier."); // Disconnect the previous one using the same identifier
return ActiveConnections.TryAdd(key, (socket, cts));
}
public void Disconnect((string AccountId, string DeviceId) key, string? reason = null)
{
if (!ActiveConnections.TryGetValue(key, out var data)) return;
data.Socket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
reason ?? "Server just decided to disconnect.",
CancellationToken.None
);
data.Cts.Cancel();
ActiveConnections.TryRemove(key, out _);
}
public bool GetDeviceIsConnected(string deviceId)
{
return ActiveConnections.Any(c => c.Key.DeviceId == deviceId);
}
public bool GetAccountIsConnected(string accountId)
{
return ActiveConnections.Any(c => c.Key.AccountId == accountId);
}
public void SendPacketToAccount(string userId, WebSocketPacket packet)
{
var connections = ActiveConnections.Where(c => c.Key.AccountId == userId);
var packetBytes = packet.ToBytes();
var segment = new ArraySegment<byte>(packetBytes);
foreach (var connection in connections)
{
connection.Value.Socket.SendAsync(
segment,
WebSocketMessageType.Binary,
true,
CancellationToken.None
);
}
}
public void SendPacketToDevice(string deviceId, WebSocketPacket packet)
{
var connections = ActiveConnections.Where(c => c.Key.DeviceId == deviceId);
var packetBytes = packet.ToBytes();
var segment = new ArraySegment<byte>(packetBytes);
foreach (var connection in connections)
{
connection.Value.Socket.SendAsync(
segment,
WebSocketMessageType.Binary,
true,
CancellationToken.None
);
}
}
public async Task HandlePacket(
Account currentUser,
string deviceId,
WebSocketPacket packet,
WebSocket socket
)
{
if (_handlerMap.TryGetValue(packet.Type, out var handler))
{
await handler.HandleAsync(currentUser, deviceId, packet, socket, this);
return;
}
if (packet.Endpoint is not null)
{
try
{
// Get the service URL from etcd for the specified endpoint
var serviceKey = $"/services/{packet.Endpoint}";
var response = await _etcdClient.GetAsync(serviceKey);
if (response.Kvs.Count > 0)
{
var serviceUrl = response.Kvs[0].Value.ToStringUtf8();
var clientCertPath = _configuration["Service:ClientCert"]!;
var clientKeyPath = _configuration["Service:ClientKey"]!;
var clientCertPassword = _configuration["Service:CertPassword"];
var callInvoker =
GrpcClientHelper.CreateCallInvoker(
serviceUrl,
clientCertPath,
clientKeyPath,
clientCertPassword
);
var client = new PusherHandlerService.PusherHandlerServiceClient(callInvoker);
try
{
await client.ReceiveWebSocketPacketAsync(new ReceiveWebSocketPacketRequest
{
Account = currentUser,
DeviceId = deviceId,
Packet = packet.ToProtoValue()
});
}
catch (RpcException ex)
{
_logger.LogError(ex, $"Error forwarding packet to endpoint: {packet.Endpoint}");
}
return;
}
_logger.LogWarning($"No service registered for endpoint: {packet.Endpoint}");
}
catch (Exception ex)
{
_logger.LogError(ex, $"Error forwarding packet to endpoint: {packet.Endpoint}");
}
}
await socket.SendAsync(
new ArraySegment<byte>(new WebSocketPacket
{
Type = WebSocketPacketType.Error,
ErrorMessage = $"Unprocessable packet: {packet.Type}"
}.ToBytes()),
WebSocketMessageType.Binary,
true,
CancellationToken.None
);
}
}