♻️ Remove most of the dealer deps and move to nexus

This commit is contained in:
2024-10-24 00:13:16 +08:00
parent e412d5e742
commit b4fb7b53af
29 changed files with 2424 additions and 258 deletions

View File

@ -1,24 +1,26 @@
package database
import (
"fmt"
"git.solsynth.dev/hydrogen/passport/pkg/internal/gap"
"git.solsynth.dev/hypernet/nexus/pkg/nex/cruda"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
"github.com/spf13/viper"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
"gorm.io/gorm/schema"
)
var C *gorm.DB
func NewGorm() error {
var err error
dsn, err := cruda.NewCrudaConn(gap.Nx).AllocDatabase("passport")
if err != nil {
return fmt.Errorf("failed to alloc database from nexus: %v", err)
}
dialector := postgres.Open(viper.GetString("database.dsn"))
C, err = gorm.Open(dialector, &gorm.Config{NamingStrategy: schema.NamingStrategy{
TablePrefix: viper.GetString("database.prefix"),
}, Logger: logger.New(&log.Logger, logger.Config{
C, err = gorm.Open(postgres.Open(dsn), &gorm.Config{Logger: logger.New(&log.Logger, logger.Config{
Colorful: true,
IgnoreRecordNotFoundError: true,
LogLevel: lo.Ternary(viper.GetBool("debug.database"), logger.Info, logger.Silent),

View File

@ -2,17 +2,17 @@ package gap
import (
"fmt"
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"git.solsynth.dev/hypernet/nexus/pkg/nex"
"git.solsynth.dev/hypernet/nexus/pkg/proto"
"github.com/rs/zerolog/log"
"strings"
"github.com/spf13/viper"
)
var H *hyper.HyperConn
var Nx *nex.Conn
func RegisterService() error {
func InitializeToNexus() error {
grpcBind := strings.SplitN(viper.GetString("grpc_bind"), ":", 2)
httpBind := strings.SplitN(viper.GetString("bind"), ":", 2)
@ -22,16 +22,16 @@ func RegisterService() error {
httpOutbound := fmt.Sprintf("%s:%s", outboundIp, httpBind[1])
var err error
H, err = hyper.NewHyperConn(viper.GetString("dealer.addr"), &proto.ServiceInfo{
Nx, err = nex.NewNexusConn(viper.GetString("dealer.addr"), &proto.ServiceInfo{
Id: viper.GetString("id"),
Type: hyper.ServiceTypeAuthProvider,
Type: nex.ServiceTypeAuth,
Label: "Passport",
GrpcAddr: grpcOutbound,
HttpAddr: &httpOutbound,
})
if err == nil {
go func() {
err := H.KeepRegisterService()
err := Nx.RunRegistering()
if err != nil {
log.Error().Err(err).Msg("An error occurred while registering service...")
}

View File

@ -7,11 +7,16 @@ import (
"git.solsynth.dev/hydrogen/passport/pkg/internal/models"
"github.com/samber/lo"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"git.solsynth.dev/hydrogen/passport/pkg/internal/services"
jsoniter "github.com/json-iterator/go"
"git.solsynth.dev/hypernet/nexus/pkg/proto"
)
type authenticateServer struct {
proto.UnimplementedAuthServiceServer
}
func (v *Server) Authenticate(_ context.Context, in *proto.AuthRequest) (*proto.AuthReply, error) {
ctx, perms, atk, rtk, err := services.Authenticate(in.GetAccessToken(), in.GetRefreshToken(), 0)
if err != nil {

View File

@ -2,8 +2,8 @@ package grpc
import (
"context"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"git.solsynth.dev/hydrogen/passport/pkg/internal/services"
"git.solsynth.dev/hydrogen/passport/pkg/proto"
)
func (v *Server) RecordEvent(ctx context.Context, request *proto.RecordEventRequest) (*proto.RecordEventResponse, error) {

View File

@ -3,15 +3,15 @@ package grpc
import (
"context"
"fmt"
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
"git.solsynth.dev/hypernet/nexus/pkg/nex"
"github.com/rs/zerolog/log"
"git.solsynth.dev/hydrogen/passport/pkg/internal/database"
"github.com/samber/lo"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"git.solsynth.dev/hydrogen/passport/pkg/internal/models"
"git.solsynth.dev/hydrogen/passport/pkg/internal/services"
"git.solsynth.dev/hydrogen/passport/pkg/proto"
)
func (v *Server) NotifyUser(_ context.Context, in *proto.NotifyUserRequest) (*proto.NotifyResponse, error) {
@ -21,7 +21,7 @@ func (v *Server) NotifyUser(_ context.Context, in *proto.NotifyUserRequest) (*pr
return nil, fmt.Errorf("unable to get account: %v", err)
}
metadata := hyper.DecodeMap(in.GetNotify().GetMetadata())
metadata := nex.DecodeMap(in.GetNotify().GetMetadata())
notification := models.Notification{
Topic: in.GetNotify().GetTopic(),
@ -63,7 +63,7 @@ func (v *Server) NotifyUserBatch(_ context.Context, in *proto.NotifyUserBatchReq
return nil, fmt.Errorf("unable to get account: %v", err)
}
metadata := hyper.DecodeMap(in.GetNotify().GetMetadata())
metadata := nex.DecodeMap(in.GetNotify().GetMetadata())
var checklist = make(map[uint]bool, len(users))
var notifications []models.Notification
@ -111,7 +111,7 @@ func (v *Server) NotifyAllUser(_ context.Context, in *proto.NotifyRequest) (*pro
return nil, fmt.Errorf("unable to get account: %v", err)
}
metadata := hyper.DecodeMap(in.GetMetadata())
metadata := nex.DecodeMap(in.GetMetadata())
var checklist = make(map[uint]bool, len(users))
var notifications []models.Notification

View File

@ -3,12 +3,12 @@ package grpc
import (
"context"
"fmt"
"git.solsynth.dev/hypernet/nexus/pkg/nex"
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"git.solsynth.dev/hydrogen/passport/pkg/internal/database"
"git.solsynth.dev/hydrogen/passport/pkg/internal/models"
"git.solsynth.dev/hydrogen/passport/pkg/internal/services"
"git.solsynth.dev/hydrogen/passport/pkg/proto"
"github.com/samber/lo"
)
@ -27,7 +27,7 @@ func (v *Server) ListCommunityRealm(ctx context.Context, empty *proto.ListRealmR
Description: item.Description,
IsPublic: item.IsPublic,
IsCommunity: item.IsCommunity,
AccessPolicy: hyper.EncodeMap(item.AccessPolicy),
AccessPolicy: nex.EncodeMap(item.AccessPolicy),
}
if item.Avatar != nil {
info.Avatar = *item.Avatar
@ -59,7 +59,7 @@ func (v *Server) ListAvailableRealm(ctx context.Context, request *proto.LookupUs
Description: item.Description,
IsPublic: item.IsPublic,
IsCommunity: item.IsCommunity,
AccessPolicy: hyper.EncodeMap(item.AccessPolicy),
AccessPolicy: nex.EncodeMap(item.AccessPolicy),
}
if item.Avatar != nil {
info.Avatar = *item.Avatar
@ -91,7 +91,7 @@ func (v *Server) ListOwnedRealm(ctx context.Context, request *proto.LookupUserRe
Description: item.Description,
IsPublic: item.IsPublic,
IsCommunity: item.IsCommunity,
AccessPolicy: hyper.EncodeMap(item.AccessPolicy),
AccessPolicy: nex.EncodeMap(item.AccessPolicy),
}
if item.Avatar != nil {
info.Avatar = *item.Avatar
@ -132,7 +132,7 @@ func (v *Server) GetRealm(ctx context.Context, request *proto.LookupRealmRequest
Description: realm.Description,
IsPublic: realm.IsPublic,
IsCommunity: realm.IsCommunity,
AccessPolicy: hyper.EncodeMap(realm.AccessPolicy),
AccessPolicy: nex.EncodeMap(realm.AccessPolicy),
}
if realm.Avatar != nil {
info.Avatar = *realm.Avatar

View File

@ -5,7 +5,8 @@ import (
"google.golang.org/grpc/reflection"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"git.solsynth.dev/hydrogen/passport/pkg/proto"
nroto "git.solsynth.dev/hypernet/nexus/pkg/proto"
"github.com/spf13/viper"
"google.golang.org/grpc"
@ -13,10 +14,10 @@ import (
)
type Server struct {
proto.UnimplementedAuthServer
nroto.UnimplementedAuthServiceServer
nroto.UnimplementedDirectoryServiceServer
proto.UnimplementedNotifierServer
proto.UnimplementedRealmServer
proto.UnimplementedStreamControllerServer
proto.UnimplementedEventRecorderServer
health.UnimplementedHealthServer
@ -28,10 +29,9 @@ func NewServer() *Server {
srv: grpc.NewServer(),
}
proto.RegisterAuthServer(server.srv, server)
nroto.RegisterAuthServiceServer(server.srv, server)
proto.RegisterNotifierServer(server.srv, server)
proto.RegisterRealmServer(server.srv, server)
proto.RegisterStreamControllerServer(server.srv, server)
proto.RegisterEventRecorderServer(server.srv, server)
health.RegisterHealthServer(server.srv, server)

View File

@ -2,20 +2,22 @@ package grpc
import (
"context"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"git.solsynth.dev/hydrogen/passport/pkg/internal/services"
"git.solsynth.dev/hypernet/nexus/pkg/nex"
"git.solsynth.dev/hypernet/nexus/pkg/proto"
)
func (v *Server) EmitStreamEvent(ctx context.Context, request *proto.StreamEventRequest) (*proto.StreamEventResponse, error) {
func (v *Server) BroadcastEvent(ctx context.Context, request *proto.EventInfo) (*proto.EventResponse, error) {
switch request.GetEvent() {
case "ClientRegister":
case "ws.client.register":
// No longer need update user online status
// Based on realtime sever connection status
break
case "ClientUnregister":
case "ws.client.unregister":
// Update user last seen at
_ = services.SetAccountLastSeen(uint(request.GetUserId()))
data := nex.DecodeMap(request.GetData())
_ = services.SetAccountLastSeen(uint(data["user"].(float64)))
}
return &proto.StreamEventResponse{}, nil
return &proto.EventResponse{}, nil
}

View File

@ -1,27 +0,0 @@
package exts
import (
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
"github.com/gofiber/fiber/v2"
"github.com/spf13/viper"
"time"
)
func SetAuthCookies(c *fiber.Ctx, atk, rtk string) {
c.Cookie(&fiber.Cookie{
Name: hyper.CookieAtk,
Value: atk,
Domain: viper.GetString("security.cookie_domain"),
SameSite: viper.GetString("security.cookie_samesite"),
Expires: time.Now().Add(60 * time.Minute),
Path: "/",
})
c.Cookie(&fiber.Cookie{
Name: hyper.CookieRtk,
Value: rtk,
Domain: viper.GetString("security.cookie_domain"),
SameSite: viper.GetString("security.cookie_samesite"),
Expires: time.Now().Add(24 * 30 * time.Hour),
Path: "/",
})
}

View File

@ -312,7 +312,7 @@ func DeleteAccount(id uint) error {
return err
} else {
InvalidAuthCacheWithUser(id)
_, _ = proto.NewServiceDirectoryClient(gap.H.GetDealerGrpcConn()).BroadcastDeletion(context.Background(), &proto.DeletionRequest{
_, _ = proto.NewServiceDirectoryClient(gap.Nx.GetDealerGrpcConn()).BroadcastDeletion(context.Background(), &proto.DeletionRequest{
ResourceType: "account",
ResourceId: fmt.Sprintf("%d", id),
})

View File

@ -16,24 +16,7 @@ import (
"github.com/rs/zerolog/log"
)
func Authenticate(atk, rtk string, rty int) (ctx models.AuthContext, perms map[string]any, newAtk, newRtk string, err error) {
var claims PayloadClaims
claims, err = DecodeJwt(atk)
if err != nil {
if len(rtk) > 0 && rty < 1 {
// Auto refresh and retry
newAtk, newRtk, err = RefreshToken(rtk)
if err == nil {
return Authenticate(newAtk, newRtk, rty+1)
}
}
err = fiber.NewError(fiber.StatusUnauthorized, fmt.Sprintf("invalid auth key: %v", err))
return
}
newAtk = atk
newRtk = rtk
func Authenticate(atk, rtk string, rty int) (ctx models.AuthContext, perms map[string]any, err error) {
if ctx, err = GetAuthContext(claims.ID); err == nil {
var heldPerms map[string]any
rawHeldPerms, _ := jsoniter.Marshal(ctx.Account.PermNodes)

View File

@ -88,7 +88,7 @@ func GetFactorCode(factor models.AuthFactor) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := proto.NewPostmanClient(gap.H.GetDealerGrpcConn()).DeliverEmail(ctx, &proto.DeliverEmailRequest{
_, err := proto.NewPostmanClient(gap.Nx.GetDealerGrpcConn()).DeliverEmail(ctx, &proto.DeliverEmailRequest{
To: user.GetPrimaryEmail().Content,
Email: &proto.EmailRequest{
Subject: subject,

View File

@ -3,10 +3,10 @@ package services
import (
"context"
"fmt"
"git.solsynth.dev/hypernet/nexus/pkg/nex"
"reflect"
"time"
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
jsoniter "github.com/json-iterator/go"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
@ -18,6 +18,8 @@ import (
"git.solsynth.dev/hydrogen/passport/pkg/internal/models"
)
// TODO Awaiting for the new notification pusher
func AddNotifySubscriber(user models.Account, provider, id, tk, ua string) (models.NotificationSubscriber, error) {
var prev models.NotificationSubscriber
var subscriber models.NotificationSubscriber
@ -49,7 +51,7 @@ func AddNotifySubscriber(user models.Account, provider, id, tk, ua string) (mode
}
// NewNotification will create a notification and push via the push method it
// Please provide the notification with the account field is not empty
// Pleases provide the notification with the account field is not empty
func NewNotification(notification models.Notification) error {
if ok := CheckNotificationNotifiable(notification.Account, notification.Topic); !ok {
log.Info().Str("topic", notification.Topic).Uint("uid", notification.AccountID).Msg("Notification dismissed by user...")
@ -99,9 +101,9 @@ func PushNotification(notification models.Notification, skipNotifiableCheck ...b
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := proto.NewStreamControllerClient(gap.H.GetDealerGrpcConn()).PushStream(ctx, &proto.PushStreamRequest{
_, err := proto.NewStreamControllerClient(gap.Nx.GetNexusGrpcConn()).PushStream(ctx, &proto.PushStreamRequest{
UserId: lo.ToPtr(uint64(notification.AccountID)),
Body: hyper.NetworkPackage{
Body: nex.WebSocketPackage{
Action: "notifications.new",
Payload: notification,
}.Marshal(),
@ -133,7 +135,7 @@ func PushNotification(notification models.Notification, skipNotifiableCheck ...b
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = proto.NewPostmanClient(gap.H.GetDealerGrpcConn()).DeliverNotificationBatch(ctx, &proto.DeliverNotificationBatchRequest{
_, err = proto.NewPostmanClient(gap.Nx.GetNexusGrpcConn()).DeliverNotificationBatch(ctx, &proto.DeliverNotificationBatchRequest{
Providers: providers,
DeviceTokens: tokens,
Notify: &proto.NotifyRequest{
@ -186,12 +188,12 @@ func PushNotificationBatch(notifications []models.Notification, skipNotifiableCh
var subscribers []models.NotificationSubscriber
database.C.Where("account_id IN ?", accountIdx).Find(&subscribers)
stream := proto.NewStreamControllerClient(gap.H.GetDealerGrpcConn())
stream := proto.NewStreamControllerClient(gap.Nx.GetNexusGrpcConn())
for _, notification := range notifications {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, _ = stream.PushStream(ctx, &proto.PushStreamRequest{
UserId: lo.ToPtr(uint64(notification.AccountID)),
Body: hyper.NetworkPackage{
Body: nex.WebSocketPackage{
Action: "notifications.new",
Payload: notification,
}.Marshal(),
@ -215,7 +217,7 @@ func PushNotificationBatch(notifications []models.Notification, skipNotifiableCh
metadata, _ := jsoniter.Marshal(notification.Metadata)
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
_, _ = proto.NewPostmanClient(gap.H.GetDealerGrpcConn()).DeliverNotificationBatch(ctx, &proto.DeliverNotificationBatchRequest{
_, _ = proto.NewPostmanClient(gap.Nx.GetNexusGrpcConn()).DeliverNotificationBatch(ctx, &proto.DeliverNotificationBatchRequest{
Providers: providers,
DeviceTokens: tokens,
Notify: &proto.NotifyRequest{

View File

@ -60,7 +60,7 @@ func CacheUserStatus(uid uint, status models.Status) {
}
func GetUserOnline(uid uint) bool {
pc := proto.NewStreamControllerClient(gap.H.GetDealerGrpcConn())
pc := proto.NewStreamControllerClient(gap.Nx.GetDealerGrpcConn())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := pc.CountStreamConnection(ctx, &proto.CountConnectionRequest{

View File

@ -145,7 +145,7 @@ func NotifyMagicToken(token models.MagicToken) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := proto.NewPostmanClient(gap.H.GetDealerGrpcConn()).DeliverEmail(ctx, &proto.DeliverEmailRequest{
_, err := proto.NewPostmanClient(gap.Nx.GetDealerGrpcConn()).DeliverEmail(ctx, &proto.DeliverEmailRequest{
To: user.GetPrimaryEmail().Content,
Email: &proto.EmailRequest{
Subject: subject,