♻️ Move dealer to nexus

This commit is contained in:
2024-11-02 13:23:27 +08:00
parent fce8669059
commit cef4764d8c
38 changed files with 454 additions and 550 deletions

View File

@ -2,80 +2,26 @@ package services
import (
"context"
"fmt"
"git.solsynth.dev/hypernet/nexus/pkg/nex"
authm "git.solsynth.dev/hypernet/passport/pkg/authkit/models"
"time"
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/database"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/gap"
jsoniter "github.com/json-iterator/go"
"github.com/samber/lo"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/models"
)
func CheckUserPerm(userId, otherId uint, key string, val any) error {
var user models.Account
if err := database.C.Where("id = ?", userId).First(&user).Error; err != nil {
return fmt.Errorf("account not found: %v", err)
}
var other models.Account
if err := database.C.Where("id = ?", otherId).First(&other).Error; err != nil {
return fmt.Errorf("other not found: %v", err)
}
func NotifyAccountMessagerBatch(users []authm.Account, notification *proto.NotifyRequest) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
encodedData, _ := jsoniter.Marshal(val)
pc, err := gap.H.GetServiceGrpcConn(hyper.ServiceTypeAuthProvider)
if err != nil {
return err
}
out, err := proto.NewAuthClient(pc).EnsureUserPermGranted(ctx, &proto.CheckUserPermRequest{
UserId: uint64(user.ID),
OtherId: uint64(other.ID),
Key: key,
Value: encodedData,
})
if err != nil {
return err
} else if !out.IsValid {
return fmt.Errorf("missing permission: %v", key)
}
return nil
}
func NotifyAccountMessager(user models.Account, notification *proto.NotifyRequest) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
pc, err := gap.H.GetServiceGrpcConn(hyper.ServiceTypeAuthProvider)
if err != nil {
return err
}
_, err = proto.NewNotifierClient(pc).NotifyUser(ctx, &proto.NotifyUserRequest{
UserId: uint64(user.ID),
Notify: notification,
})
return err
}
func NotifyAccountMessagerBatch(users []models.Account, notification *proto.NotifyRequest) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
pc, err := gap.H.GetServiceGrpcConn(hyper.ServiceTypeAuthProvider)
pc, err := gap.Nx.GetClientGrpcConn(nex.ServiceTypeAuth)
if err != nil {
return err
}
_, err = proto.NewNotifierClient(pc).NotifyUserBatch(ctx, &proto.NotifyUserBatchRequest{
UserId: lo.Map(users, func(item models.Account, idx int) uint64 {
UserId: lo.Map(users, func(item authm.Account, idx int) uint64 {
return uint64(item.ID)
}),
Notify: notification,

View File

@ -4,10 +4,14 @@ import (
"context"
"errors"
"fmt"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/gap"
"git.solsynth.dev/hypernet/nexus/pkg/nex"
"git.solsynth.dev/hypernet/passport/pkg/authkit"
authm "git.solsynth.dev/hypernet/passport/pkg/authkit/models"
"git.solsynth.dev/hypernet/pusher/pkg/pushkit"
"time"
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/database"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/models"
jsoniter "github.com/json-iterator/go"
@ -109,12 +113,12 @@ func NewCall(channel models.Channel, founder models.ChannelMember) (models.Call,
ChannelID: call.ChannelID,
}).Preload("Account").Find(&members).Error; err == nil {
call, _ = GetCall(call.Channel, call.ID)
var pendingUsers []models.Account
var pendingUsers []uint64
for _, member := range members {
if member.ID != call.Founder.ID {
pendingUsers = append(pendingUsers, member.Account)
pendingUsers = append(pendingUsers, uint64(member.AccountID))
}
PushCommand(member.Account.ID, models.UnifiedCommand{
PushCommand(member.AccountID, nex.WebSocketPackage{
Action: "calls.new",
Payload: call,
})
@ -122,21 +126,21 @@ func NewCall(channel models.Channel, founder models.ChannelMember) (models.Call,
channel, _ = GetChannel(channel.ID)
err = NotifyAccountMessagerBatch(
err = authkit.NotifyUserBatch(
gap.Nx,
pendingUsers,
&proto.NotifyRequest{
Topic: "messaging.callStart",
Title: fmt.Sprintf("Call in (%s)", channel.DisplayText()),
Body: fmt.Sprintf("%s is calling", call.Founder.Account.Name),
Avatar: &call.Founder.Account.Avatar,
Metadata: EncodeJSONBody(map[string]any{
"user_id": call.Founder.Account.ID,
"user_name": call.Founder.Account.Name,
"user_nick": call.Founder.Account.Nick,
pushkit.Notification{
Topic: "messaging.callStart",
Title: fmt.Sprintf("Call in (%s)", channel.DisplayText()),
Body: fmt.Sprintf("%s is calling", call.Founder.Name),
Metadata: map[string]any{
"avatar": call.Founder.Avatar,
"user_id": call.Founder.AccountID,
"user_name": call.Founder.Name,
"user_nick": call.Founder.Nick,
"channel_id": call.ChannelID,
}),
IsRealtime: false,
IsForcePush: true,
},
Priority: 5,
},
)
if err != nil {
@ -164,7 +168,7 @@ func EndCall(call models.Call) (models.Call, error) {
}).Preload("Account").Find(&members).Error; err == nil {
call, _ = GetCall(call.Channel, call.ID)
for _, member := range members {
PushCommand(member.Account.ID, models.UnifiedCommand{
PushCommand(member.AccountID, nex.WebSocketPackage{
Action: "calls.end",
Payload: call,
})
@ -182,7 +186,7 @@ func KickParticipantInCall(call models.Call, username string) error {
return err
}
func EncodeCallToken(user models.Account, call models.Call) (string, error) {
func EncodeCallToken(user authm.Account, call models.Call) (string, error) {
isAdmin := user.ID == call.FounderID || user.ID == call.Channel.AccountID
grant := &auth.VideoGrant{

View File

@ -4,6 +4,9 @@ import (
"context"
"fmt"
localCache "git.solsynth.dev/hydrogen/messaging/pkg/internal/cache"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/gap"
"git.solsynth.dev/hypernet/passport/pkg/authkit"
authm "git.solsynth.dev/hypernet/passport/pkg/authkit/models"
"github.com/eko/gocache/lib/v4/cache"
"github.com/eko/gocache/lib/v4/marshaler"
"github.com/eko/gocache/lib/v4/store"
@ -25,7 +28,7 @@ func ListChannelMember(channelId uint) ([]models.ChannelMember, error) {
return members, nil
}
func GetChannelMember(user models.Account, channelId uint) (models.ChannelMember, error) {
func GetChannelMember(user authm.Account, channelId uint) (models.ChannelMember, error) {
var member models.ChannelMember
if err := database.C.
@ -37,9 +40,9 @@ func GetChannelMember(user models.Account, channelId uint) (models.ChannelMember
return member, nil
}
func AddChannelMemberWithCheck(user models.Account, target models.Channel) error {
if err := CheckUserPerm(user.ID, target.AccountID, "ChannelAdd", true); err != nil {
return fmt.Errorf("unable to add user into your channel")
func AddChannelMemberWithCheck(user authm.Account, target models.Channel) error {
if err := authkit.EnsureUserPermGranted(gap.Nx, user.ID, target.AccountID, "ChannelAdd", true); err != nil {
return fmt.Errorf("unable to add user into your channel due to access denied: %v", err)
}
member := models.ChannelMember{
@ -51,7 +54,7 @@ func AddChannelMemberWithCheck(user models.Account, target models.Channel) error
return err
}
func AddChannelMember(user models.Account, target models.Channel) error {
func AddChannelMember(user authm.Account, target models.Channel) error {
member := models.ChannelMember{
ChannelID: target.ID,
AccountID: user.ID,
@ -96,7 +99,7 @@ func EditChannelMember(membership models.ChannelMember) (models.ChannelMember, e
return membership, nil
}
func RemoveChannelMember(user models.Account, target models.Channel) error {
func RemoveChannelMember(user authm.Account, target models.Channel) error {
var member models.ChannelMember
if err := database.C.Where(&models.ChannelMember{

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
localCache "git.solsynth.dev/hydrogen/messaging/pkg/internal/cache"
authm "git.solsynth.dev/hypernet/passport/pkg/authkit/models"
"github.com/eko/gocache/lib/v4/cache"
"github.com/eko/gocache/lib/v4/marshaler"
"github.com/eko/gocache/lib/v4/store"
@ -137,7 +138,7 @@ func GetAvailableChannelWithAlias(alias string, user uint, realmId ...uint) (mod
return channel, member, nil
}
func GetAvailableChannel(id uint, user models.Account) (models.Channel, models.ChannelMember, error) {
func GetAvailableChannel(id uint, user authm.Account) (models.Channel, models.ChannelMember, error) {
var err error
var member models.ChannelMember
var channel models.Channel
@ -167,7 +168,7 @@ func PreloadDirectChannelMembers(tx *gorm.DB) *gorm.DB {
}).Preload("Members.Account")
}
func ListChannel(user *models.Account, realmId ...uint) ([]models.Channel, error) {
func ListChannel(user *authm.Account, realmId ...uint) ([]models.Channel, error) {
var identities []models.ChannelMember
var idRange []uint
if user != nil {
@ -195,7 +196,7 @@ func ListChannel(user *models.Account, realmId ...uint) ([]models.Channel, error
return channels, nil
}
func ListChannelWithUser(user models.Account, realmId ...uint) ([]models.Channel, error) {
func ListChannelWithUser(user authm.Account, realmId ...uint) ([]models.Channel, error) {
var channels []models.Channel
tx := database.C.Where(&models.Channel{AccountID: user.ID}).Preload("Realm")
if len(realmId) > 0 {
@ -211,7 +212,7 @@ func ListChannelWithUser(user models.Account, realmId ...uint) ([]models.Channel
return channels, nil
}
func ListAvailableChannel(tx *gorm.DB, user models.Account, realmId ...uint) ([]models.Channel, error) {
func ListAvailableChannel(tx *gorm.DB, user authm.Account, realmId ...uint) ([]models.Channel, error) {
var channels []models.Channel
var members []models.ChannelMember
if err := database.C.Where(&models.ChannelMember{

View File

@ -2,15 +2,15 @@ package services
import (
"fmt"
authm "git.solsynth.dev/hypernet/passport/pkg/authkit/models"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/database"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/models"
"github.com/spf13/viper"
)
func GetDirectChannelByUser(user models.Account, other models.Account) (models.Channel, error) {
memberTable := fmt.Sprintf("%schannel_members", viper.GetString("database.prefix"))
channelTable := fmt.Sprintf("%schannels", viper.GetString("database.prefix"))
func GetDirectChannelByUser(user authm.Account, other authm.Account) (models.Channel, error) {
memberTable := "channel_members"
channelTable := "channels"
var channel models.Channel
if err := database.C.Preload("Members").

View File

@ -2,10 +2,13 @@ package services
import (
"fmt"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/gap"
"git.solsynth.dev/hypernet/nexus/pkg/nex"
"git.solsynth.dev/hypernet/passport/pkg/authkit"
"git.solsynth.dev/hypernet/pusher/pkg/pushkit"
"strings"
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/database"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/models"
jsoniter "github.com/json-iterator/go"
@ -86,9 +89,9 @@ func NewEvent(event models.Event) (models.Event, error) {
event, _ = GetEvent(event.Channel, event.ID)
idxList := lo.Map(members, func(item models.ChannelMember, index int) uint64 {
return uint64(item.Account.ID)
return uint64(item.AccountID)
})
PushCommandBatch(idxList, models.UnifiedCommand{
PushCommandBatch(idxList, nex.WebSocketPackage{
Action: "events.new",
Payload: event,
})
@ -106,8 +109,8 @@ func NotifyMessageEvent(members []models.ChannelMember, event models.Event) {
raw, _ := jsoniter.Marshal(event.Body)
_ = jsoniter.Unmarshal(raw, &body)
var pendingUsers []models.Account
var mentionedUsers []models.Account
var pendingUsers []uint64
var mentionedUsers []uint64
for _, member := range members {
if member.ID != event.SenderID {
@ -115,31 +118,31 @@ func NotifyMessageEvent(members []models.ChannelMember, event models.Event) {
case models.NotifyLevelNone:
continue
case models.NotifyLevelMentioned:
if len(body.RelatedUsers) != 0 && lo.Contains(body.RelatedUsers, member.Account.ID) {
mentionedUsers = append(mentionedUsers, member.Account)
if len(body.RelatedUsers) != 0 && lo.Contains(body.RelatedUsers, member.AccountID) {
mentionedUsers = append(mentionedUsers, uint64(member.AccountID))
}
continue
default:
break
}
if lo.Contains(body.RelatedUsers, member.Account.ID) {
mentionedUsers = append(mentionedUsers, member.Account)
if lo.Contains(body.RelatedUsers, member.AccountID) {
mentionedUsers = append(mentionedUsers, uint64(member.AccountID))
} else {
pendingUsers = append(pendingUsers, member.Account)
pendingUsers = append(pendingUsers, uint64(member.AccountID))
}
}
}
var displayText string
var displaySubtitle *string
var displaySubtitle string
switch event.Type {
case models.EventMessageNew:
if body.Algorithm == "plain" {
displayText = body.Text
}
case models.EventMessageEdit:
displaySubtitle = lo.ToPtr("Edited a message")
displaySubtitle = "Edited a message"
if body.Algorithm == "plain" {
displayText = body.Text
}
@ -162,22 +165,22 @@ func NotifyMessageEvent(members []models.ChannelMember, event models.Event) {
}
if len(pendingUsers) > 0 {
err := NotifyAccountMessagerBatch(
err := authkit.NotifyUserBatch(
gap.Nx,
pendingUsers,
&proto.NotifyRequest{
pushkit.Notification{
Topic: "messaging.message",
Title: fmt.Sprintf("%s (%s)", event.Sender.Account.Nick, event.Channel.DisplayText()),
Title: fmt.Sprintf("%s (%s)", event.Sender.Nick, event.Channel.DisplayText()),
Subtitle: displaySubtitle,
Body: displayText,
Avatar: &event.Sender.Account.Avatar,
Metadata: EncodeJSONBody(map[string]any{
"user_id": event.Sender.Account.ID,
"user_name": event.Sender.Account.Name,
"user_nick": event.Sender.Account.Nick,
Metadata: map[string]any{
"avatar": event.Sender.Avatar,
"user_id": event.Sender.AccountID,
"user_name": event.Sender.Name,
"user_nick": event.Sender.Nick,
"channel_id": event.ChannelID,
}),
IsRealtime: true,
IsForcePush: false,
},
Priority: 5,
},
)
if err != nil {
@ -186,28 +189,28 @@ func NotifyMessageEvent(members []models.ChannelMember, event models.Event) {
}
if len(mentionedUsers) > 0 {
if displaySubtitle != nil && len(*displaySubtitle) > 0 {
*displaySubtitle += ", and metioned you"
if len(displaySubtitle) > 0 {
displaySubtitle += ", and mentioned you"
} else {
displaySubtitle = lo.ToPtr("Metioned you")
displaySubtitle = "Mentioned you"
}
err := NotifyAccountMessagerBatch(
err := authkit.NotifyUserBatch(
gap.Nx,
mentionedUsers,
&proto.NotifyRequest{
pushkit.Notification{
Topic: "messaging.message",
Title: fmt.Sprintf("%s (%s)", event.Sender.Account.Nick, event.Channel.DisplayText()),
Title: fmt.Sprintf("%s (%s)", event.Sender.Nick, event.Channel.DisplayText()),
Subtitle: displaySubtitle,
Body: displayText,
Avatar: &event.Sender.Account.Avatar,
Metadata: EncodeJSONBody(map[string]any{
"user_id": event.Sender.Account.ID,
"user_name": event.Sender.Account.Name,
"user_nick": event.Sender.Account.Nick,
Metadata: map[string]any{
"avatar": event.Sender.Avatar,
"user_id": event.Sender.AccountID,
"user_name": event.Sender.Name,
"user_nick": event.Sender.Nick,
"channel_id": event.ChannelID,
}),
IsRealtime: true,
IsForcePush: false,
},
Priority: 5,
},
)
if err != nil {

View File

@ -2,6 +2,7 @@ package services
import (
"context"
"git.solsynth.dev/hypernet/nexus/pkg/nex"
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
@ -14,7 +15,7 @@ import (
func GetRealmWithExtID(id uint) (models.Realm, error) {
var realm models.Realm
pc, err := gap.H.GetServiceGrpcConn(hyper.ServiceTypeAuthProvider)
pc, err := gap.Nx.GetClientGrpcConn(nex.ServiceTypeAuth)
if err != nil {
return realm, err
}
@ -31,7 +32,7 @@ func GetRealmWithExtID(id uint) (models.Realm, error) {
func GetRealmWithAlias(alias string) (models.Realm, error) {
var realm models.Realm
pc, err := gap.H.GetServiceGrpcConn(hyper.ServiceTypeAuthProvider)
pc, err := gap.Nx.GetClientGrpcConn(nex.ServiceTypeAuth)
if err != nil {
return realm, err
}
@ -47,7 +48,7 @@ func GetRealmWithAlias(alias string) (models.Realm, error) {
}
func GetRealmMember(realmId uint, userId uint) (*proto.RealmMemberInfo, error) {
pc, err := gap.H.GetServiceGrpcConn(hyper.ServiceTypeAuthProvider)
pc, err := gap.Nx.GetClientGrpcConn(nex.ServiceTypeAuth)
if err != nil {
return nil, err
}
@ -63,7 +64,7 @@ func GetRealmMember(realmId uint, userId uint) (*proto.RealmMemberInfo, error) {
}
func ListRealmMember(realmId uint) ([]*proto.RealmMemberInfo, error) {
pc, err := gap.H.GetServiceGrpcConn(hyper.ServiceTypeAuthProvider)
pc, err := gap.Nx.GetClientGrpcConn(nex.ServiceTypeAuth)
if err != nil {
return nil, err
}

View File

@ -3,7 +3,6 @@ package services
import (
"context"
"fmt"
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
localCache "git.solsynth.dev/hydrogen/messaging/pkg/internal/cache"
@ -41,18 +40,11 @@ func SetTypingStatus(channelId uint, userId uint) error {
}
if !hitCache {
var account models.Account
if err := database.C.Where("external_id = ?", userId).First(&account).Error; err != nil {
return fmt.Errorf("account not found: %v", err)
}
var member models.ChannelMember
if err := database.C.
Where("account_id = ? AND channel_id = ?", account.ID, channelId).
Where("account_id = ? AND channel_id = ?", userId, channelId).
First(&member).Error; err != nil {
return fmt.Errorf("channel member not found: %v", err)
} else {
member.Account = account
}
var channel models.Channel
@ -65,7 +57,7 @@ func SetTypingStatus(channelId uint, userId uint) error {
}
for _, item := range channel.Members {
broadcastTarget = append(broadcastTarget, uint64(item.Account.ID))
broadcastTarget = append(broadcastTarget, uint64(item.AccountID))
}
data = map[string]any{
@ -77,11 +69,7 @@ func SetTypingStatus(channelId uint, userId uint) error {
}
// Cache queries
cacheManager := cache.New[any](localCache.S)
marshal := marshaler.New(cacheManager)
contx := context.Background()
marshal.Set(
_ = marshal.Set(
contx,
GetTypingStatusQueryCacheKey(channelId, userId),
statusQueryCacheEntry{broadcastTarget, data},
@ -89,7 +77,7 @@ func SetTypingStatus(channelId uint, userId uint) error {
)
}
sc := proto.NewStreamControllerClient(gap.H.GetDealerGrpcConn())
sc := proto.NewStreamControllerClient(gap.Nx.GetNexusGrpcConn())
_, err := sc.PushStreamBatch(context.Background(), &proto.PushStreamBatchRequest{
UserId: broadcastTarget,
Body: hyper.NetworkPackage{

View File

@ -2,31 +2,31 @@ package services
import (
"context"
"git.solsynth.dev/hypernet/nexus/pkg/nex"
"time"
"github.com/samber/lo"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/gap"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/models"
)
func PushCommand(userId uint, task models.UnifiedCommand) {
func PushCommand(userId uint, task nex.WebSocketPackage) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
pc := gap.H.GetDealerGrpcConn()
pc := gap.Nx.GetNexusGrpcConn()
_, _ = proto.NewStreamControllerClient(pc).PushStream(ctx, &proto.PushStreamRequest{
UserId: lo.ToPtr(uint64(userId)),
Body: task.Marshal(),
})
}
func PushCommandBatch(userId []uint64, task models.UnifiedCommand) {
func PushCommandBatch(userId []uint64, task nex.WebSocketPackage) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
pc := gap.H.GetDealerGrpcConn()
pc := gap.Nx.GetNexusGrpcConn()
_, _ = proto.NewStreamControllerClient(pc).PushStreamBatch(ctx, &proto.PushStreamBatchRequest{
UserId: userId,
Body: task.Marshal(),