🎉 Initial Commit

This commit is contained in:
2024-10-25 00:56:22 +08:00
commit 7597bff972
26 changed files with 2052 additions and 0 deletions

View File

@@ -0,0 +1,39 @@
package gap
import (
"fmt"
"git.solsynth.dev/hypernet/nexus/pkg/nex"
"git.solsynth.dev/hypernet/nexus/pkg/proto"
"github.com/rs/zerolog/log"
"strings"
"github.com/spf13/viper"
)
var Nx *nex.Conn
func InitializeToNexus() error {
grpcBind := strings.SplitN(viper.GetString("grpc_bind"), ":", 2)
outboundIp, _ := nex.GetOutboundIP()
grpcOutbound := fmt.Sprintf("%s:%s", outboundIp, grpcBind[1])
var err error
Nx, err = nex.NewNexusConn(viper.GetString("nexus_addr"), &proto.ServiceInfo{
Id: viper.GetString("id"),
Type: nex.ServiceTypePusher,
Label: "Pusher",
GrpcAddr: grpcOutbound,
})
if err == nil {
go func() {
err := Nx.RunRegistering()
if err != nil {
log.Error().Err(err).Msg("An error occurred while registering service...")
}
}()
}
return err
}

View File

@@ -0,0 +1,26 @@
package grpc
import (
"context"
health "google.golang.org/grpc/health/grpc_health_v1"
"time"
)
func (v *Server) Check(ctx context.Context, request *health.HealthCheckRequest) (*health.HealthCheckResponse, error) {
return &health.HealthCheckResponse{
Status: health.HealthCheckResponse_SERVING,
}, nil
}
func (v *Server) Watch(request *health.HealthCheckRequest, server health.Health_WatchServer) error {
for {
if server.Send(&health.HealthCheckResponse{
Status: health.HealthCheckResponse_SERVING,
}) != nil {
break
}
time.Sleep(1000 * time.Millisecond)
}
return nil
}

View File

@@ -0,0 +1,36 @@
package grpc
import (
"context"
"git.solsynth.dev/hypernet/pusher/pkg/internal/provider"
"git.solsynth.dev/hypernet/pusher/pkg/proto"
"git.solsynth.dev/hypernet/pusher/pkg/pushkit"
)
func (v *Server) PushNotification(ctx context.Context, request *proto.PushNotificationRequest) (*proto.DeliveryResponse, error) {
err := provider.PushNotification(pushkit.NotificationPushRequest{
Provider: request.GetProvider(),
Token: request.GetDeviceToken(),
Notification: pushkit.NewNotificationFromProto(request.GetNotify()),
})
return &proto.DeliveryResponse{IsSuccess: err == nil}, nil
}
func (v *Server) PushNotificationBatch(ctx context.Context, request *proto.PushNotificationBatchRequest) (*proto.DeliveryResponse, error) {
go provider.PushNotificationBatch(pushkit.NotificationPushBatchRequest{
Providers: request.GetProviders(),
Tokens: request.GetDeviceTokens(),
Notification: pushkit.NewNotificationFromProto(request.GetNotify()),
})
return &proto.DeliveryResponse{IsSuccess: true}, nil
}
func (v *Server) DeliverEmail(ctx context.Context, request *proto.DeliverEmailRequest) (*proto.DeliveryResponse, error) {
//TODO implement me
panic("implement me")
}
func (v *Server) DeliverEmailBatch(ctx context.Context, request *proto.DeliverEmailBatchRequest) (*proto.DeliveryResponse, error) {
//TODO implement me
panic("implement me")
}

View File

@@ -0,0 +1,39 @@
package grpc
import (
"git.solsynth.dev/hypernet/pusher/pkg/proto"
"github.com/spf13/viper"
"google.golang.org/grpc"
health "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
"net"
)
type Server struct {
proto.UnimplementedPusherServiceServer
health.UnimplementedHealthServer
srv *grpc.Server
}
func NewServer() *Server {
server := &Server{
srv: grpc.NewServer(),
}
proto.RegisterPusherServiceServer(server.srv, server)
health.RegisterHealthServer(server.srv, server)
reflection.Register(server.srv)
return server
}
func (v *Server) Listen() error {
listener, err := net.Listen("tcp", viper.GetString("grpc_bind"))
if err != nil {
return err
}
return v.srv.Serve(listener)
}

5
pkg/internal/meta.go Normal file
View File

@@ -0,0 +1,5 @@
package pkg
const (
AppVersion = "1.0.0"
)

View File

@@ -0,0 +1,48 @@
package provider
import (
"git.solsynth.dev/hypernet/pusher/pkg/pushkit"
"github.com/sideshow/apns2"
payload2 "github.com/sideshow/apns2/payload"
"github.com/spf13/viper"
)
type AppleNotifyProvider struct {
topic string
conn *apns2.Client
}
func (v *AppleNotifyProvider) Push(in pushkit.Notification, tk string) error {
data := payload2.
NewPayload().
AlertTitle(in.Title).
AlertBody(in.Body).
Category(in.Topic).
Custom("metadata", in.Metadata).
Sound("default").
MutableContent()
if len(in.Subtitle) > 0 {
data = data.AlertSubtitle(in.Subtitle)
}
if avatar, ok := in.Metadata["avatar"]; ok {
data = data.Custom("avatar", avatar)
}
if picture, ok := in.Metadata["picture"]; ok {
data = data.Custom("picture", picture)
}
rawData, err := data.MarshalJSON()
if err != nil {
return err
}
payload := &apns2.Notification{
DeviceToken: tk,
Topic: viper.GetString(v.topic),
Payload: rawData,
}
_, err := v.conn.Push(payload)
return err
}
func (v *AppleNotifyProvider) GetName() string {
return "apns"
}

