✨ Redis cache
This commit is contained in:
@ -2,14 +2,17 @@ package nex
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/proto"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
type AllocatableResourceType = string
|
||||
|
||||
const (
|
||||
AllocatableResourceMq = AllocatableResourceType("mq")
|
||||
AllocatableResourceKv = AllocatableResourceType("kv")
|
||||
AllocatableResourceMq = AllocatableResourceType("mq")
|
||||
AllocatableResourceKv = AllocatableResourceType("kv")
|
||||
AllocatableResourceCache = AllocatableResourceType("cache")
|
||||
)
|
||||
|
||||
func (v *Conn) AllocResource(t AllocatableResourceType) any {
|
||||
@ -28,6 +31,17 @@ func (v *Conn) AllocResource(t AllocatableResourceType) any {
|
||||
return nil
|
||||
}
|
||||
return resp.Endpoints
|
||||
case AllocatableResourceCache:
|
||||
conn := v.GetNexusGrpcConn()
|
||||
resp, err := proto.NewAllocatorServiceClient(conn).AllocCache(context.Background(), &proto.AllocCacheRequest{})
|
||||
if err != nil || !resp.IsSuccess {
|
||||
return nil
|
||||
}
|
||||
return redis.NewClient(&redis.Options{
|
||||
Addr: resp.GetAddr(),
|
||||
Password: resp.GetPassword(),
|
||||
DB: int(resp.GetDb()),
|
||||
})
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
87
pkg/nex/cachekit/rdb.go
Normal file
87
pkg/nex/cachekit/rdb.go
Normal file
@ -0,0 +1,87 @@
|
||||
package cachekit
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/nex"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
type CaConn struct {
|
||||
n *nex.Conn
|
||||
Rd *redis.Client
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
func NewCaConn(conn *nex.Conn, timeout time.Duration) (*CaConn, error) {
|
||||
c := &CaConn{
|
||||
n: conn,
|
||||
Timeout: timeout,
|
||||
}
|
||||
|
||||
rdb := conn.AllocResource(nex.AllocatableResourceCache)
|
||||
if rdb == nil {
|
||||
return nil, fmt.Errorf("unable to allocate resource: cache")
|
||||
} else if client, ok := rdb.(*redis.Client); !ok {
|
||||
return nil, fmt.Errorf("allocated cache resource is not a redis client")
|
||||
} else {
|
||||
c.Rd = client
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *CaConn) withTimeout() (context.Context, context.CancelFunc) {
|
||||
return context.WithTimeout(context.Background(), c.Timeout)
|
||||
}
|
||||
|
||||
// Set stores a key-value pair in Redis with an optional expiration time
|
||||
func (c *CaConn) Set(key string, value any, ttl time.Duration) error {
|
||||
ctx, cancel := c.withTimeout()
|
||||
defer cancel()
|
||||
return c.Rd.Set(ctx, key, value, ttl).Err()
|
||||
}
|
||||
|
||||
// Get retrieves a value from Redis by key
|
||||
func (c *CaConn) Get(key string) (string, error) {
|
||||
ctx, cancel := c.withTimeout()
|
||||
defer cancel()
|
||||
return c.Rd.Get(ctx, key).Result()
|
||||
}
|
||||
|
||||
// Delete removes a key from Redis
|
||||
func (c *CaConn) Delete(key string) error {
|
||||
ctx, cancel := c.withTimeout()
|
||||
defer cancel()
|
||||
return c.Rd.Del(ctx, key).Err()
|
||||
}
|
||||
|
||||
// Exists checks if a key exists in Redis
|
||||
func (c *CaConn) Exists(key string) (bool, error) {
|
||||
ctx, cancel := c.withTimeout()
|
||||
defer cancel()
|
||||
exists, err := c.Rd.Exists(ctx, key).Result()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return exists > 0, nil
|
||||
}
|
||||
|
||||
// ClearCacheByPrefix deletes all keys matching a given prefix
|
||||
func (c *CaConn) DeleteByPrefix(prefix string) error {
|
||||
ctx, cancel := c.withTimeout()
|
||||
defer cancel()
|
||||
|
||||
iter := c.Rd.Scan(ctx, 0, prefix+"*", 0).Iterator()
|
||||
for iter.Next(ctx) {
|
||||
if err := c.Rd.Del(ctx, iter.Val()).Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := iter.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
Reference in New Issue
Block a user