diff --git a/pkg/internal/grpc/postman.go b/pkg/internal/grpc/postman.go index 17fd228..043db0d 100644 --- a/pkg/internal/grpc/postman.go +++ b/pkg/internal/grpc/postman.go @@ -4,37 +4,50 @@ import ( "context" "git.solsynth.dev/hydrogen/dealer/pkg/internal/services" "git.solsynth.dev/hydrogen/dealer/pkg/proto" + "sync" ) func (v *Server) DeliverNotification(ctx context.Context, request *proto.DeliverNotificationRequest) (*proto.DeliverResponse, error) { - services.PublishDeliveryTask(request) + services.DealDeliveryTask(request) return &proto.DeliverResponse{}, nil } func (v *Server) DeliverNotificationBatch(ctx context.Context, request *proto.DeliverNotificationBatchRequest) (*proto.DeliverResponse, error) { + var wg sync.WaitGroup for idx, provider := range request.GetProviders() { token := request.GetDeviceTokens()[idx] - services.PublishDeliveryTask(&proto.DeliverNotificationRequest{ - Provider: provider, - DeviceToken: token, - Notify: request.GetNotify(), - }) + provider := provider + go func() { + wg.Add(1) + services.DealDeliveryTask(&proto.DeliverNotificationRequest{ + Provider: provider, + DeviceToken: token, + Notify: request.GetNotify(), + }) + wg.Done() + }() } return &proto.DeliverResponse{}, nil } func (v *Server) DeliverEmail(ctx context.Context, request *proto.DeliverEmailRequest) (*proto.DeliverResponse, error) { - services.PublishDeliveryTask(request) + services.DealDeliveryTask(request) return &proto.DeliverResponse{}, nil } func (v *Server) DeliverEmailBatch(ctx context.Context, request *proto.DeliverEmailBatchRequest) (*proto.DeliverResponse, error) { + var wg sync.WaitGroup for _, to := range request.GetTo() { - services.PublishDeliveryTask(&proto.DeliverEmailRequest{ - To: to, - Email: request.GetEmail(), - }) + to := to + go func() { + wg.Add(1) + services.DealDeliveryTask(&proto.DeliverEmailRequest{ + To: to, + Email: request.GetEmail(), + }) + wg.Done() + }() } return &proto.DeliverResponse{}, nil diff --git a/pkg/internal/services/postman.go b/pkg/internal/services/postman.go index 17f548b..51acd0d 100644 --- a/pkg/internal/services/postman.go +++ b/pkg/internal/services/postman.go @@ -17,29 +17,20 @@ import ( "time" ) -var deliveryTaskQueue = make(chan any, 256) - -func PublishDeliveryTask(task any) { - deliveryTaskQueue <- task -} - -func ConsumeDeliveryTasks() { - for { - task := <-deliveryTaskQueue - switch tk := task.(type) { - case *proto.DeliverEmailRequest: - if tk.GetEmail().HtmlBody != nil { - _ = SendMailHTML(tk.GetTo(), tk.GetEmail().GetSubject(), tk.GetEmail().GetHtmlBody()) - } else { - _ = SendMail(tk.GetTo(), tk.GetEmail().GetSubject(), tk.GetEmail().GetTextBody()) - } - case *proto.DeliverNotificationRequest: - switch tk.GetProvider() { - case "firebase": - _ = PushFirebaseNotify(tk.GetDeviceToken(), tk.GetNotify()) - case "apple": - _ = PushAppleNotify(tk.GetDeviceToken(), tk.GetNotify()) - } +func DealDeliveryTask(task any) { + switch tk := task.(type) { + case *proto.DeliverEmailRequest: + if tk.GetEmail().HtmlBody != nil { + _ = SendMailHTML(tk.GetTo(), tk.GetEmail().GetSubject(), tk.GetEmail().GetHtmlBody()) + } else { + _ = SendMail(tk.GetTo(), tk.GetEmail().GetSubject(), tk.GetEmail().GetTextBody()) + } + case *proto.DeliverNotificationRequest: + switch tk.GetProvider() { + case "firebase": + _ = PushFirebaseNotify(tk.GetDeviceToken(), tk.GetNotify()) + case "apple": + _ = PushAppleNotify(tk.GetDeviceToken(), tk.GetNotify()) } } } diff --git a/pkg/main.go b/pkg/main.go index d79b80d..fe219cc 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -41,11 +41,6 @@ func main() { log.Warn().Err(err).Msg("An error occurred when setup APNs, apple notification push is unavailable...") } - // Set up tasks queue consumers - for idx := 0; idx < max(1, viper.GetInt("performance.notification_deliver.worker_count")); idx++ { - go services.ConsumeDeliveryTasks() - } - // Server go server.NewServer().Listen() diff --git a/settings.toml b/settings.toml index fefff9f..8bc7901 100644 --- a/settings.toml +++ b/settings.toml @@ -16,9 +16,6 @@ firebase_credentials = "" use_registration_magic_token = false -[performance] -notification_deliver.worker_count = 4 - [dealer] addr = "127.0.0.1:8442"