View File

@@ -0,0 +1,67 @@
package provider
import (
"fmt"
"git.solsynth.dev/hypernet/pusher/pkg/pushkit"
"github.com/rs/zerolog/log"
"sync"
"time"
)
var notifyProviders = make(map[string]NotificationProvider)
func AddProvider(in NotificationProvider) {
notifyProviders[in.GetName()] = in
}
func PushNotification(in pushkit.NotificationPushRequest) error {
prov, ok := notifyProviders[in.Provider]
if !ok {
return fmt.Errorf("provider not found")
}
start := time.Now()
err := prov.Push(in.Notification, in.Token)
if err != nil {
log.Warn().Err(err).
Str("tk", in.Token).
Str("provider", prov.GetName()).
Dur("elapsed", time.Since(start)).
Msg("Push notification failed once")
} else {
log.Debug().
Str("tk", in.Token).
Str("provider", prov.GetName()).
Dur("elapsed", time.Since(start)).
Msg("Pushed one notification")
}
return err
}
func PushNotificationBatch(in pushkit.NotificationPushBatchRequest) {
var wg sync.WaitGroup
for idx, key := range in.Providers {
prov, ok := notifyProviders[key]
if !ok {
continue
}
go func() {
wg.Add(1)
defer wg.Done()
start := time.Now()
err := prov.Push(in.Notification, in.Tokens[idx])
if err != nil {
log.Warn().Err(err).
Str("tk", in.Tokens[idx]).
Str("provider", prov.GetName()).
Dur("elapsed", time.Since(start)).
Msg("Push notification failed once")
} else {
log.Debug().
Str("tk", in.Tokens[idx]).
Str("provider", prov.GetName()).
Dur("elapsed", time.Since(start)).
Msg("Pushed one notification")
}
}()
}
}

View File

@@ -0,0 +1,40 @@
package provider
import (
"context"
firebase "firebase.google.com/go"
"firebase.google.com/go/messaging"
"fmt"
"git.solsynth.dev/hypernet/pusher/pkg/pushkit"
)
type FirebaseNotifyProvider struct {
conn *firebase.App
}
func (v *FirebaseNotifyProvider) Push(in pushkit.Notification, tk string) error {
ctx := context.Background()
client, err := v.conn.Messaging(ctx)
if err != nil {
return fmt.Errorf("failed to create firebase client")
}
var subtitle string
if len(in.Subtitle) > 0 {
subtitle = "\n" + in.Subtitle
}
message := &messaging.Message{
Notification: &messaging.Notification{
Title: in.Title,
Body: subtitle + in.Body,
},
Token: tk,
}
_, err = client.Send(ctx, message)
return err
}
func (v *FirebaseNotifyProvider) GetName() string {
return "fcm"
}

View File

@@ -0,0 +1,36 @@
package provider
import (
"context"
firebase "firebase.google.com/go"
"github.com/sideshow/apns2"
"github.com/sideshow/apns2/token"
"google.golang.org/api/option"
)
func InitFCM(in string) error {
opt := option.WithCredentialsFile(in)
app, err := firebase.NewApp(context.Background(), nil, opt)
if err != nil {
return err
} else {
AddProvider(&FirebaseNotifyProvider{app})
}
return nil
}
func InitAPN(in, keyId, teamId, topic string) error {
authKey, err := token.AuthKeyFromFile(in)
if err != nil {
return err
} else {
AddProvider(&AppleNotifyProvider{topic, apns2.NewTokenClient(&token.Token{
AuthKey: authKey,
KeyID: keyId,
TeamID: teamId,
}).Production()})
}
return nil
}

View File

@@ -0,0 +1,11 @@
package provider
import (
"git.solsynth.dev/hypernet/pusher/pkg/pushkit"
)
type NotificationProvider interface {
Push(in pushkit.Notification, tk string) error
GetName() string
}

View File

@@ -0,0 +1,51 @@
package scheduler
import (
"fmt"
"git.solsynth.dev/hypernet/nexus/pkg/nex/rx"
"git.solsynth.dev/hypernet/pusher/pkg/internal/gap"
"git.solsynth.dev/hypernet/pusher/pkg/internal/provider"
"git.solsynth.dev/hypernet/pusher/pkg/pushkit"
"github.com/go-playground/validator/v10"
"github.com/goccy/go-json"
"github.com/nats-io/nats.go"
)
var validate = validator.New(validator.WithRequiredStructEnabled())
func SubscribeToQueue() error {
mq, err := rx.NewMqConn(gap.Nx)
if err != nil {
return fmt.Errorf("failed to initialize Nex.Rx connection: %v", err)
}
_, err = mq.Nt.Subscribe(pushkit.PushNotificationMqTopic, func(msg *nats.Msg) {
var req pushkit.NotificationPushRequest
if json.Unmarshal(msg.Data, &req) != nil {
return
} else if validate.Struct(&req) != nil {
return
}
go provider.PushNotification(req)
})
if err != nil {
return fmt.Errorf("failed to subscribe notification topic: %v", err)
}
_, err = mq.Nt.Subscribe(pushkit.PushNotificationBatchMqTopic, func(msg *nats.Msg) {
var req pushkit.NotificationPushBatchRequest
if json.Unmarshal(msg.Data, &req) != nil {
return
} else if validate.Struct(&req) != nil {
return
}
go provider.PushNotificationBatch(req)
})
if err != nil {
return fmt.Errorf("failed to subscribe notification batch topic: %v", err)
}
return nil
}