From 421834ae5c4a1bbc9c846278166d2a9485e5f637 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Thu, 24 Oct 2024 23:52:38 +0800 Subject: [PATCH] :sparkles: Nex.Rx, and easier to use alloc mq apis --- pkg/nex/README.md | 6 +++++- pkg/nex/cruda/allocator.go | 10 +++++----- pkg/nex/cruda/command.go | 16 ++++++++-------- pkg/nex/cruda/conn.go | 6 +++--- pkg/nex/rx/mq_conn.go | 32 ++++++++++++++++++++++++++++++++ pkg/nex/rx/mq_io.go | 7 +++++++ 6 files changed, 60 insertions(+), 17 deletions(-) create mode 100644 pkg/nex/rx/mq_conn.go create mode 100644 pkg/nex/rx/mq_io.go diff --git a/pkg/nex/README.md b/pkg/nex/README.md index 8ec8bbc..c34e879 100644 --- a/pkg/nex/README.md +++ b/pkg/nex/README.md @@ -12,4 +12,8 @@ Cruda will help you to build a simplified database access layer based on the com ### Nex.Sec -The security part of nexus, including signing and validating the tokens and much more. \ No newline at end of file +The security part of nexus, including signing and validating the tokens and much more. + +### Nex.Rx + +The reactive part of nexus, such as streaming events, and message queues stuff. \ No newline at end of file diff --git a/pkg/nex/cruda/allocator.go b/pkg/nex/cruda/allocator.go index f39854a..ff6a769 100644 --- a/pkg/nex/cruda/allocator.go +++ b/pkg/nex/cruda/allocator.go @@ -10,9 +10,9 @@ import ( ) func (v *CrudConn) AllocDatabase(name string) (string, error) { - conn := v.Conn.GetNexusGrpcConn() + conn := v.n.GetNexusGrpcConn() ctx := context.Background() - ctx = metadata.AppendToOutgoingContext(ctx, "client_id", v.Conn.Info.Id) + ctx = metadata.AppendToOutgoingContext(ctx, "client_id", v.n.Info.Id) out, err := proto.NewDatabaseServiceClient(conn).AllocDatabase(ctx, &proto.AllocDatabaseRequest{ Name: name, }) @@ -24,13 +24,13 @@ func (v *CrudConn) AllocDatabase(name string) (string, error) { if err != nil { return "", err } - v.db = db + v.Db = db return dsn, nil } func MigrateModel[T any](v *CrudConn, model T) error { - if v.db == nil { + if v.Db == nil { return fmt.Errorf("database has not been allocated") } - return v.db.AutoMigrate(model) + return v.Db.AutoMigrate(model) } diff --git a/pkg/nex/cruda/command.go b/pkg/nex/cruda/command.go index 460a61b..e300973 100644 --- a/pkg/nex/cruda/command.go +++ b/pkg/nex/cruda/command.go @@ -15,7 +15,7 @@ func AddModel[T any](v *CrudConn, model T, id, prefix string, tags []string) err funcCmds := []string{".list", "", "", "", ""} funcMethods := []string{"get", "get", "put", "patch", "delete"} for idx, fn := range funcList { - if err := v.Conn.AddCommand(prefix+id+funcCmds[idx], funcMethods[idx], tags, fn(v)); err != nil { + if err := v.n.AddCommand(prefix+id+funcCmds[idx], funcMethods[idx], tags, fn(v)); err != nil { return err } } @@ -31,12 +31,12 @@ func cmdList[T any](c *CrudConn) nex.CommandHandler { var str T var count int64 - if err := c.db.Model(str).Count(&count).Error; err != nil { + if err := c.Db.Model(str).Count(&count).Error; err != nil { return err } var out []T - if err := c.db.Offset(skip).Limit(take).Find(&out).Error; err != nil { + if err := c.Db.Offset(skip).Limit(take).Find(&out).Error; err != nil { return err } @@ -55,7 +55,7 @@ func cmdGet[T any](c *CrudConn) nex.CommandHandler { } var out T - if err := c.db.First(&out, "id = ?", id).Error; err != nil { + if err := c.Db.First(&out, "id = ?", id).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return ctx.Write([]byte(err.Error()), "text/plain", http.StatusNotFound) } @@ -75,7 +75,7 @@ func cmdCreate[T any](c *CrudConn) nex.CommandHandler { return ctx.Write([]byte(err.Error()), "text/plain+error", http.StatusBadRequest) } - if err := c.db.Create(&payload).Error; err != nil { + if err := c.Db.Create(&payload).Error; err != nil { return err } @@ -98,11 +98,11 @@ func cmdUpdate[T any](c *CrudConn) nex.CommandHandler { } var out T - if err := c.db.Model(out).Where("id = ?", id).Updates(&payload).Error; err != nil { + if err := c.Db.Model(out).Where("id = ?", id).Updates(&payload).Error; err != nil { return err } - if err := c.db.First(&out, "id = ?", id).Error; err != nil { + if err := c.Db.First(&out, "id = ?", id).Error; err != nil { return err } @@ -118,7 +118,7 @@ func cmdDelete[T any](c *CrudConn) nex.CommandHandler { } var out T - if err := c.db.Delete(&out, "id = ?", id).Error; err != nil { + if err := c.Db.Delete(&out, "id = ?", id).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { return ctx.Write([]byte(err.Error()), "text/plain", http.StatusNotFound) } diff --git a/pkg/nex/cruda/conn.go b/pkg/nex/cruda/conn.go index 38144a9..72fa213 100644 --- a/pkg/nex/cruda/conn.go +++ b/pkg/nex/cruda/conn.go @@ -6,13 +6,13 @@ import ( ) type CrudConn struct { - Conn *nex.Conn + n *nex.Conn - db *gorm.DB + Db *gorm.DB } func NewCrudaConn(conn *nex.Conn) *CrudConn { return &CrudConn{ - Conn: conn, + n: conn, } } diff --git a/pkg/nex/rx/mq_conn.go b/pkg/nex/rx/mq_conn.go new file mode 100644 index 0000000..98bfccb --- /dev/null +++ b/pkg/nex/rx/mq_conn.go @@ -0,0 +1,32 @@ +package rx + +import ( + "fmt" + "git.solsynth.dev/hypernet/nexus/pkg/nex" + "github.com/nats-io/nats.go" +) + +type MqConn struct { + n *nex.Conn + + Nt *nats.Conn +} + +func NewMqConn(conn *nex.Conn) (*MqConn, error) { + c := &MqConn{ + n: conn, + } + + mqAddr := conn.AllocResource(nex.AllocatableResourceMq) + if mqAddr == nil { + return nil, fmt.Errorf("unable to allocate resource: message queue") + } else if addr, ok := mqAddr.(string); !ok { + return nil, fmt.Errorf("alloced mq resource address is not a string") + } else if nc, err := nats.Connect(addr); err != nil { + return nil, fmt.Errorf("unable to connect to nats server: %v", err) + } else { + c.Nt = nc + } + + return c, nil +} diff --git a/pkg/nex/rx/mq_io.go b/pkg/nex/rx/mq_io.go new file mode 100644 index 0000000..4917e54 --- /dev/null +++ b/pkg/nex/rx/mq_io.go @@ -0,0 +1,7 @@ +package rx + +import "git.solsynth.dev/hypernet/nexus/pkg/nex" + +func (v *MqConn) Publish(topic string, data any) error { + return v.Nt.Publish(topic, nex.EncodeMap(data)) +}