Passport/pkg/internal/services/notifications.go

221 lines
6.5 KiB
Go
Raw Normal View History

package services
import (
2024-06-06 14:48:43 +00:00
"context"
"fmt"
"git.solsynth.dev/hypernet/nexus/pkg/nex"
2024-10-26 16:06:23 +00:00
"git.solsynth.dev/hypernet/nexus/pkg/proto"
"git.solsynth.dev/hypernet/passport/pkg/authkit/models"
2024-10-26 16:06:23 +00:00
"git.solsynth.dev/hypernet/pusher/pkg/pushkit"
2024-09-17 06:50:05 +00:00
"reflect"
"time"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
2024-07-10 09:38:39 +00:00
"git.solsynth.dev/hypernet/passport/pkg/internal/gap"
"git.solsynth.dev/hypernet/passport/pkg/internal/database"
)
func AddNotifySubscriber(user models.Account, provider, id, tk, ua string) (models.NotificationSubscriber, error) {
var prev models.NotificationSubscriber
var subscriber models.NotificationSubscriber
if err := database.C.Where(&models.NotificationSubscriber{
DeviceID: id,
2024-02-07 15:15:16 +00:00
AccountID: user.ID,
}); err != nil {
subscriber = models.NotificationSubscriber{
UserAgent: ua,
Provider: provider,
DeviceID: id,
DeviceToken: tk,
AccountID: user.ID,
}
} else {
prev = subscriber
2024-02-07 15:15:16 +00:00
}
subscriber.UserAgent = ua
subscriber.Provider = provider
subscriber.DeviceToken = tk
var err error
if !reflect.DeepEqual(subscriber, prev) {
err = database.C.Save(&subscriber).Error
}
2024-02-07 15:15:16 +00:00
return subscriber, err
}
2024-07-03 15:07:59 +00:00
// NewNotification will create a notification and push via the push method it
// Pleases provide the notification with the account field is not empty
func NewNotification(notification models.Notification) error {
2024-09-17 06:50:05 +00:00
if ok := CheckNotificationNotifiable(notification.Account, notification.Topic); !ok {
log.Info().Str("topic", notification.Topic).Uint("uid", notification.AccountID).Msg("Notification dismissed by user...")
return nil
}
if err := database.C.Save(&notification).Error; err != nil {
return err
}
if err := PushNotification(notification, true); err != nil {
2024-06-30 03:57:57 +00:00
return err
}
return nil
}
func NewNotificationBatch(notifications []models.Notification) error {
if len(notifications) == 0 {
return nil
}
notifiable := CheckNotificationNotifiableBatch(lo.Map(notifications, func(item models.Notification, index int) models.Account {
return item.Account
}), notifications[0].Topic)
2024-09-21 14:54:54 +00:00
notifications = lo.Filter(notifications, func(item models.Notification, index int) bool {
return notifiable[index]
})
if err := database.C.CreateInBatches(notifications, 1000).Error; err != nil {
return err
}
2024-09-21 14:54:54 +00:00
PushNotificationBatch(notifications, true)
return nil
}
2024-09-17 06:50:05 +00:00
// PushNotification will push a notification to the user, via websocket, firebase, or APNs
// Please provide the notification with the account field is not empty
2024-09-21 14:54:54 +00:00
func PushNotification(notification models.Notification, skipNotifiableCheck ...bool) error {
if len(skipNotifiableCheck) == 0 || !skipNotifiableCheck[0] {
if ok := CheckNotificationNotifiable(notification.Account, notification.Topic); !ok {
log.Info().Str("topic", notification.Topic).Uint("uid", notification.AccountID).Msg("Notification dismissed by user...")
return nil
}
2024-09-17 06:50:05 +00:00
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
2024-10-26 16:06:23 +00:00
_, err := proto.NewStreamServiceClient(gap.Nx.GetNexusGrpcConn()).PushStream(ctx, &proto.PushStreamRequest{
UserId: lo.ToPtr(uint64(notification.AccountID)),
Body: nex.WebSocketPackage{
2024-05-09 15:35:13 +00:00
Action: "notifications.new",
2024-05-13 14:31:19 +00:00
Payload: notification,
}.Marshal(),
})
if err != nil {
return fmt.Errorf("failed to push via websocket: %v", err)
2024-03-31 08:03:59 +00:00
}
2024-07-03 15:07:59 +00:00
// Skip push notification
2024-07-15 16:05:09 +00:00
if GetStatusDisturbable(notification.AccountID) != nil {
2024-06-26 12:05:28 +00:00
return nil
}
2024-06-06 14:48:43 +00:00
var subscribers []models.NotificationSubscriber
if err := database.C.Where(&models.NotificationSubscriber{
2024-07-15 16:05:09 +00:00
AccountID: notification.AccountID,
}).Find(&subscribers).Error; err != nil {
return err
}
var providers []string
var tokens []string
for _, subscriber := range subscribers {
providers = append(providers, subscriber.Provider)
tokens = append(tokens, subscriber.DeviceToken)
}
2024-11-22 16:34:53 +00:00
log.Debug().Str("topic", notification.Topic).Any("uid", notification.AccountID).Msg("Pushing notify to user...")
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
2024-10-26 16:06:23 +00:00
err = gap.Px.PushNotifyBatch(pushkit.NotificationPushBatchRequest{
Providers: providers,
Tokens: tokens,
Notification: notification.EncodeToPushkit(),
})
2024-11-22 16:34:53 +00:00
if err != nil {
log.Warn().Err(err).Str("topic", notification.Topic).Msg("Failed to push notification to Pusher")
}
return err
}
2024-09-21 14:54:54 +00:00
func PushNotificationBatch(notifications []models.Notification, skipNotifiableCheck ...bool) {
2024-09-17 06:50:05 +00:00
if len(notifications) == 0 {
return
}
2024-09-21 14:54:54 +00:00
var accountIdx []uint
if len(skipNotifiableCheck) == 0 || !skipNotifiableCheck[0] {
notifiable := CheckNotificationNotifiableBatch(lo.Map(notifications, func(item models.Notification, index int) models.Account {
return item.Account
}), notifications[0].Topic)
accountIdx = lo.Map(
lo.Filter(notifications, func(item models.Notification, index int) bool {
return notifiable[index]
}),
func(item models.Notification, index int) uint {
return item.AccountID
},
)
} else {
accountIdx = lo.Map(
notifications,
func(item models.Notification, index int) uint {
return item.AccountID
},
)
}
2024-09-17 06:50:05 +00:00
2024-11-22 16:34:53 +00:00
log.Debug().Str("topic", notifications[0].Topic).Any("uid", accountIdx).Msg("Pushing notify to users...")
2024-09-17 06:50:05 +00:00
if len(accountIdx) == 0 {
return
}
var subscribers []models.NotificationSubscriber
database.C.Where("account_id IN ?", accountIdx).Find(&subscribers)
2024-10-26 16:06:23 +00:00
stream := proto.NewStreamServiceClient(gap.Nx.GetNexusGrpcConn())
for _, notification := range notifications {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, _ = stream.PushStream(ctx, &proto.PushStreamRequest{
UserId: lo.ToPtr(uint64(notification.AccountID)),
Body: nex.WebSocketPackage{
Action: "notifications.new",
Payload: notification,
}.Marshal(),
})
cancel()
// Skip push notification
if GetStatusDisturbable(notification.AccountID) != nil {
continue
}
var providers []string
var tokens []string
for _, subscriber := range lo.Filter(subscribers, func(item models.NotificationSubscriber, index int) bool {
return item.AccountID == notification.AccountID
}) {
providers = append(providers, subscriber.Provider)
tokens = append(tokens, subscriber.DeviceToken)
}
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
2024-11-22 16:34:53 +00:00
if err := gap.Px.PushNotifyBatch(pushkit.NotificationPushBatchRequest{
Providers: providers,
Tokens: tokens,
Notification: notification.EncodeToPushkit(),
2024-11-22 16:34:53 +00:00
}); err != nil {
log.Warn().Err(err).Str("topic", notification.Topic).Msg("Failed to push notification to Pusher")
}
cancel()
}
}