From 0fbb4833016c8fb2249943951fe8ffeb8ae34158 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Tue, 16 Jul 2024 14:44:00 +0800 Subject: [PATCH 1/2] :recycle: Moved to dealer --- go.mod | 7 +- go.sum | 8 +-- pkg/internal/gap/client.go | 12 ---- pkg/internal/gap/server.go | 47 +++++++------- pkg/internal/server/exts/auth.go | 4 +- pkg/internal/services/accounts.go | 76 ++++++++++++---------- pkg/internal/services/attachments.go | 3 +- pkg/internal/services/auth.go | 4 +- pkg/internal/services/calls.go | 2 +- pkg/internal/services/channel_members.go | 4 +- pkg/internal/services/events.go | 2 +- pkg/internal/services/jwt.go | 81 ------------------------ pkg/internal/services/realms.go | 25 ++++---- pkg/main.go | 6 +- settings.toml | 6 +- 15 files changed, 102 insertions(+), 185 deletions(-) delete mode 100644 pkg/internal/gap/client.go delete mode 100644 pkg/internal/services/jwt.go diff --git a/go.mod b/go.mod index 349d74d..1764ba4 100644 --- a/go.mod +++ b/go.mod @@ -5,14 +5,13 @@ go 1.22 toolchain go1.22.1 require ( + git.solsynth.dev/hydrogen/dealer v0.0.0-20240716024524-cfb73fde1951 git.solsynth.dev/hydrogen/paperclip v0.0.0-20240622051057-0f56dba45745 - git.solsynth.dev/hydrogen/passport v0.0.0-20240623083719-86b2cd81400a github.com/go-playground/validator/v10 v10.17.0 github.com/gofiber/contrib/websocket v1.3.0 github.com/gofiber/fiber/v2 v2.52.4 github.com/gofiber/template/html/v2 v2.1.1 - github.com/golang-jwt/jwt/v5 v5.2.0 - github.com/hashicorp/consul/api v1.29.1 + github.com/google/uuid v1.6.0 github.com/jordan-wright/email v4.0.1-0.20210109023952-943e75fe5223+incompatible github.com/json-iterator/go v1.1.12 github.com/livekit/protocol v1.14.0 @@ -54,8 +53,8 @@ require ( github.com/go-sql-driver/mysql v1.7.1 // indirect github.com/gofiber/template v1.8.3 // indirect github.com/gofiber/utils v1.1.0 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.1 // indirect + github.com/hashicorp/consul/api v1.29.1 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-hclog v1.6.3 // indirect diff --git a/go.sum b/go.sum index 35d1011..c48ee9e 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,7 @@ +git.solsynth.dev/hydrogen/dealer v0.0.0-20240716024524-cfb73fde1951 h1:RgZJK4PXhrjhUX75BclBCg2xfetMkswcMwMqfTBtryQ= +git.solsynth.dev/hydrogen/dealer v0.0.0-20240716024524-cfb73fde1951/go.mod h1:eZwAwP7ahL7TO8GWBlYFYDdjlna+8zHYbDfNabnuUEU= git.solsynth.dev/hydrogen/paperclip v0.0.0-20240622051057-0f56dba45745 h1:40BUsQMNXjqHyytkyF9py1HjTAWlRgO6R57YXUrHNy4= git.solsynth.dev/hydrogen/paperclip v0.0.0-20240622051057-0f56dba45745/go.mod h1:FsQGSLTl0gvo+9Jmbot02S72suyF9tFTrzDj70Xhifo= -git.solsynth.dev/hydrogen/passport v0.0.0-20240623081149-7ddbea8bcb86 h1:cxjBOhemnyxf49CJyPmnt1RKFHerK45rXXBPwsIFrhA= -git.solsynth.dev/hydrogen/passport v0.0.0-20240623081149-7ddbea8bcb86/go.mod h1:tUr7x1v0trG3ALDacdDuhJiPRPgFhJ1Si9OqNlYbgSk= -git.solsynth.dev/hydrogen/passport v0.0.0-20240623083719-86b2cd81400a h1:oxNbS6Q+sSOfYUQTcx9/awCmlk/lpVP997hgpMmFsyw= -git.solsynth.dev/hydrogen/passport v0.0.0-20240623083719-86b2cd81400a/go.mod h1:tUr7x1v0trG3ALDacdDuhJiPRPgFhJ1Si9OqNlYbgSk= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -103,8 +101,6 @@ github.com/gofiber/template/html/v2 v2.1.1/go.mod h1:2G0GHHOUx70C1LDncoBpe4T6maQ github.com/gofiber/utils v1.1.0 h1:vdEBpn7AzIUJRhe+CiTOJdUcTg4Q9RK+pEa0KPbLdrM= github.com/gofiber/utils v1.1.0/go.mod h1:poZpsnhBykfnY1Mc0KeEa6mSHrS3dV0+oBWyeQmb2e0= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= -github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw= -github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= diff --git a/pkg/internal/gap/client.go b/pkg/internal/gap/client.go deleted file mode 100644 index 6a6a39e..0000000 --- a/pkg/internal/gap/client.go +++ /dev/null @@ -1,12 +0,0 @@ -package gap - -import ( - "git.solsynth.dev/hydrogen/passport/pkg/hyper" - "github.com/spf13/viper" -) - -var H *hyper.HyperConn - -func NewHyperClient() { - H = hyper.NewHyperConn(viper.GetString("consul.addr")) -} diff --git a/pkg/internal/gap/server.go b/pkg/internal/gap/server.go index 77489c4..c6c026b 100644 --- a/pkg/internal/gap/server.go +++ b/pkg/internal/gap/server.go @@ -2,38 +2,41 @@ package gap import ( "fmt" - "strconv" + "git.solsynth.dev/hydrogen/dealer/pkg/hyper" + "git.solsynth.dev/hydrogen/dealer/pkg/proto" + "github.com/rs/zerolog/log" "strings" - "github.com/hashicorp/consul/api" "github.com/spf13/viper" ) -func Register() error { - cfg := api.DefaultConfig() - cfg.Address = viper.GetString("consul.addr") - - client, err := api.NewClient(cfg) - if err != nil { - return err - } +var H *hyper.HyperConn +func RegisterService() error { grpcBind := strings.SplitN(viper.GetString("grpc_bind"), ":", 2) + httpBind := strings.SplitN(viper.GetString("bind"), ":", 2) outboundIp, _ := GetOutboundIP() - port, _ := strconv.Atoi(grpcBind[1]) - registration := new(api.AgentServiceRegistration) - registration.ID = viper.GetString("id") - registration.Name = "Hydrogen.Messaging" - registration.Address = outboundIp.String() - registration.Port = port - registration.Check = &api.AgentServiceCheck{ - GRPC: fmt.Sprintf("%s:%s", outboundIp, grpcBind[1]), - Timeout: "5s", - Interval: "1m", - DeregisterCriticalServiceAfter: "3m", + grpcOutbound := fmt.Sprintf("%s:%s", outboundIp, grpcBind[1]) + httpOutbound := fmt.Sprintf("%s:%s", outboundIp, httpBind[1]) + + var err error + H, err = hyper.NewHyperConn(viper.GetString("dealer.addr"), &proto.ServiceInfo{ + Id: viper.GetString("id"), + Type: hyper.ServiceTypeMessagingProvider, + Label: "Messaging", + GrpcAddr: grpcOutbound, + HttpAddr: &httpOutbound, + }) + if err == nil { + go func() { + err := H.KeepRegisterService() + if err != nil { + log.Error().Err(err).Msg("An error occurred while registering service...") + } + }() } - return client.Agent().ServiceRegister(registration) + return err } diff --git a/pkg/internal/server/exts/auth.go b/pkg/internal/server/exts/auth.go index fd0e0fa..58b641b 100644 --- a/pkg/internal/server/exts/auth.go +++ b/pkg/internal/server/exts/auth.go @@ -1,13 +1,13 @@ package exts import ( + "git.solsynth.dev/hydrogen/dealer/pkg/proto" "git.solsynth.dev/hydrogen/messaging/pkg/internal/services" - "git.solsynth.dev/hydrogen/passport/pkg/proto" "github.com/gofiber/fiber/v2" ) func LinkAccountMiddleware(c *fiber.Ctx) error { - if val, ok := c.Locals("p_user").(*proto.Userinfo); ok { + if val, ok := c.Locals("p_user").(*proto.UserInfo); ok { if account, err := services.LinkAccount(val); err != nil { return fiber.NewError(fiber.StatusInternalServerError, err.Error()) } else { diff --git a/pkg/internal/services/accounts.go b/pkg/internal/services/accounts.go index f662356..4fc985c 100644 --- a/pkg/internal/services/accounts.go +++ b/pkg/internal/services/accounts.go @@ -3,57 +3,69 @@ package services import ( "context" "fmt" + "git.solsynth.dev/hydrogen/dealer/pkg/hyper" "git.solsynth.dev/hydrogen/messaging/pkg/internal/database" "git.solsynth.dev/hydrogen/messaging/pkg/internal/gap" + jsoniter "github.com/json-iterator/go" "time" + "git.solsynth.dev/hydrogen/dealer/pkg/proto" "git.solsynth.dev/hydrogen/messaging/pkg/internal/models" - "git.solsynth.dev/hydrogen/passport/pkg/proto" - "github.com/spf13/viper" ) -func GetAccountFriend(userId, relatedId uint, status int) (*proto.FriendshipResponse, error) { +func CheckUserPerm(userId, otherId uint, key string, val any) error { var user models.Account if err := database.C.Where("id = ?", userId).First(&user).Error; err != nil { - return nil, err + return fmt.Errorf("account not found: %v", err) } - var related models.Account - if err := database.C.Where("id = ?", relatedId).First(&related).Error; err != nil { - return nil, err + var other models.Account + if err := database.C.Where("id = ?", otherId).First(&other).Error; err != nil { + return fmt.Errorf("other not found: %v", err) } ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - pc, err := gap.H.DiscoverServiceGRPC("Hydrogen.Passport") - if err != nil { - return nil, err - } - return proto.NewFriendshipsClient(pc).GetFriendship(ctx, &proto.FriendshipTwoSideLookupRequest{ - AccountId: uint64(user.ExternalID), - RelatedId: uint64(related.ExternalID), - Status: uint32(status), - }) -} + encodedData, _ := jsoniter.Marshal(val) -func NotifyAccountMessager(user models.Account, t, s, c string, realtime bool, forcePush bool, links ...*proto.NotifyLink) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - - pc, err := gap.H.DiscoverServiceGRPC("Hydrogen.Passport") + pc, err := gap.H.GetServiceGrpcConn(hyper.ServiceTypeAuthProvider) if err != nil { return err } - _, err = proto.NewNotifyClient(pc).NotifyUser(ctx, &proto.NotifyRequest{ - ClientId: viper.GetString("passport.client_id"), - ClientSecret: viper.GetString("passport.client_secret"), - Type: fmt.Sprintf("messaging.%s", t), - Subject: s, - Content: c, - Links: links, - RecipientId: uint64(user.ExternalID), - IsRealtime: realtime, - IsForcePush: forcePush, + out, err := proto.NewAuthClient(pc).EnsureUserPermGranted(ctx, &proto.CheckUserPermRequest{ + UserId: uint64(user.ExternalID), + OtherId: uint64(other.ExternalID), + Key: key, + Value: encodedData, + }) + + if err != nil { + return err + } else if !out.IsValid { + return fmt.Errorf("missing permission: %v", key) + } + + return nil +} + +func NotifyAccountMessager(user models.Account, title, body string, subtitle *string, realtime bool, forcePush bool) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + pc, err := gap.H.GetServiceGrpcConn(hyper.ServiceTypeAuthProvider) + if err != nil { + return err + } + _, err = proto.NewNotifierClient(pc).NotifyUser(ctx, &proto.NotifyUserRequest{ + UserId: uint64(user.ID), + Notify: &proto.NotifyRequest{ + Topic: "messaging.message", + Title: title, + Subtitle: subtitle, + Body: body, + IsRealtime: realtime, + IsForcePush: forcePush, + }, }) return err diff --git a/pkg/internal/services/attachments.go b/pkg/internal/services/attachments.go index a44db6e..35f9140 100644 --- a/pkg/internal/services/attachments.go +++ b/pkg/internal/services/attachments.go @@ -2,6 +2,7 @@ package services import ( "context" + "git.solsynth.dev/hydrogen/dealer/pkg/hyper" "git.solsynth.dev/hydrogen/messaging/pkg/internal/gap" "git.solsynth.dev/hydrogen/paperclip/pkg/proto" @@ -9,7 +10,7 @@ import ( ) func CheckAttachmentByIDExists(id uint, usage string) bool { - pc, err := gap.H.DiscoverServiceGRPC("Hydrogen.Passport") + pc, err := gap.H.GetServiceGrpcConn(hyper.ServiceTypeAuthProvider) if err != nil { return false } diff --git a/pkg/internal/services/auth.go b/pkg/internal/services/auth.go index 7d9dcf6..1401aea 100644 --- a/pkg/internal/services/auth.go +++ b/pkg/internal/services/auth.go @@ -3,14 +3,14 @@ package services import ( "errors" "fmt" + "git.solsynth.dev/hydrogen/dealer/pkg/proto" "git.solsynth.dev/hydrogen/messaging/pkg/internal/database" "git.solsynth.dev/hydrogen/messaging/pkg/internal/models" - "git.solsynth.dev/hydrogen/passport/pkg/proto" "gorm.io/gorm" "reflect" ) -func LinkAccount(userinfo *proto.Userinfo) (models.Account, error) { +func LinkAccount(userinfo *proto.UserInfo) (models.Account, error) { var account models.Account if userinfo == nil { return account, fmt.Errorf("remote userinfo was not found") diff --git a/pkg/internal/services/calls.go b/pkg/internal/services/calls.go index b7513f3..8d605ac 100644 --- a/pkg/internal/services/calls.go +++ b/pkg/internal/services/calls.go @@ -99,9 +99,9 @@ func NewCall(channel models.Channel, founder models.ChannelMember) (models.Call, for _, member := range members { if member.ID != call.Founder.ID { err = NotifyAccountMessager(member.Account, - "incomingCall", fmt.Sprintf("Call in #%s", channel.Alias), fmt.Sprintf("%s started a new call", call.Founder.Account.Name), + nil, false, true, ) diff --git a/pkg/internal/services/channel_members.go b/pkg/internal/services/channel_members.go index 2e5e573..d2a9675 100644 --- a/pkg/internal/services/channel_members.go +++ b/pkg/internal/services/channel_members.go @@ -33,8 +33,8 @@ func GetChannelMember(user models.Account, channelId uint) (models.ChannelMember } func AddChannelMemberWithCheck(user models.Account, target models.Channel) error { - if _, err := GetAccountFriend(user.ID, target.AccountID, 1); err != nil { - return fmt.Errorf("you only can invite your friends to your channel") + if err := CheckUserPerm(user.ID, target.AccountID, "ChannelAdd", true); err != nil { + return fmt.Errorf("unable to add user into your channel") } member := models.ChannelMember{ diff --git a/pkg/internal/services/events.go b/pkg/internal/services/events.go index 177d7c7..c74e808 100644 --- a/pkg/internal/services/events.go +++ b/pkg/internal/services/events.go @@ -136,9 +136,9 @@ func NotifyMessageEvent(members []models.ChannelMember, event models.Event) { } err := NotifyAccountMessager(member.Account, - "incomingMessage", fmt.Sprintf("%s in %s", event.Sender.Account.Nick, channelDisplay), fmt.Sprintf("%s", displayText), + nil, true, false, ) diff --git a/pkg/internal/services/jwt.go b/pkg/internal/services/jwt.go deleted file mode 100644 index 77b5b22..0000000 --- a/pkg/internal/services/jwt.go +++ /dev/null @@ -1,81 +0,0 @@ -package services - -import ( - "fmt" - "github.com/gofiber/fiber/v2" - "time" - - "github.com/golang-jwt/jwt/v5" - "github.com/spf13/viper" -) - -type PayloadClaims struct { - jwt.RegisteredClaims - - Type string `json:"typ"` -} - -const ( - JwtAccessType = "access" - JwtRefreshType = "refresh" -) - -const ( - CookieAccessKey = "identity_auth_key" - CookieRefreshKey = "identity_refresh_key" -) - -func EncodeJwt(id string, typ, sub string, aud []string, exp time.Time) (string, error) { - tk := jwt.NewWithClaims(jwt.SigningMethodHS512, PayloadClaims{ - jwt.RegisteredClaims{ - Subject: sub, - Audience: aud, - Issuer: fmt.Sprintf("https://%s", viper.GetString("domain")), - ExpiresAt: jwt.NewNumericDate(exp), - NotBefore: jwt.NewNumericDate(time.Now()), - IssuedAt: jwt.NewNumericDate(time.Now()), - ID: id, - }, - typ, - }) - - return tk.SignedString([]byte(viper.GetString("secret"))) -} - -func DecodeJwt(str string) (PayloadClaims, error) { - var claims PayloadClaims - tk, err := jwt.ParseWithClaims(str, &claims, func(token *jwt.Token) (interface{}, error) { - if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok { - return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"]) - } - return []byte(viper.GetString("secret")), nil - }) - if err != nil { - return claims, err - } - - if data, ok := tk.Claims.(*PayloadClaims); ok { - return *data, nil - } else { - return claims, fmt.Errorf("unexpected token payload: not payload claims type") - } -} - -func SetJwtCookieSet(c *fiber.Ctx, access, refresh string) { - c.Cookie(&fiber.Cookie{ - Name: CookieAccessKey, - Value: access, - Domain: viper.GetString("security.cookie_domain"), - SameSite: viper.GetString("security.cookie_samesite"), - Expires: time.Now().Add(60 * time.Minute), - Path: "/", - }) - c.Cookie(&fiber.Cookie{ - Name: CookieRefreshKey, - Value: refresh, - Domain: viper.GetString("security.cookie_domain"), - SameSite: viper.GetString("security.cookie_samesite"), - Expires: time.Now().Add(24 * 30 * time.Hour), - Path: "/", - }) -} diff --git a/pkg/internal/services/realms.go b/pkg/internal/services/realms.go index cdc1f94..a9697b6 100644 --- a/pkg/internal/services/realms.go +++ b/pkg/internal/services/realms.go @@ -4,10 +4,11 @@ import ( "context" "errors" "fmt" + "git.solsynth.dev/hydrogen/dealer/pkg/hyper" + "git.solsynth.dev/hydrogen/dealer/pkg/proto" "git.solsynth.dev/hydrogen/messaging/pkg/internal/database" "git.solsynth.dev/hydrogen/messaging/pkg/internal/gap" "git.solsynth.dev/hydrogen/messaging/pkg/internal/models" - "git.solsynth.dev/hydrogen/passport/pkg/proto" "github.com/samber/lo" "gorm.io/gorm" "reflect" @@ -15,11 +16,11 @@ import ( func GetRealmWithExtID(id uint) (models.Realm, error) { var realm models.Realm - pc, err := gap.H.DiscoverServiceGRPC("Hydrogen.Passport") + pc, err := gap.H.GetServiceGrpcConn(hyper.ServiceTypeAuthProvider) if err != nil { return realm, err } - response, err := proto.NewRealmsClient(pc).GetRealm(context.Background(), &proto.RealmLookupRequest{ + response, err := proto.NewRealmClient(pc).GetRealm(context.Background(), &proto.LookupRealmRequest{ Id: lo.ToPtr(uint64(id)), }) if err != nil { @@ -30,11 +31,11 @@ func GetRealmWithExtID(id uint) (models.Realm, error) { func GetRealmWithAlias(alias string) (models.Realm, error) { var realm models.Realm - pc, err := gap.H.DiscoverServiceGRPC("Hydrogen.Passport") + pc, err := gap.H.GetServiceGrpcConn(hyper.ServiceTypeAuthProvider) if err != nil { return realm, err } - response, err := proto.NewRealmsClient(pc).GetRealm(context.Background(), &proto.RealmLookupRequest{ + response, err := proto.NewRealmClient(pc).GetRealm(context.Background(), &proto.LookupRealmRequest{ Alias: &alias, }) if err != nil { @@ -43,16 +44,16 @@ func GetRealmWithAlias(alias string) (models.Realm, error) { return LinkRealm(response) } -func GetRealmMember(realmId uint, userId uint) (*proto.RealmMemberResponse, error) { +func GetRealmMember(realmId uint, userId uint) (*proto.RealmMemberInfo, error) { var realm models.Realm if err := database.C.Where("id = ?", realmId).First(&realm).Error; err != nil { return nil, err } - pc, err := gap.H.DiscoverServiceGRPC("Hydrogen.Passport") + pc, err := gap.H.GetServiceGrpcConn(hyper.ServiceTypeAuthProvider) if err != nil { return nil, err } - response, err := proto.NewRealmsClient(pc).GetRealmMember(context.Background(), &proto.RealmMemberLookupRequest{ + response, err := proto.NewRealmClient(pc).GetRealmMember(context.Background(), &proto.RealmMemberLookupRequest{ RealmId: uint64(realm.ExternalID), UserId: lo.ToPtr(uint64(userId)), }) @@ -63,12 +64,12 @@ func GetRealmMember(realmId uint, userId uint) (*proto.RealmMemberResponse, erro } } -func ListRealmMember(realmId uint) ([]*proto.RealmMemberResponse, error) { - pc, err := gap.H.DiscoverServiceGRPC("Hydrogen.Passport") +func ListRealmMember(realmId uint) ([]*proto.RealmMemberInfo, error) { + pc, err := gap.H.GetServiceGrpcConn(hyper.ServiceTypeAuthProvider) if err != nil { return nil, err } - response, err := proto.NewRealmsClient(pc).ListRealmMember(context.Background(), &proto.RealmMemberLookupRequest{ + response, err := proto.NewRealmClient(pc).ListRealmMember(context.Background(), &proto.RealmMemberLookupRequest{ RealmId: uint64(realmId), }) if err != nil { @@ -78,7 +79,7 @@ func ListRealmMember(realmId uint) ([]*proto.RealmMemberResponse, error) { } } -func LinkRealm(info *proto.RealmResponse) (models.Realm, error) { +func LinkRealm(info *proto.RealmInfo) (models.Realm, error) { var realm models.Realm if info == nil { return realm, fmt.Errorf("remote realm info was not found") diff --git a/pkg/main.go b/pkg/main.go index 90117ec..42fa07f 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -44,12 +44,10 @@ func main() { } // Connect other services - services.SetupLiveKit() - if err := gap.Register(); err != nil { + if err := gap.RegisterService(); err != nil { log.Fatal().Err(err).Msg("An error occurred when connecting to consul...") - } else { - gap.NewHyperClient() } + services.SetupLiveKit() // Server server.NewServer() diff --git a/settings.toml b/settings.toml index 8836028..3375789 100644 --- a/settings.toml +++ b/settings.toml @@ -7,11 +7,11 @@ grpc_bind = "0.0.0.0:7447" domain = "im.solsynth.dev" secret = "LtTjzAGFLshwXhN4ZD4nG5KlMv1MWcsvfv03TSZYnT1VhiAnLIZFTnHUwR0XhGgi" -[consul] -addr = "127.0.0.1:8500" +[dealer] +addr = "127.0.0.1:7442" [debug] -database = true +database = false print_routes = false [calling] From 7d63123fd27ddb20e921e2228b33381f067f98be Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Tue, 16 Jul 2024 14:53:57 +0800 Subject: [PATCH 2/2] :recycle: Use dealer's websocket service instead of own --- pkg/internal/server/api/events_ws.go | 48 ---------------------------- pkg/internal/server/api/index.go | 9 ------ pkg/internal/services/connections.go | 47 --------------------------- pkg/internal/services/encryptor.go | 12 ------- pkg/internal/services/websocket.go | 20 ++++++++++++ 5 files changed, 20 insertions(+), 116 deletions(-) delete mode 100644 pkg/internal/server/api/events_ws.go delete mode 100644 pkg/internal/services/connections.go delete mode 100644 pkg/internal/services/encryptor.go create mode 100644 pkg/internal/services/websocket.go diff --git a/pkg/internal/server/api/events_ws.go b/pkg/internal/server/api/events_ws.go deleted file mode 100644 index afbc714..0000000 --- a/pkg/internal/server/api/events_ws.go +++ /dev/null @@ -1,48 +0,0 @@ -package api - -import ( - "git.solsynth.dev/hydrogen/messaging/pkg/internal/models" - "git.solsynth.dev/hydrogen/messaging/pkg/internal/services" - "github.com/gofiber/contrib/websocket" - jsoniter "github.com/json-iterator/go" - "github.com/rs/zerolog/log" -) - -func messageGateway(c *websocket.Conn) { - user := c.Locals("user").(models.Account) - - // Push connection - services.ClientRegister(user, c) - log.Debug().Uint("user", user.ID).Msg("New websocket connection established...") - - // Event loop - var task models.UnifiedCommand - - var messageType int - var packet []byte - var err error - - for { - if messageType, packet, err = c.ReadMessage(); err != nil { - break - } else if err := jsoniter.Unmarshal(packet, &task); err != nil { - _ = c.WriteMessage(messageType, models.UnifiedCommand{ - Action: "error", - Message: "unable to unmarshal your command, requires json request", - }.Marshal()) - continue - } - - message := services.DealCommand(task, user) - - if message != nil { - if err = c.WriteMessage(messageType, message.Marshal()); err != nil { - break - } - } - } - - // Pop connection - services.ClientUnregister(user, c) - log.Debug().Uint("user", user.ID).Msg("A websocket connection disconnected...") -} diff --git a/pkg/internal/server/api/index.go b/pkg/internal/server/api/index.go index 96a3adc..e91d598 100644 --- a/pkg/internal/server/api/index.go +++ b/pkg/internal/server/api/index.go @@ -1,8 +1,6 @@ package api import ( - "git.solsynth.dev/hydrogen/messaging/pkg/internal/gap" - "github.com/gofiber/contrib/websocket" "github.com/gofiber/fiber/v2" ) @@ -47,12 +45,5 @@ func MapAPIs(app *fiber.App) { channels.Delete("/:channel/calls/ongoing", endCall) channels.Post("/:channel/calls/ongoing/token", exchangeCallToken) } - - api.Use(func(c *fiber.Ctx) error { - if err := gap.H.EnsureAuthenticated(c); err != nil { - return err - } - return c.Next() - }).Get("/ws", websocket.New(messageGateway)) } } diff --git a/pkg/internal/services/connections.go b/pkg/internal/services/connections.go deleted file mode 100644 index 76efb01..0000000 --- a/pkg/internal/services/connections.go +++ /dev/null @@ -1,47 +0,0 @@ -package services - -import ( - "sync" - - "git.solsynth.dev/hydrogen/messaging/pkg/internal/models" - "github.com/gofiber/contrib/websocket" -) - -var ( - wsMutex sync.Mutex - wsConn = make(map[uint]map[*websocket.Conn]bool) -) - -func ClientRegister(user models.Account, conn *websocket.Conn) { - wsMutex.Lock() - if wsConn[user.ID] == nil { - wsConn[user.ID] = make(map[*websocket.Conn]bool) - } - wsConn[user.ID][conn] = true - wsMutex.Unlock() -} - -func ClientUnregister(user models.Account, conn *websocket.Conn) { - wsMutex.Lock() - if wsConn[user.ID] == nil { - wsConn[user.ID] = make(map[*websocket.Conn]bool) - } - delete(wsConn[user.ID], conn) - wsMutex.Unlock() -} - -func PushCommand(userId uint, task models.UnifiedCommand) { - for conn := range wsConn[userId] { - _ = conn.WriteMessage(1, task.Marshal()) - } -} - -func DealCommand(task models.UnifiedCommand, user models.Account) *models.UnifiedCommand { - switch task.Action { - default: - return &models.UnifiedCommand{ - Action: "error", - Message: "command not found", - } - } -} diff --git a/pkg/internal/services/encryptor.go b/pkg/internal/services/encryptor.go deleted file mode 100644 index 8700731..0000000 --- a/pkg/internal/services/encryptor.go +++ /dev/null @@ -1,12 +0,0 @@ -package services - -import "golang.org/x/crypto/bcrypt" - -func HashPassword(raw string) string { - data, _ := bcrypt.GenerateFromPassword([]byte(raw), 12) - return string(data) -} - -func VerifyPassword(text string, password string) bool { - return bcrypt.CompareHashAndPassword([]byte(password), []byte(text)) == nil -} diff --git a/pkg/internal/services/websocket.go b/pkg/internal/services/websocket.go new file mode 100644 index 0000000..e4b8501 --- /dev/null +++ b/pkg/internal/services/websocket.go @@ -0,0 +1,20 @@ +package services + +import ( + "context" + "git.solsynth.dev/hydrogen/dealer/pkg/proto" + "git.solsynth.dev/hydrogen/messaging/pkg/internal/gap" + "git.solsynth.dev/hydrogen/messaging/pkg/internal/models" + "time" +) + +func PushCommand(userId uint, task models.UnifiedCommand) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + pc := gap.H.GetDealerGrpcConn() + _, _ = proto.NewStreamControllerClient(pc).PushStream(ctx, &proto.PushStreamRequest{ + UserId: uint64(userId), + Body: task.Marshal(), + }) +}