From df5676cbe4d8175d758b40bea1ee6eceb7c6030c Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Fri, 23 Aug 2024 19:08:07 +0800 Subject: [PATCH] :sparkles: Provide client id in stream push request --- pkg/internal/grpc/stream.go | 36 +++++-- pkg/internal/server/api/ws.go | 23 +++-- pkg/internal/services/connections.go | 56 ++++++++-- pkg/proto/stream.pb.go | 146 ++++++++++++++++----------- pkg/proto/stream.proto | 11 +- 5 files changed, 185 insertions(+), 87 deletions(-) diff --git a/pkg/internal/grpc/stream.go b/pkg/internal/grpc/stream.go index 410c69d..f675383 100644 --- a/pkg/internal/grpc/stream.go +++ b/pkg/internal/grpc/stream.go @@ -17,7 +17,17 @@ func (v *Server) CountStreamConnection(ctx context.Context, request *proto.Count } func (v *Server) PushStream(ctx context.Context, request *proto.PushStreamRequest) (*proto.PushStreamResponse, error) { - cnt, success, errs := services.WebsocketPush(uint(request.GetUserId()), request.GetBody()) + var cnt int + var success int + var errs []error + if request.UserId != nil { + cnt, success, errs = services.WebsocketPush(uint(request.GetUserId()), request.GetBody()) + } else if request.ClientId != nil { + cnt, success, errs = services.WebsocketPushDirect(request.GetClientId(), request.GetBody()) + } else { + return nil, fmt.Errorf("you must give one of the user id or client id") + } + if len(errs) > 0 { // Partial fail return &proto.PushStreamResponse{ @@ -38,12 +48,24 @@ func (v *Server) PushStream(ctx context.Context, request *proto.PushStreamReques } func (v *Server) PushStreamBatch(ctx context.Context, request *proto.PushStreamBatchRequest) (*proto.PushStreamResponse, error) { - cnt, success, errs := services.WebsocketPushBatch( - lo.Map(request.GetUserId(), func(item uint64, idx int) uint { - return uint(item) - }, - ), request.GetBody(), - ) + var cnt int + var success int + var errs []error + if len(request.UserId) != 0 { + cnt, success, errs = services.WebsocketPushBatch( + lo.Map(request.GetUserId(), func(item uint64, idx int) uint { + return uint(item) + }, + ), request.GetBody(), + ) + } + if len(request.ClientId) != 0 { + cCnt, cSuccess, cErrs := services.WebsocketPushBatchDirect(request.GetClientId(), request.GetBody()) + cnt += cCnt + success += cSuccess + errs = append(errs, cErrs...) + } + if len(errs) > 0 { // Partial fail return &proto.PushStreamResponse{ diff --git a/pkg/internal/server/api/ws.go b/pkg/internal/server/api/ws.go index 17d6f42..77603c1 100644 --- a/pkg/internal/server/api/ws.go +++ b/pkg/internal/server/api/ws.go @@ -3,6 +3,7 @@ package api import ( "context" "fmt" + "git.solsynth.dev/hydrogen/dealer/pkg/hyper" "git.solsynth.dev/hydrogen/dealer/pkg/internal/directory" "git.solsynth.dev/hydrogen/dealer/pkg/internal/models" @@ -18,8 +19,11 @@ func listenWebsocket(c *websocket.Conn) { user := c.Locals("user").(models.Account) // Push connection - services.ClientRegister(user, c) - log.Debug().Uint("user", user.ID).Msg("New websocket connection established...") + clientId := services.ClientRegister(user, c) + log.Debug(). + Uint("user", user.ID). + Uint64("clientId", clientId). + Msg("New websocket connection established...") // Event loop var mt int @@ -63,11 +67,11 @@ func listenWebsocket(c *websocket.Conn) { sc := proto.NewStreamControllerClient(pc) _, err = sc.EmitStreamEvent(context.Background(), &proto.StreamEventRequest{ - Event: packet.Action, - UserId: uint64(user.ID), - Payload: packet.RawPayload(), + Event: packet.Action, + UserId: uint64(user.ID), + ClientId: uint64(clientId), + Payload: packet.RawPayload(), }) - if err != nil { _ = c.WriteMessage(mt, hyper.NetworkPackage{ Action: "error", @@ -78,6 +82,9 @@ func listenWebsocket(c *websocket.Conn) { } // Pop connection - services.ClientUnregister(user, c) - log.Debug().Uint("user", user.ID).Msg("A websocket connection disconnected...") + services.ClientUnregister(user, clientId) + log.Debug(). + Uint("user", user.ID). + Uint64("clientId", clientId). + Msg("A websocket connection disconnected...") } diff --git a/pkg/internal/services/connections.go b/pkg/internal/services/connections.go index 6729f77..e716540 100644 --- a/pkg/internal/services/connections.go +++ b/pkg/internal/services/connections.go @@ -2,6 +2,7 @@ package services import ( "context" + "math/rand" "sync" "git.solsynth.dev/hydrogen/dealer/pkg/hyper" @@ -13,15 +14,16 @@ import ( var ( wsMutex sync.Mutex - wsConn = make(map[uint]map[*websocket.Conn]bool) + wsConn = make(map[uint]map[uint64]*websocket.Conn) ) -func ClientRegister(user models.Account, conn *websocket.Conn) { +func ClientRegister(user models.Account, conn *websocket.Conn) uint64 { wsMutex.Lock() if wsConn[user.ID] == nil { - wsConn[user.ID] = make(map[*websocket.Conn]bool) + wsConn[user.ID] = make(map[uint64]*websocket.Conn) } - wsConn[user.ID][conn] = true + clientId := rand.Uint64() + wsConn[user.ID][clientId] = conn wsMutex.Unlock() pc, err := directory.GetServiceInstanceByType(hyper.ServiceTypeAuthProvider).GetGrpcConn() @@ -31,14 +33,16 @@ func ClientRegister(user models.Account, conn *websocket.Conn) { UserId: uint64(user.ID), }) } + + return clientId } -func ClientUnregister(user models.Account, conn *websocket.Conn) { +func ClientUnregister(user models.Account, id uint64) { wsMutex.Lock() if wsConn[user.ID] == nil { - wsConn[user.ID] = make(map[*websocket.Conn]bool) + wsConn[user.ID] = make(map[uint64]*websocket.Conn) } - delete(wsConn[user.ID], conn) + delete(wsConn[user.ID], id) wsMutex.Unlock() pc, err := directory.GetServiceInstanceByType(hyper.ServiceTypeAuthProvider).GetGrpcConn() @@ -55,7 +59,7 @@ func ClientCount(uid uint) int { } func WebsocketPush(uid uint, body []byte) (count int, success int, errs []error) { - for conn := range wsConn[uid] { + for _, conn := range wsConn[uid] { if err := conn.WriteMessage(1, body); err != nil { errs = append(errs, err) } else { @@ -66,9 +70,9 @@ func WebsocketPush(uid uint, body []byte) (count int, success int, errs []error) return } -func WebsocketPushBatch(uidList []uint, body []byte) (count int, success int, errs []error) { - for _, uid := range uidList { - for conn := range wsConn[uid] { +func WebsocketPushDirect(clientId uint64, body []byte) (count int, success int, errs []error) { + for _, m := range wsConn { + if conn, ok := m[clientId]; ok { if err := conn.WriteMessage(1, body); err != nil { errs = append(errs, err) } else { @@ -79,3 +83,33 @@ func WebsocketPushBatch(uidList []uint, body []byte) (count int, success int, er } return } + +func WebsocketPushBatch(uidList []uint, body []byte) (count int, success int, errs []error) { + for _, uid := range uidList { + for _, conn := range wsConn[uid] { + if err := conn.WriteMessage(1, body); err != nil { + errs = append(errs, err) + } else { + success++ + } + count++ + } + } + return +} + +func WebsocketPushBatchDirect(clientIdList []uint64, body []byte) (count int, success int, errs []error) { + for _, clientId := range clientIdList { + for _, m := range wsConn { + if conn, ok := m[clientId]; ok { + if err := conn.WriteMessage(1, body); err != nil { + errs = append(errs, err) + } else { + success++ + } + count++ + } + } + } + return +} diff --git a/pkg/proto/stream.pb.go b/pkg/proto/stream.pb.go index c720b06..81144db 100644 --- a/pkg/proto/stream.pb.go +++ b/pkg/proto/stream.pb.go @@ -119,8 +119,9 @@ type PushStreamRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - UserId uint64 `protobuf:"varint,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` - Body []byte `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"` + UserId *uint64 `protobuf:"varint,1,opt,name=user_id,json=userId,proto3,oneof" json:"user_id,omitempty"` + ClientId *uint64 `protobuf:"varint,2,opt,name=client_id,json=clientId,proto3,oneof" json:"client_id,omitempty"` + Body []byte `protobuf:"bytes,3,opt,name=body,proto3" json:"body,omitempty"` } func (x *PushStreamRequest) Reset() { @@ -156,8 +157,15 @@ func (*PushStreamRequest) Descriptor() ([]byte, []int) { } func (x *PushStreamRequest) GetUserId() uint64 { - if x != nil { - return x.UserId + if x != nil && x.UserId != nil { + return *x.UserId + } + return 0 +} + +func (x *PushStreamRequest) GetClientId() uint64 { + if x != nil && x.ClientId != nil { + return *x.ClientId } return 0 } @@ -174,8 +182,9 @@ type PushStreamBatchRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - UserId []uint64 `protobuf:"varint,1,rep,packed,name=user_id,json=userId,proto3" json:"user_id,omitempty"` - Body []byte `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"` + UserId []uint64 `protobuf:"varint,1,rep,packed,name=user_id,json=userId,proto3" json:"user_id,omitempty"` + ClientId []uint64 `protobuf:"varint,2,rep,packed,name=client_id,json=clientId,proto3" json:"client_id,omitempty"` + Body []byte `protobuf:"bytes,3,opt,name=body,proto3" json:"body,omitempty"` } func (x *PushStreamBatchRequest) Reset() { @@ -217,6 +226,13 @@ func (x *PushStreamBatchRequest) GetUserId() []uint64 { return nil } +func (x *PushStreamBatchRequest) GetClientId() []uint64 { + if x != nil { + return x.ClientId + } + return nil +} + func (x *PushStreamBatchRequest) GetBody() []byte { if x != nil { return x.Body @@ -292,9 +308,10 @@ type StreamEventRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Event string `protobuf:"bytes,1,opt,name=event,proto3" json:"event,omitempty"` - UserId uint64 `protobuf:"varint,2,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` - Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3" json:"payload,omitempty"` + Event string `protobuf:"bytes,1,opt,name=event,proto3" json:"event,omitempty"` + UserId uint64 `protobuf:"varint,2,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` + ClientId uint64 `protobuf:"varint,3,opt,name=client_id,json=clientId,proto3" json:"client_id,omitempty"` + Payload []byte `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"` } func (x *StreamEventRequest) Reset() { @@ -343,6 +360,13 @@ func (x *StreamEventRequest) GetUserId() uint64 { return 0 } +func (x *StreamEventRequest) GetClientId() uint64 { + if x != nil { + return x.ClientId + } + return 0 +} + func (x *StreamEventRequest) GetPayload() []byte { if x != nil { return x.Payload @@ -398,54 +422,61 @@ var file_stream_proto_rawDesc = []byte{ 0x52, 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, 0x64, 0x22, 0x2f, 0x0a, 0x17, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x40, 0x0a, 0x11, 0x50, 0x75, 0x73, - 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x17, - 0x0a, 0x07, 0x75, 0x73, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, - 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0x45, 0x0a, 0x16, 0x50, - 0x75, 0x73, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x42, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x75, 0x73, 0x65, 0x72, 0x5f, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x03, 0x28, 0x04, 0x52, 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, 0x64, 0x12, 0x12, - 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, - 0x64, 0x79, 0x22, 0x84, 0x01, 0x0a, 0x12, 0x50, 0x75, 0x73, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x24, 0x0a, 0x0e, 0x69, 0x73, 0x5f, - 0x61, 0x6c, 0x6c, 0x5f, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x08, 0x52, 0x0c, 0x69, 0x73, 0x41, 0x6c, 0x6c, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, - 0x25, 0x0a, 0x0e, 0x61, 0x66, 0x66, 0x65, 0x63, 0x74, 0x65, 0x64, 0x5f, 0x63, 0x6f, 0x75, 0x6e, - 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x61, 0x66, 0x66, 0x65, 0x63, 0x74, 0x65, - 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, - 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x66, 0x61, - 0x69, 0x6c, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x5d, 0x0a, 0x12, 0x53, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, - 0x14, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, - 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x75, 0x73, 0x65, 0x72, 0x5f, 0x69, 0x64, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, 0x64, 0x12, 0x18, - 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x15, 0x0a, 0x13, 0x53, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, - 0xcc, 0x02, 0x0a, 0x10, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, - 0x6c, 0x6c, 0x65, 0x72, 0x12, 0x58, 0x0a, 0x15, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x53, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1d, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x6e, 0x65, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, - 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x43, - 0x0a, 0x0a, 0x50, 0x75, 0x73, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x18, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, - 0x75, 0x73, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x12, 0x4d, 0x0a, 0x0f, 0x50, 0x75, 0x73, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x42, 0x61, 0x74, 0x63, 0x68, 0x12, 0x1d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, - 0x75, 0x73, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x42, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x75, - 0x73, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x00, 0x12, 0x4a, 0x0a, 0x0f, 0x45, 0x6d, 0x69, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, - 0x72, 0x65, 0x61, 0x6d, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x1a, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x45, - 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x09, - 0x5a, 0x07, 0x2e, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x28, 0x03, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x81, 0x01, 0x0a, 0x11, 0x50, 0x75, + 0x73, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x1c, 0x0a, 0x07, 0x75, 0x73, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, + 0x48, 0x00, 0x52, 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, 0x64, 0x88, 0x01, 0x01, 0x12, 0x20, 0x0a, + 0x09, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, + 0x48, 0x01, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x88, 0x01, 0x01, 0x12, + 0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, + 0x6f, 0x64, 0x79, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x75, 0x73, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x42, + 0x0c, 0x0a, 0x0a, 0x5f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x22, 0x62, 0x0a, + 0x16, 0x50, 0x75, 0x73, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x42, 0x61, 0x74, 0x63, 0x68, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x75, 0x73, 0x65, 0x72, 0x5f, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x03, 0x28, 0x04, 0x52, 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, 0x64, + 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, + 0x03, 0x28, 0x04, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x12, 0x0a, + 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, 0x64, + 0x79, 0x22, 0x84, 0x01, 0x0a, 0x12, 0x50, 0x75, 0x73, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x24, 0x0a, 0x0e, 0x69, 0x73, 0x5f, 0x61, + 0x6c, 0x6c, 0x5f, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, + 0x52, 0x0c, 0x69, 0x73, 0x41, 0x6c, 0x6c, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x25, + 0x0a, 0x0e, 0x61, 0x66, 0x66, 0x65, 0x63, 0x74, 0x65, 0x64, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0d, 0x61, 0x66, 0x66, 0x65, 0x63, 0x74, 0x65, 0x64, + 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x66, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x5f, + 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x66, 0x61, 0x69, + 0x6c, 0x65, 0x64, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x7a, 0x0a, 0x12, 0x53, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, + 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, + 0x76, 0x65, 0x6e, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x75, 0x73, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x06, 0x75, 0x73, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1b, 0x0a, + 0x09, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, + 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, + 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, + 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x15, 0x0a, 0x13, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x45, 0x76, + 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xcc, 0x02, 0x0a, 0x10, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, + 0x12, 0x58, 0x0a, 0x15, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x43, + 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1d, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2e, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2e, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x43, 0x0a, 0x0a, 0x50, 0x75, + 0x73, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x18, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2e, 0x50, 0x75, 0x73, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, + 0x4d, 0x0a, 0x0f, 0x50, 0x75, 0x73, 0x68, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x42, 0x61, 0x74, + 0x63, 0x68, 0x12, 0x1d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x42, 0x61, 0x74, 0x63, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x75, 0x73, 0x68, 0x53, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4a, + 0x0a, 0x0f, 0x45, 0x6d, 0x69, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x45, 0x76, 0x65, 0x6e, + 0x74, 0x12, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x3b, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -577,6 +608,7 @@ func file_stream_proto_init() { } } } + file_stream_proto_msgTypes[2].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ diff --git a/pkg/proto/stream.proto b/pkg/proto/stream.proto index 8045bd2..912f464 100644 --- a/pkg/proto/stream.proto +++ b/pkg/proto/stream.proto @@ -20,13 +20,15 @@ message CountConnectionResponse { } message PushStreamRequest { - uint64 user_id = 1; - bytes body = 2; + optional uint64 user_id = 1; + optional uint64 client_id = 2; + bytes body = 3; } message PushStreamBatchRequest { repeated uint64 user_id = 1; - bytes body = 2; + repeated uint64 client_id = 2; + bytes body = 3; } message PushStreamResponse { @@ -38,7 +40,8 @@ message PushStreamResponse { message StreamEventRequest { string event = 1; uint64 user_id = 2; - bytes payload = 3; + uint64 client_id = 3; + bytes payload = 4; } message StreamEventResponse {