✨ Websocket stream control
This commit is contained in:
@ -14,6 +14,7 @@ import (
|
||||
|
||||
type Server struct {
|
||||
proto.UnimplementedServiceDirectoryServer
|
||||
proto.UnimplementedStreamControllerServer
|
||||
proto.UnimplementedAuthServer
|
||||
|
||||
srv *grpc.Server
|
||||
@ -24,9 +25,10 @@ func NewServer() *Server {
|
||||
srv: grpc.NewServer(),
|
||||
}
|
||||
|
||||
proto.RegisterServiceDirectoryServer(server.srv, &Server{})
|
||||
proto.RegisterAuthServer(server.srv, &Server{})
|
||||
health.RegisterHealthServer(server.srv, &Server{})
|
||||
proto.RegisterServiceDirectoryServer(server.srv, server)
|
||||
proto.RegisterStreamControllerServer(server.srv, server)
|
||||
proto.RegisterAuthServer(server.srv, server)
|
||||
health.RegisterHealthServer(server.srv, server)
|
||||
|
||||
reflection.Register(server.srv)
|
||||
|
||||
|
29
pkg/internal/grpc/stream.go
Normal file
29
pkg/internal/grpc/stream.go
Normal file
@ -0,0 +1,29 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"git.solsynth.dev/hydrogen/dealer/pkg/internal/services"
|
||||
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
|
||||
)
|
||||
|
||||
func (v *Server) PushStream(ctx context.Context, request *proto.PushStreamRequest) (*proto.PushStreamResponse, error) {
|
||||
cnt, success, errs := services.WebsocketPush(uint(request.GetUserId()), request.GetBody())
|
||||
if len(errs) > 0 {
|
||||
// Partial fail
|
||||
return &proto.PushStreamResponse{
|
||||
IsAllSuccess: false,
|
||||
AffectedCount: int64(success),
|
||||
FailedCount: int64(cnt - success),
|
||||
}, nil
|
||||
} else if cnt > 0 && success == 0 {
|
||||
// All fail
|
||||
return nil, fmt.Errorf("all push request failed: %v", errs)
|
||||
}
|
||||
|
||||
return &proto.PushStreamResponse{
|
||||
IsAllSuccess: true,
|
||||
AffectedCount: int64(success),
|
||||
FailedCount: int64(cnt - success),
|
||||
}, nil
|
||||
}
|
@ -28,3 +28,15 @@ func ClientUnregister(user models.Account, conn *websocket.Conn) {
|
||||
delete(wsConn[user.ID], conn)
|
||||
wsMutex.Unlock()
|
||||
}
|
||||
|
||||
func WebsocketPush(uid uint, body []byte) (count int, success int, errs []error) {
|
||||
for conn := range wsConn[uid] {
|
||||
if err := conn.WriteMessage(1, body); err != nil {
|
||||
errs = append(errs, err)
|
||||
} else {
|
||||
success++
|
||||
}
|
||||
count++
|
||||
}
|
||||
return
|
||||
}
|
||||
|
Reference in New Issue
Block a user