✨ Typing status
This commit is contained in:
@ -1,7 +1,7 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"git.solsynth.dev/hydrogen/paperclip/pkg/proto"
|
||||
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
|
||||
"github.com/spf13/viper"
|
||||
"google.golang.org/grpc"
|
||||
health "google.golang.org/grpc/health/grpc_health_v1"
|
||||
@ -10,7 +10,7 @@ import (
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
proto.UnimplementedAttachmentsServer
|
||||
proto.UnimplementedStreamControllerServer
|
||||
}
|
||||
|
||||
var S *grpc.Server
|
||||
@ -19,6 +19,7 @@ func NewGRPC() {
|
||||
S = grpc.NewServer()
|
||||
|
||||
health.RegisterHealthServer(S, &Server{})
|
||||
proto.RegisterStreamControllerServer(S, &Server{})
|
||||
|
||||
reflection.Register(S)
|
||||
}
|
||||
|
51
pkg/internal/grpc/stream.go
Normal file
51
pkg/internal/grpc/stream.go
Normal file
@ -0,0 +1,51 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
|
||||
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
|
||||
"git.solsynth.dev/hydrogen/messaging/pkg/internal/gap"
|
||||
"git.solsynth.dev/hydrogen/messaging/pkg/internal/server/exts"
|
||||
"git.solsynth.dev/hydrogen/messaging/pkg/internal/services"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
)
|
||||
|
||||
func (v *Server) EmitStreamEvent(_ context.Context, in *proto.StreamEventRequest) (*proto.StreamEventResponse, error) {
|
||||
sc := proto.NewStreamControllerClient(gap.H.GetDealerGrpcConn())
|
||||
|
||||
switch in.GetEvent() {
|
||||
case "status.typing":
|
||||
var data struct {
|
||||
ChannelID uint `json:"channel_id"`
|
||||
}
|
||||
|
||||
err := jsoniter.Unmarshal(in.GetPayload(), &data)
|
||||
if err == nil {
|
||||
err = exts.ValidateStruct(data)
|
||||
}
|
||||
if err != nil {
|
||||
sc.PushStream(context.Background(), &proto.PushStreamRequest{
|
||||
ClientId: &in.ClientId,
|
||||
Body: hyper.NetworkPackage{
|
||||
Action: "error",
|
||||
Message: fmt.Sprintf("unable parse payload: %v", err),
|
||||
}.Marshal(),
|
||||
})
|
||||
}
|
||||
|
||||
err = services.SetTypingStatus(data.ChannelID, uint(in.GetUserId()))
|
||||
if err != nil {
|
||||
sc.PushStream(context.Background(), &proto.PushStreamRequest{
|
||||
ClientId: &in.ClientId,
|
||||
Body: hyper.NetworkPackage{
|
||||
Action: "error",
|
||||
Message: fmt.Sprintf("unable boardcast status: %v", err),
|
||||
}.Marshal(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return &proto.StreamEventResponse{}, nil
|
||||
}
|
@ -16,3 +16,7 @@ func BindAndValidate(c *fiber.Ctx, out any) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func ValidateStruct(in any) error {
|
||||
return validation.Struct(in)
|
||||
}
|
||||
|
61
pkg/internal/services/status.go
Normal file
61
pkg/internal/services/status.go
Normal file
@ -0,0 +1,61 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
|
||||
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
|
||||
"git.solsynth.dev/hydrogen/messaging/pkg/internal/database"
|
||||
"git.solsynth.dev/hydrogen/messaging/pkg/internal/gap"
|
||||
"git.solsynth.dev/hydrogen/messaging/pkg/internal/models"
|
||||
)
|
||||
|
||||
func SetTypingStatus(channelId uint, userId uint) error {
|
||||
var account models.Account
|
||||
if err := database.C.Where("id = ?", userId).First(&account).Error; err != nil {
|
||||
return fmt.Errorf("account not found: %v", err)
|
||||
}
|
||||
|
||||
var member models.ChannelMember
|
||||
if err := database.C.
|
||||
Where("account_id = ? AND channel_id = ?", userId, channelId).
|
||||
First(&member).Error; err != nil {
|
||||
return fmt.Errorf("channel member not found: %v", err)
|
||||
} else {
|
||||
member.Account = account
|
||||
}
|
||||
|
||||
var channel models.Channel
|
||||
if err := database.C.
|
||||
Preload("Members").
|
||||
Where("id = ?", channelId).
|
||||
First(&channel).Error; err != nil {
|
||||
return fmt.Errorf("channel not found: %v", err)
|
||||
}
|
||||
|
||||
var boardcastTarget []uint64
|
||||
for _, item := range channel.Members {
|
||||
if item.AccountID == member.AccountID {
|
||||
continue
|
||||
}
|
||||
boardcastTarget = append(boardcastTarget, uint64(item.AccountID))
|
||||
}
|
||||
|
||||
sc := proto.NewStreamControllerClient(gap.H.GetDealerGrpcConn())
|
||||
_, err := sc.PushStreamBatch(context.Background(), &proto.PushStreamBatchRequest{
|
||||
UserId: boardcastTarget,
|
||||
Body: hyper.NetworkPackage{
|
||||
Action: "status.typing",
|
||||
Payload: map[string]any{
|
||||
"user_id": userId,
|
||||
"member_id": member.ID,
|
||||
"channel_id": channelId,
|
||||
"member": member,
|
||||
"channel": channel,
|
||||
},
|
||||
}.Marshal(),
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
Reference in New Issue
Block a user