From ea150608b4b9b062d41d745915cff455e568a8b1 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Sat, 2 Nov 2024 13:09:52 +0800 Subject: [PATCH] :sparkles: Implement websocket gateway --- pkg/internal/http/ws/ws.go | 65 ++++++++++++++++++++------------------ 1 file changed, 34 insertions(+), 31 deletions(-) diff --git a/pkg/internal/http/ws/ws.go b/pkg/internal/http/ws/ws.go index 4749251..828af9a 100644 --- a/pkg/internal/http/ws/ws.go +++ b/pkg/internal/http/ws/ws.go @@ -1,11 +1,16 @@ package ws import ( + "context" + "fmt" + "git.solsynth.dev/hypernet/nexus/pkg/internal/directory" "git.solsynth.dev/hypernet/nexus/pkg/nex" "git.solsynth.dev/hypernet/nexus/pkg/nex/sec" + "git.solsynth.dev/hypernet/nexus/pkg/proto" "github.com/gofiber/contrib/websocket" jsoniter "github.com/json-iterator/go" "github.com/rs/zerolog/log" + "github.com/samber/lo" "github.com/spf13/viper" ) @@ -42,38 +47,36 @@ func Listen(c *websocket.Conn) { packet.Endpoint = val } - /* - service := directory.GetServiceInstanceByType(packet.Endpoint) - if service == nil { - _ = c.WriteMessage(mt, nex.NetworkPackage{ - Action: "error", - Message: "service not found", - }.Marshal()) - continue - } - pc, err := service.GetGrpcConn() - if err != nil { - _ = c.WriteMessage(mt, nex.NetworkPackage{ - Action: "error", - Message: fmt.Sprintf("unable to connect to service: %v", err.Error()), - }.Marshal()) - continue - } + service := directory.GetServiceInstanceByType(packet.Endpoint) + if service == nil { + _ = c.WriteMessage(mt, nex.WebSocketPackage{ + Action: "error", + Message: "service not found", + }.Marshal()) + continue + } + pc, err := service.GetGrpcConn() + if err != nil { + _ = c.WriteMessage(mt, nex.WebSocketPackage{ + Action: "error", + Message: fmt.Sprintf("unable to connect to service: %v", err.Error()), + }.Marshal()) + continue + } - sc := proto.NewStreamServiceClient(pc) - _, err = sc.EmitStreamEvent(context.Background(), &proto.StreamEventRequest{ - Event: packet.Action, - UserId: uint64(user.ID), - ClientId: uint64(clientId), - Payload: packet.RawPayload(), - }) - if err != nil { - _ = c.WriteMessage(mt, nex.NetworkPackage{ - Action: "error", - Message: fmt.Sprintf("unable send message to service: %v", err.Error()), - }.Marshal()) - continue - }*/ + sc := proto.NewStreamServiceClient(pc) + _, err = sc.PushStream(context.Background(), &proto.PushStreamRequest{ + UserId: lo.ToPtr(uint64(user.ID)), + ClientId: lo.ToPtr(clientId), + Body: packet.Marshal(), + }) + if err != nil { + _ = c.WriteMessage(mt, nex.WebSocketPackage{ + Action: "error", + Message: fmt.Sprintf("unable send message to service: %v", err.Error()), + }.Marshal()) + continue + } } // Pop connection