Realtime Notify

This commit is contained in:
LittleSheep 2024-03-31 16:03:59 +08:00
parent 7873bafa4f
commit e8aac7bb66
11 changed files with 140 additions and 72 deletions

View File

@ -32,12 +32,13 @@ func (v *Server) NotifyUser(_ context.Context, in *proto.NotifyRequest) (*proto.
Content: in.GetContent(), Content: in.GetContent(),
Links: links, Links: links,
IsImportant: in.GetIsImportant(), IsImportant: in.GetIsImportant(),
IsRealtime: in.GetIsRealtime(),
ReadAt: nil, ReadAt: nil,
RecipientID: user.ID, RecipientID: user.ID,
SenderID: &client.ID, SenderID: &client.ID,
} }
if in.GetRealtime() { if in.GetIsRealtime() {
if err := services.PushNotification(notification); err != nil { if err := services.PushNotification(notification); err != nil {
return nil, err return nil, err
} }

View File

@ -87,7 +87,7 @@ type NotifyRequest struct {
RecipientId uint64 `protobuf:"varint,5,opt,name=recipient_id,json=recipientId,proto3" json:"recipient_id,omitempty"` RecipientId uint64 `protobuf:"varint,5,opt,name=recipient_id,json=recipientId,proto3" json:"recipient_id,omitempty"`
ClientId string `protobuf:"bytes,6,opt,name=client_id,json=clientId,proto3" json:"client_id,omitempty"` ClientId string `protobuf:"bytes,6,opt,name=client_id,json=clientId,proto3" json:"client_id,omitempty"`
ClientSecret string `protobuf:"bytes,7,opt,name=client_secret,json=clientSecret,proto3" json:"client_secret,omitempty"` ClientSecret string `protobuf:"bytes,7,opt,name=client_secret,json=clientSecret,proto3" json:"client_secret,omitempty"`
Realtime bool `protobuf:"varint,8,opt,name=realtime,proto3" json:"realtime,omitempty"` IsRealtime bool `protobuf:"varint,8,opt,name=is_realtime,json=isRealtime,proto3" json:"is_realtime,omitempty"`
} }
func (x *NotifyRequest) Reset() { func (x *NotifyRequest) Reset() {
@ -171,9 +171,9 @@ func (x *NotifyRequest) GetClientSecret() string {
return "" return ""
} }
func (x *NotifyRequest) GetRealtime() bool { func (x *NotifyRequest) GetIsRealtime() bool {
if x != nil { if x != nil {
return x.Realtime return x.IsRealtime
} }
return false return false
} }
@ -232,7 +232,7 @@ var file_notify_proto_rawDesc = []byte{
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x34, 0x0a, 0x0a, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x4c, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x34, 0x0a, 0x0a, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x4c,
0x69, 0x6e, 0x6b, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x69, 0x6e, 0x6b, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x05, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x28, 0x09, 0x52, 0x05, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x22, 0x90, 0x02, 0x0a, 0x0d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x22, 0x95, 0x02, 0x0a, 0x0d,
0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a,
0x07, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x07, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07,
0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65,
@ -248,16 +248,17 @@ var file_notify_proto_rawDesc = []byte{
0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x23, 0x0a,
0x0d, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x65, 0x63, 0x72, 0x65, 0x74, 0x18, 0x07, 0x0d, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x73, 0x65, 0x63, 0x72, 0x65, 0x74, 0x18, 0x07,
0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x63, 0x72, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x63, 0x72,
0x65, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x61, 0x6c, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x08, 0x65, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x69, 0x73, 0x5f, 0x72, 0x65, 0x61, 0x6c, 0x74, 0x69, 0x6d,
0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x72, 0x65, 0x61, 0x6c, 0x74, 0x69, 0x6d, 0x65, 0x22, 0x26, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x69, 0x73, 0x52, 0x65, 0x61, 0x6c, 0x74,
0x0a, 0x0b, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x17, 0x0a, 0x69, 0x6d, 0x65, 0x22, 0x26, 0x0a, 0x0b, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x52, 0x65, 0x70,
0x07, 0x69, 0x73, 0x5f, 0x73, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x6c, 0x79, 0x12, 0x17, 0x0a, 0x07, 0x69, 0x73, 0x5f, 0x73, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20,
0x69, 0x73, 0x53, 0x65, 0x6e, 0x74, 0x32, 0x42, 0x0a, 0x06, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x53, 0x65, 0x6e, 0x74, 0x32, 0x42, 0x0a, 0x06, 0x4e,
0x12, 0x38, 0x0a, 0x0a, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x55, 0x73, 0x65, 0x72, 0x12, 0x14, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x12, 0x38, 0x0a, 0x0a, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x55,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x52, 0x65, 0x71, 0x73, 0x65, 0x72, 0x12, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4e, 0x6f, 0x74, 0x69,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x4e, 0x6f, 0x74, 0x66, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x69, 0x66, 0x79, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x3b, 0x6f, 0x2e, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, 0x09, 0x5a, 0x07, 0x2e, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
} }
var ( var (

View File

@ -21,7 +21,7 @@ message NotifyRequest {
uint64 recipient_id = 5; uint64 recipient_id = 5;
string client_id = 6; string client_id = 6;
string client_secret = 7; string client_secret = 7;
bool realtime = 8; bool is_realtime = 8;
} }
message NotifyReply { message NotifyReply {

View File

@ -12,6 +12,7 @@ type Notification struct {
Content string `json:"content"` Content string `json:"content"`
Links datatypes.JSONSlice[NotificationLink] `json:"links"` Links datatypes.JSONSlice[NotificationLink] `json:"links"`
IsImportant bool `json:"is_important"` IsImportant bool `json:"is_important"`
IsRealtime bool `json:"is_realtime" gorm:"-"`
ReadAt *time.Time `json:"read_at"` ReadAt *time.Time `json:"read_at"`
SenderID *uint `json:"sender_id"` SenderID *uint `json:"sender_id"`
RecipientID uint `json:"recipient_id"` RecipientID uint `json:"recipient_id"`

View File

@ -14,6 +14,7 @@ func notifyUser(c *fiber.Ctx) error {
Content string `json:"content" validate:"required,max=3072"` Content string `json:"content" validate:"required,max=3072"`
Links []models.NotificationLink `json:"links"` Links []models.NotificationLink `json:"links"`
IsImportant bool `json:"is_important"` IsImportant bool `json:"is_important"`
IsRealtime bool `json:"is_realtime"`
UserID uint `json:"user_id" validate:"required"` UserID uint `json:"user_id" validate:"required"`
} }
@ -33,17 +34,24 @@ func notifyUser(c *fiber.Ctx) error {
notification := models.Notification{ notification := models.Notification{
Subject: data.Subject, Subject: data.Subject,
Content: data.Subject, Content: data.Content,
Links: data.Links, Links: data.Links,
IsImportant: data.IsImportant, IsImportant: data.IsImportant,
IsRealtime: data.IsRealtime,
ReadAt: nil, ReadAt: nil,
RecipientID: user.ID, RecipientID: user.ID,
SenderID: &client.ID, SenderID: &client.ID,
} }
if data.IsRealtime {
if err := services.PushNotification(notification); err != nil {
return fiber.NewError(fiber.StatusBadRequest, err.Error())
}
} else {
if err := services.NewNotification(notification); err != nil { if err := services.NewNotification(notification); err != nil {
return fiber.NewError(fiber.StatusBadRequest, err.Error()) return fiber.NewError(fiber.StatusBadRequest, err.Error())
} }
}
return c.SendStatus(fiber.StatusOK) return c.SendStatus(fiber.StatusOK)
} }

View File

@ -16,14 +16,10 @@ func listenNotifications(c *websocket.Conn) {
// Event loop // Event loop
var err error var err error
for { for {
message := services.WsNotifyQueue[user.ID] if _, _, err = c.ReadMessage(); err != nil {
if message != nil {
if err = c.WriteMessage(1, message); err != nil {
break break
} }
} }
}
// Pop connection // Pop connection
services.WsConn[user.ID] = lo.Filter(services.WsConn[user.ID], func(item *websocket.Conn, idx int) bool { services.WsConn[user.ID] = lo.Filter(services.WsConn[user.ID], func(item *websocket.Conn, idx int) bool {

View File

@ -5,8 +5,12 @@ import (
"github.com/gofiber/contrib/websocket" "github.com/gofiber/contrib/websocket"
) )
type WsPushRequest struct {
Payload []byte
RecipientID uint
}
var WsConn = make(map[uint][]*websocket.Conn) var WsConn = make(map[uint][]*websocket.Conn)
var WsNotifyQueue = make(map[uint][]byte)
func CheckOnline(user models.Account) bool { func CheckOnline(user models.Account) bool {
return len(WsConn[user.ID]) > 0 return len(WsConn[user.ID]) > 0

View File

@ -31,14 +31,19 @@ func NewNotification(notification models.Notification) error {
go func() { go func() {
err := PushNotification(notification) err := PushNotification(notification)
if err != nil {
log.Error().Err(err).Msg("Unexpected error occurred during the notification.") log.Error().Err(err).Msg("Unexpected error occurred during the notification.")
}
}() }()
return nil return nil
} }
func PushNotification(notification models.Notification) error { func PushNotification(notification models.Notification) error {
WsNotifyQueue[notification.RecipientID], _ = jsoniter.Marshal(notification) raw, _ := jsoniter.Marshal(notification)
for _, conn := range WsConn[notification.RecipientID] {
_ = conn.WriteMessage(1, raw)
}
var subscribers []models.NotificationSubscriber var subscribers []models.NotificationSubscriber
if err := database.C.Where(&models.NotificationSubscriber{ if err := database.C.Where(&models.NotificationSubscriber{

View File

@ -1,8 +1,8 @@
<template> <template>
<v-menu eager :close-on-content-click="false"> <v-menu eager :close-on-content-click="false">
<template #activator="{ props }"> <template #activator="{ props }">
<v-btn v-bind="props" stacked rounded="circle" size="small" variant="text" :loading="loading"> <v-btn v-bind="props" icon size="small" variant="text" :loading="loading">
<v-badge v-if="pagination.total > 0" color="error" :content="pagination.total"> <v-badge v-if="notify.total > 0" color="error" :content="notify.total">
<v-icon icon="mdi-bell" /> <v-icon icon="mdi-bell" />
</v-badge> </v-badge>
@ -10,19 +10,19 @@
</v-btn> </v-btn>
</template> </template>
<v-list v-if="notifications.length <= 0" class="w-[380px]" density="compact"> <v-list v-if="notify.notifications.length <= 0" class="w-[380px]" density="compact">
<v-list-item> <v-list-item>
<v-alert class="text-sm" variant="tonal" type="info">You are done! There is no unread notifications for you.</v-alert> <v-alert class="text-sm" variant="tonal" type="info">You are done! There is no unread notifications for you.</v-alert>
</v-list-item> </v-list-item>
</v-list> </v-list>
<v-list v-else class="w-[380px]" density="compact" lines="three"> <v-list v-else class="w-[380px]" density="compact" lines="three">
<v-list-item v-for="item in notifications"> <v-list-item v-for="(item, idx) in notify.notifications">
<template #title>{{ item.subject }}</template> <template #title>{{ item.subject }}</template>
<template #subtitle>{{ item.content }}</template> <template #subtitle>{{ item.content }}</template>
<template #append> <template #append>
<v-btn icon="mdi-check" size="x-small" variant="text" :disabled="loading" @click="markAsRead(item)" /> <v-btn icon="mdi-check" size="x-small" variant="text" :disabled="loading" @click="markAsRead(item, idx)" />
</template> </template>
<div class="flex text-xs gap-1"> <div class="flex text-xs gap-1">
@ -39,39 +39,17 @@
<script setup lang="ts"> <script setup lang="ts">
import { request } from "@/scripts/request" import { request } from "@/scripts/request"
import { getAtk } from "@/stores/userinfo" import { getAtk } from "@/stores/userinfo"
import { reactive, ref } from "vue" import { computed, onMounted, onUnmounted, ref } from "vue";
import { useNotifications } from "@/stores/notifications";
const loading = ref(false) const notify = useNotifications()
const error = ref<string | null>(null) const error = ref<string | null>(null)
const submitting = ref(false)
const loading = computed(() => notify.loading || submitting.value)
const notifications = ref<any[]>([]) async function markAsRead(item: any, idx: number) {
const pagination = reactive({ page: 1, pageSize: 25, total: 0 }) submitting.value = true
async function readNotifications() {
loading.value = true
const res = await request(
"/api/notifications?" +
new URLSearchParams({
take: pagination.pageSize.toString(),
offset: ((pagination.page - 1) * pagination.pageSize).toString(),
}),
{
headers: { Authorization: `Bearer ${getAtk()}` },
},
)
if (res.status === 200) {
const data = await res.json()
notifications.value = data["data"]
pagination.total = data["count"]
}
loading.value = false
}
readNotifications()
async function markAsRead(item: any) {
loading.value = true
const res = await request(`/api/notifications/${item.id}/read`, { const res = await request(`/api/notifications/${item.id}/read`, {
method: "PUT", method: "PUT",
headers: { Authorization: `Bearer ${getAtk()}` }, headers: { Authorization: `Bearer ${getAtk()}` },
@ -79,9 +57,14 @@ async function markAsRead(item: any) {
if (res.status !== 200) { if (res.status !== 200) {
error.value = await res.text() error.value = await res.text()
} else { } else {
await readNotifications() notify.remove(idx)
error.value = null error.value = null
} }
loading.value = false submitting.value = false
} }
notify.list()
onMounted(() => notify.connect())
onUnmounted(() => notify.disconnect())
</script> </script>

View File

@ -0,0 +1,64 @@
import { defineStore } from "pinia";
import { ref } from "vue";
import { checkLoggedIn, getAtk } from "@/stores/userinfo";
import { request } from "@/scripts/request";
export const useNotifications = defineStore("notifications", () => {
let socket: WebSocket;
const loading = ref(false);
const notifications = ref<any[]>([]);
const total = ref(0)
async function list() {
loading.value = true;
const res = await request(
"/api/notifications?" +
new URLSearchParams({
take: (25).toString(),
offset: (0).toString()
}),
{
headers: { Authorization: `Bearer ${getAtk()}` }
}
);
if (res.status === 200) {
const data = await res.json();
notifications.value = data["data"];
total.value = data["count"];
}
loading.value = false;
}
function remove(idx: number) {
notifications.value.splice(idx, 1)
total.value--;
}
async function connect() {
if (!(checkLoggedIn())) return;
const uri = `ws://${window.location.host}/api/notifications/listen`;
socket = new WebSocket(uri + `?tk=${getAtk() as string}`);
socket.addEventListener("open", (event) => {
console.log("[NOTIFICATIONS] The listen websocket has been established... ", event.type);
});
socket.addEventListener("close", (event) => {
console.warn("[NOTIFICATIONS] The listen websocket is disconnected... ", event.reason, event.code);
});
socket.addEventListener("message", (event) => {
const data = JSON.parse(event.data);
notifications.value.push(data);
total.value++;
});
}
function disconnect() {
socket.close();
}
return { loading, notifications, total, list, remove, connect, disconnect };
});

View File

@ -1,22 +1,27 @@
import { fileURLToPath, URL } from "node:url" import { fileURLToPath, URL } from "node:url";
import { defineConfig } from "vite" import { defineConfig } from "vite";
import vue from "@vitejs/plugin-vue" import vue from "@vitejs/plugin-vue";
import vueJsx from "@vitejs/plugin-vue-jsx" import vueJsx from "@vitejs/plugin-vue-jsx";
import unocss from "unocss/vite" import unocss from "unocss/vite";
// https://vitejs.dev/config/ // https://vitejs.dev/config/
export default defineConfig({ export default defineConfig({
plugins: [vue(), vueJsx(), unocss()], plugins: [vue(), vueJsx(), unocss()],
resolve: { resolve: {
alias: { alias: {
"@": fileURLToPath(new URL("./src", import.meta.url)), "@": fileURLToPath(new URL("./src", import.meta.url))
}, }
}, },
server: { server: {
proxy: { proxy: {
"/api/notifications/listen": {
target: "ws://localhost:8444",
ws: true
},
"/api": "http://localhost:8444", "/api": "http://localhost:8444",
"/.well-known": "http://localhost:8444" "/.well-known": "http://localhost:8444"
} }
} }
}) });