From 7d63123fd27ddb20e921e2228b33381f067f98be Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Tue, 16 Jul 2024 14:53:57 +0800 Subject: [PATCH] :recycle: Use dealer's websocket service instead of own --- pkg/internal/server/api/events_ws.go | 48 ---------------------------- pkg/internal/server/api/index.go | 9 ------ pkg/internal/services/connections.go | 47 --------------------------- pkg/internal/services/encryptor.go | 12 ------- pkg/internal/services/websocket.go | 20 ++++++++++++ 5 files changed, 20 insertions(+), 116 deletions(-) delete mode 100644 pkg/internal/server/api/events_ws.go delete mode 100644 pkg/internal/services/connections.go delete mode 100644 pkg/internal/services/encryptor.go create mode 100644 pkg/internal/services/websocket.go diff --git a/pkg/internal/server/api/events_ws.go b/pkg/internal/server/api/events_ws.go deleted file mode 100644 index afbc714..0000000 --- a/pkg/internal/server/api/events_ws.go +++ /dev/null @@ -1,48 +0,0 @@ -package api - -import ( - "git.solsynth.dev/hydrogen/messaging/pkg/internal/models" - "git.solsynth.dev/hydrogen/messaging/pkg/internal/services" - "github.com/gofiber/contrib/websocket" - jsoniter "github.com/json-iterator/go" - "github.com/rs/zerolog/log" -) - -func messageGateway(c *websocket.Conn) { - user := c.Locals("user").(models.Account) - - // Push connection - services.ClientRegister(user, c) - log.Debug().Uint("user", user.ID).Msg("New websocket connection established...") - - // Event loop - var task models.UnifiedCommand - - var messageType int - var packet []byte - var err error - - for { - if messageType, packet, err = c.ReadMessage(); err != nil { - break - } else if err := jsoniter.Unmarshal(packet, &task); err != nil { - _ = c.WriteMessage(messageType, models.UnifiedCommand{ - Action: "error", - Message: "unable to unmarshal your command, requires json request", - }.Marshal()) - continue - } - - message := services.DealCommand(task, user) - - if message != nil { - if err = c.WriteMessage(messageType, message.Marshal()); err != nil { - break - } - } - } - - // Pop connection - services.ClientUnregister(user, c) - log.Debug().Uint("user", user.ID).Msg("A websocket connection disconnected...") -} diff --git a/pkg/internal/server/api/index.go b/pkg/internal/server/api/index.go index 96a3adc..e91d598 100644 --- a/pkg/internal/server/api/index.go +++ b/pkg/internal/server/api/index.go @@ -1,8 +1,6 @@ package api import ( - "git.solsynth.dev/hydrogen/messaging/pkg/internal/gap" - "github.com/gofiber/contrib/websocket" "github.com/gofiber/fiber/v2" ) @@ -47,12 +45,5 @@ func MapAPIs(app *fiber.App) { channels.Delete("/:channel/calls/ongoing", endCall) channels.Post("/:channel/calls/ongoing/token", exchangeCallToken) } - - api.Use(func(c *fiber.Ctx) error { - if err := gap.H.EnsureAuthenticated(c); err != nil { - return err - } - return c.Next() - }).Get("/ws", websocket.New(messageGateway)) } } diff --git a/pkg/internal/services/connections.go b/pkg/internal/services/connections.go deleted file mode 100644 index 76efb01..0000000 --- a/pkg/internal/services/connections.go +++ /dev/null @@ -1,47 +0,0 @@ -package services - -import ( - "sync" - - "git.solsynth.dev/hydrogen/messaging/pkg/internal/models" - "github.com/gofiber/contrib/websocket" -) - -var ( - wsMutex sync.Mutex - wsConn = make(map[uint]map[*websocket.Conn]bool) -) - -func ClientRegister(user models.Account, conn *websocket.Conn) { - wsMutex.Lock() - if wsConn[user.ID] == nil { - wsConn[user.ID] = make(map[*websocket.Conn]bool) - } - wsConn[user.ID][conn] = true - wsMutex.Unlock() -} - -func ClientUnregister(user models.Account, conn *websocket.Conn) { - wsMutex.Lock() - if wsConn[user.ID] == nil { - wsConn[user.ID] = make(map[*websocket.Conn]bool) - } - delete(wsConn[user.ID], conn) - wsMutex.Unlock() -} - -func PushCommand(userId uint, task models.UnifiedCommand) { - for conn := range wsConn[userId] { - _ = conn.WriteMessage(1, task.Marshal()) - } -} - -func DealCommand(task models.UnifiedCommand, user models.Account) *models.UnifiedCommand { - switch task.Action { - default: - return &models.UnifiedCommand{ - Action: "error", - Message: "command not found", - } - } -} diff --git a/pkg/internal/services/encryptor.go b/pkg/internal/services/encryptor.go deleted file mode 100644 index 8700731..0000000 --- a/pkg/internal/services/encryptor.go +++ /dev/null @@ -1,12 +0,0 @@ -package services - -import "golang.org/x/crypto/bcrypt" - -func HashPassword(raw string) string { - data, _ := bcrypt.GenerateFromPassword([]byte(raw), 12) - return string(data) -} - -func VerifyPassword(text string, password string) bool { - return bcrypt.CompareHashAndPassword([]byte(password), []byte(text)) == nil -} diff --git a/pkg/internal/services/websocket.go b/pkg/internal/services/websocket.go new file mode 100644 index 0000000..e4b8501 --- /dev/null +++ b/pkg/internal/services/websocket.go @@ -0,0 +1,20 @@ +package services + +import ( + "context" + "git.solsynth.dev/hydrogen/dealer/pkg/proto" + "git.solsynth.dev/hydrogen/messaging/pkg/internal/gap" + "git.solsynth.dev/hydrogen/messaging/pkg/internal/models" + "time" +) + +func PushCommand(userId uint, task models.UnifiedCommand) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + pc := gap.H.GetDealerGrpcConn() + _, _ = proto.NewStreamControllerClient(pc).PushStream(ctx, &proto.PushStreamRequest{ + UserId: uint64(userId), + Body: task.Marshal(), + }) +}