Nex.Rx, and easier to use alloc mq apis

This commit is contained in:
LittleSheep 2024-10-24 23:52:38 +08:00
parent 23d14e4e46
commit 421834ae5c
6 changed files with 60 additions and 17 deletions

View File

@ -13,3 +13,7 @@ Cruda will help you to build a simplified database access layer based on the com
### Nex.Sec
The security part of nexus, including signing and validating the tokens and much more.
### Nex.Rx
The reactive part of nexus, such as streaming events, and message queues stuff.

View File

@ -10,9 +10,9 @@ import (
)
func (v *CrudConn) AllocDatabase(name string) (string, error) {
conn := v.Conn.GetNexusGrpcConn()
conn := v.n.GetNexusGrpcConn()
ctx := context.Background()
ctx = metadata.AppendToOutgoingContext(ctx, "client_id", v.Conn.Info.Id)
ctx = metadata.AppendToOutgoingContext(ctx, "client_id", v.n.Info.Id)
out, err := proto.NewDatabaseServiceClient(conn).AllocDatabase(ctx, &proto.AllocDatabaseRequest{
Name: name,
})
@ -24,13 +24,13 @@ func (v *CrudConn) AllocDatabase(name string) (string, error) {
if err != nil {
return "", err
}
v.db = db
v.Db = db
return dsn, nil
}
func MigrateModel[T any](v *CrudConn, model T) error {
if v.db == nil {
if v.Db == nil {
return fmt.Errorf("database has not been allocated")
}
return v.db.AutoMigrate(model)
return v.Db.AutoMigrate(model)
}

View File

@ -15,7 +15,7 @@ func AddModel[T any](v *CrudConn, model T, id, prefix string, tags []string) err
funcCmds := []string{".list", "", "", "", ""}
funcMethods := []string{"get", "get", "put", "patch", "delete"}
for idx, fn := range funcList {
if err := v.Conn.AddCommand(prefix+id+funcCmds[idx], funcMethods[idx], tags, fn(v)); err != nil {
if err := v.n.AddCommand(prefix+id+funcCmds[idx], funcMethods[idx], tags, fn(v)); err != nil {
return err
}
}
@ -31,12 +31,12 @@ func cmdList[T any](c *CrudConn) nex.CommandHandler {
var str T
var count int64
if err := c.db.Model(str).Count(&count).Error; err != nil {
if err := c.Db.Model(str).Count(&count).Error; err != nil {
return err
}
var out []T
if err := c.db.Offset(skip).Limit(take).Find(&out).Error; err != nil {
if err := c.Db.Offset(skip).Limit(take).Find(&out).Error; err != nil {
return err
}
@ -55,7 +55,7 @@ func cmdGet[T any](c *CrudConn) nex.CommandHandler {
}
var out T
if err := c.db.First(&out, "id = ?", id).Error; err != nil {
if err := c.Db.First(&out, "id = ?", id).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return ctx.Write([]byte(err.Error()), "text/plain", http.StatusNotFound)
}
@ -75,7 +75,7 @@ func cmdCreate[T any](c *CrudConn) nex.CommandHandler {
return ctx.Write([]byte(err.Error()), "text/plain+error", http.StatusBadRequest)
}
if err := c.db.Create(&payload).Error; err != nil {
if err := c.Db.Create(&payload).Error; err != nil {
return err
}
@ -98,11 +98,11 @@ func cmdUpdate[T any](c *CrudConn) nex.CommandHandler {
}
var out T
if err := c.db.Model(out).Where("id = ?", id).Updates(&payload).Error; err != nil {
if err := c.Db.Model(out).Where("id = ?", id).Updates(&payload).Error; err != nil {
return err
}
if err := c.db.First(&out, "id = ?", id).Error; err != nil {
if err := c.Db.First(&out, "id = ?", id).Error; err != nil {
return err
}
@ -118,7 +118,7 @@ func cmdDelete[T any](c *CrudConn) nex.CommandHandler {
}
var out T
if err := c.db.Delete(&out, "id = ?", id).Error; err != nil {
if err := c.Db.Delete(&out, "id = ?", id).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return ctx.Write([]byte(err.Error()), "text/plain", http.StatusNotFound)
}

View File

@ -6,13 +6,13 @@ import (
)
type CrudConn struct {
Conn *nex.Conn
n *nex.Conn
db *gorm.DB
Db *gorm.DB
}
func NewCrudaConn(conn *nex.Conn) *CrudConn {
return &CrudConn{
Conn: conn,
n: conn,
}
}

32
pkg/nex/rx/mq_conn.go Normal file
View File

@ -0,0 +1,32 @@
package rx
import (
"fmt"
"git.solsynth.dev/hypernet/nexus/pkg/nex"
"github.com/nats-io/nats.go"
)
type MqConn struct {
n *nex.Conn
Nt *nats.Conn
}
func NewMqConn(conn *nex.Conn) (*MqConn, error) {
c := &MqConn{
n: conn,
}
mqAddr := conn.AllocResource(nex.AllocatableResourceMq)
if mqAddr == nil {
return nil, fmt.Errorf("unable to allocate resource: message queue")
} else if addr, ok := mqAddr.(string); !ok {
return nil, fmt.Errorf("alloced mq resource address is not a string")
} else if nc, err := nats.Connect(addr); err != nil {
return nil, fmt.Errorf("unable to connect to nats server: %v", err)
} else {
c.Nt = nc
}
return c, nil
}

7
pkg/nex/rx/mq_io.go Normal file
View File

@ -0,0 +1,7 @@
package rx
import "git.solsynth.dev/hypernet/nexus/pkg/nex"
func (v *MqConn) Publish(topic string, data any) error {
return v.Nt.Publish(topic, nex.EncodeMap(data))
}