91 lines
2.9 KiB
Go
91 lines
2.9 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"fmt"
|
|
"git.solsynth.dev/hypernet/nexus/pkg/nex/rx"
|
|
"git.solsynth.dev/hypernet/pusher/pkg/internal/gap"
|
|
"git.solsynth.dev/hypernet/pusher/pkg/internal/provider"
|
|
"git.solsynth.dev/hypernet/pusher/pkg/pushkit"
|
|
"github.com/go-playground/validator/v10"
|
|
"github.com/goccy/go-json"
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
// validate is the package-wide validator instance used to check decoded queue
// request bodies before they are processed. It is created with the
// WithRequiredStructEnabled option.
var validate = validator.New(validator.WithRequiredStructEnabled())
|
|
|
|
func SubscribeToQueue() error {
|
|
mq, err := rx.NewMqConn(gap.Nx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to initialize Nex.Rx connection: %v", err)
|
|
}
|
|
|
|
_, err = mq.Nt.Subscribe(pushkit.PushNotificationMqTopic, func(msg *nats.Msg) {
|
|
var req pushkit.NotificationPushRequest
|
|
if err := json.Unmarshal(msg.Data, &req); err != nil {
|
|
log.Warn().Err(err).Msg("Dropped a notify request, unable to parse request body")
|
|
return
|
|
} else if err := validate.Struct(&req); err != nil {
|
|
log.Warn().Err(err).Msg("Dropped a notify request, failed to validate request body")
|
|
return
|
|
}
|
|
|
|
provider.PushNotification(req)
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to subscribe notification topic: %v", err)
|
|
}
|
|
|
|
_, err = mq.Nt.Subscribe(pushkit.PushNotificationBatchMqTopic, func(msg *nats.Msg) {
|
|
var req pushkit.NotificationPushBatchRequest
|
|
if err := json.Unmarshal(msg.Data, &req); err != nil {
|
|
log.Warn().Err(err).Msg("Dropped a notify batch request, unable to parse request body")
|
|
return
|
|
} else if err := validate.Struct(&req); err != nil {
|
|
log.Warn().Err(err).Msg("Dropped a notify batch request, failed to validate request body")
|
|
return
|
|
}
|
|
|
|
provider.PushNotificationBatch(req)
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to subscribe notification batch topic: %v", err)
|
|
}
|
|
|
|
_, err = mq.Nt.Subscribe(pushkit.PushEmailMqTopic, func(msg *nats.Msg) {
|
|
var req pushkit.EmailDeliverRequest
|
|
if err := json.Unmarshal(msg.Data, &req); err != nil {
|
|
log.Warn().Err(err).Msg("Dropped a push email request, unable to parse request body")
|
|
return
|
|
} else if err := validate.Struct(&req); err != nil {
|
|
log.Warn().Err(err).Msg("Dropped a push email request, failed to validate request body")
|
|
return
|
|
}
|
|
|
|
provider.SendMail(req.To, req.Email)
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to subscribe email topic: %v", err)
|
|
}
|
|
|
|
_, err = mq.Nt.Subscribe(pushkit.PushEmailBatchMqTopic, func(msg *nats.Msg) {
|
|
var req pushkit.EmailDeliverBatchRequest
|
|
if err := json.Unmarshal(msg.Data, &req); err != nil {
|
|
log.Warn().Err(err).Msg("Dropped a push email batch request, unable to parse request body")
|
|
return
|
|
} else if err := validate.Struct(&req); err != nil {
|
|
log.Warn().Err(err).Msg("Dropped a push email batch request, failed to validate request body")
|
|
return
|
|
}
|
|
|
|
for _, to := range req.To {
|
|
_ = provider.SendMail(to, req.Email)
|
|
}
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to subscribe email batch topic: %v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|