♻️ Basiclly moved to Dealer from Consul

This commit is contained in:
2024-07-15 00:01:17 +08:00
parent a60be78ce6
commit 69fb9531cb
40 changed files with 298 additions and 1451 deletions

View File

@ -1,37 +0,0 @@
package services
import (
"sync"
"git.solsynth.dev/hydrogen/passport/pkg/internal/models"
"github.com/gofiber/contrib/websocket"
)
var (
wsMutex sync.Mutex
wsConn = make(map[uint]map[*websocket.Conn]bool)
)
func ClientRegister(user models.Account, conn *websocket.Conn) {
wsMutex.Lock()
if wsConn[user.ID] == nil {
wsConn[user.ID] = make(map[*websocket.Conn]bool)
}
wsConn[user.ID][conn] = true
wsMutex.Unlock()
}
func ClientUnregister(user models.Account, conn *websocket.Conn) {
wsMutex.Lock()
if wsConn[user.ID] == nil {
wsConn[user.ID] = make(map[*websocket.Conn]bool)
}
delete(wsConn[user.ID], conn)
wsMutex.Unlock()
if status, err := GetStatus(user.ID); err != nil || !status.IsInvisible {
if len(wsConn[user.ID]) == 0 {
_ = SetAccountLastSeen(user.ID)
}
}
}

View File

@ -1,82 +0,0 @@
package services
import (
"git.solsynth.dev/hydrogen/passport/pkg/internal/models"
"github.com/gofiber/contrib/websocket"
"github.com/gofiber/fiber/v2"
"time"
)
type kexRequest struct {
OwnerID uint
Conn *websocket.Conn
Deadline time.Time
}
var kexRequests = make(map[string]map[string]kexRequest)
func KexRequest(conn *websocket.Conn, requestId, keypairId, algorithm string, ownerId uint, deadline int64) {
if kexRequests[keypairId] == nil {
kexRequests[keypairId] = make(map[string]kexRequest)
}
ddl := time.Now().Add(time.Second * time.Duration(deadline))
request := kexRequest{
OwnerID: ownerId,
Conn: conn,
Deadline: ddl,
}
flag := false
for c := range wsConn[ownerId] {
if c == conn {
continue
}
if c.WriteMessage(1, models.UnifiedCommand{
Action: "kex.request",
Payload: fiber.Map{
"request_id": requestId,
"keypair_id": keypairId,
"algorithm": algorithm,
"owner_id": ownerId,
"deadline": deadline,
},
}.Marshal()) == nil {
flag = true
}
}
if flag {
kexRequests[keypairId][requestId] = request
}
}
func KexProvide(userId uint, requestId, keypairId string, pkt []byte) {
if kexRequests[keypairId] == nil {
return
}
val, ok := kexRequests[keypairId][requestId]
if !ok {
return
} else if val.OwnerID != userId {
return
} else {
_ = val.Conn.WriteMessage(1, pkt)
}
}
func KexCleanup() {
if len(kexRequests) <= 0 {
return
}
for kp, data := range kexRequests {
for idx, req := range data {
if req.Deadline.Unix() <= time.Now().Unix() {
delete(kexRequests[kp], idx)
}
}
}
}

View File

@ -2,7 +2,11 @@ package services
import (
"context"
"fmt"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"git.solsynth.dev/hydrogen/passport/pkg/internal/gap"
"reflect"
"time"
"firebase.google.com/go/messaging"
"git.solsynth.dev/hydrogen/passport/pkg/internal/database"
@ -56,15 +60,23 @@ func NewNotification(notification models.Notification) error {
return nil
}
// PushNotification will push the notification what ever it is exists record in the database
// Recommend push another goroutine when you need to push a lot of notification
// And just use block statement when you just push one notification, the time of create a new sub-process is much more than push notification
// PushNotification will push the notification whatever it exists record in the
// database Recommend push another goroutine when you need to push a lot of
// notifications And just use a block statement when you just push one
// notification, the time of create a new subprocess is much more than push
// notification
func PushNotification(notification models.Notification) error {
for conn := range wsConn[notification.RecipientID] {
_ = conn.WriteMessage(1, models.UnifiedCommand{
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := proto.NewStreamControllerClient(gap.H.GetDealerGrpcConn()).PushStream(ctx, &proto.PushStreamRequest{
UserId: uint64(notification.RecipientID),
Body: models.UnifiedCommand{
Action: "notifications.new",
Payload: notification,
}.Marshal())
}.Marshal(),
})
if err != nil {
return fmt.Errorf("failed to push via websocket: %v", err)
}
// Skip push notification

View File

@ -1,7 +1,10 @@
package services
import (
"context"
"fmt"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"git.solsynth.dev/hydrogen/passport/pkg/internal/gap"
"time"
"git.solsynth.dev/hydrogen/passport/pkg/internal/database"
@ -32,7 +35,16 @@ func GetStatus(uid uint) (models.Status, error) {
}
func GetUserOnline(uid uint) bool {
return wsConn[uid] != nil && len(wsConn[uid]) > 0
pc := proto.NewStreamControllerClient(gap.H.GetDealerGrpcConn())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := pc.CountStreamConnection(ctx, &proto.CountConnectionRequest{
UserId: uint64(uid),
})
if err != nil {
return false
}
return resp.Count > 0
}
func GetStatusDisturbable(uid uint) error {
@ -49,7 +61,7 @@ func GetStatusDisturbable(uid uint) error {
func GetStatusOnline(uid uint) error {
status, err := GetStatus(uid)
isOnline := wsConn[uid] != nil && len(wsConn[uid]) > 0
isOnline := GetUserOnline(uid)
if isOnline && err != nil {
return nil
} else if err == nil && status.IsInvisible {