🎉 Initial Commit
Some checks failed
release-nightly / build-docker (push) Has been cancelled

This commit is contained in:
2025-01-30 01:19:14 +08:00
commit 771227a9ed
27 changed files with 2152 additions and 0 deletions

24
pkg/internal/cache/store.go vendored Normal file
View File

@ -0,0 +1,24 @@
package cache
import (
"github.com/dgraph-io/ristretto"
"github.com/eko/gocache/lib/v4/store"
ristrettoCache "github.com/eko/gocache/store/ristretto/v4"
)
var S store.StoreInterface
func NewStore() error {
ristretto, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 1000,
MaxCost: 100,
BufferItems: 64,
})
if err != nil {
return err
}
S = ristrettoCache.NewRistretto(ristretto)
return nil
}

View File

@ -0,0 +1,17 @@
package database
import (
"gorm.io/gorm"
)
var AutoMaintainRange = []any{}
func RunMigration(source *gorm.DB) error {
if err := source.AutoMigrate(
AutoMaintainRange...,
); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,27 @@
package database
import (
"git.solsynth.dev/hypernet/nexus/pkg/nex/cruda"
"git.solsynth.dev/hypernet/insight/pkg/internal/gap"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
"github.com/spf13/viper"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
var C *gorm.DB
func NewGorm() error {
var err error
dsn, err := cruda.NewCrudaConn(gap.Nx).AllocDatabase("insight")
C, err = gorm.Open(postgres.Open(dsn), &gorm.Config{Logger: logger.New(&log.Logger, logger.Config{
Colorful: true,
IgnoreRecordNotFoundError: true,
LogLevel: lo.Ternary(viper.GetBool("debug.database"), logger.Info, logger.Silent),
})})
return err
}

View File

@ -0,0 +1,44 @@
package gap
import (
"fmt"
"strings"
"git.solsynth.dev/hypernet/nexus/pkg/nex"
"git.solsynth.dev/hypernet/nexus/pkg/proto"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
"github.com/spf13/viper"
)
var Nx *nex.Conn
func InitializeToNexus() error {
grpcBind := strings.SplitN(viper.GetString("grpc_bind"), ":", 2)
httpBind := strings.SplitN(viper.GetString("bind"), ":", 2)
outboundIp, _ := nex.GetOutboundIP()
grpcOutbound := fmt.Sprintf("%s:%s", outboundIp, grpcBind[1])
httpOutbound := fmt.Sprintf("%s:%s", outboundIp, httpBind[1])
var err error
Nx, err = nex.NewNexusConn(viper.GetString("nexus_addr"), &proto.ServiceInfo{
Id: viper.GetString("id"),
Type: "wa",
Label: "Insight",
GrpcAddr: grpcOutbound,
HttpAddr: lo.ToPtr("http://" + httpOutbound + "/api"),
})
if err == nil {
go func() {
err := Nx.RunRegistering()
if err != nil {
log.Error().Err(err).Msg("An error occurred while registering service...")
}
}()
}
return err
}

View File

@ -0,0 +1,26 @@
package grpc
import (
"context"
health "google.golang.org/grpc/health/grpc_health_v1"
"time"
)
func (v *Server) Check(ctx context.Context, request *health.HealthCheckRequest) (*health.HealthCheckResponse, error) {
return &health.HealthCheckResponse{
Status: health.HealthCheckResponse_SERVING,
}, nil
}
func (v *Server) Watch(request *health.HealthCheckRequest, server health.Health_WatchServer) error {
for {
if server.Send(&health.HealthCheckResponse{
Status: health.HealthCheckResponse_SERVING,
}) != nil {
break
}
time.Sleep(1000 * time.Millisecond)
}
return nil
}

View File

@ -0,0 +1,25 @@
package grpc
import (
"context"
"fmt"
"git.solsynth.dev/hypernet/insight/pkg/internal/services"
"git.solsynth.dev/hypernet/insight/pkg/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (v *Server) GenerateInsight(ctx context.Context, request *proto.InsightRequest) (*proto.InsightResponse, error) {
input := request.GetSource()
if err := services.PlaceOrder(uint(request.GetUserId()), len(input)); err != nil {
return nil, status.Errorf(codes.ResourceExhausted, fmt.Sprintf("failed to place order: %v", err))
}
out, err := services.GenerateInsights(input)
if err != nil {
_ = services.MakeRefund(uint(request.GetUserId()), len(input))
return nil, status.Errorf(codes.Internal, fmt.Sprintf("failed to generate insight: %v", err))
}
return &proto.InsightResponse{Response: out}, nil
}

View File

@ -0,0 +1,43 @@
package grpc
import (
"net"
iproto "git.solsynth.dev/hypernet/insight/pkg/proto"
"git.solsynth.dev/hypernet/nexus/pkg/proto"
"github.com/spf13/viper"
"google.golang.org/grpc"
health "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
)
type Server struct {
proto.UnimplementedDirectoryServiceServer
iproto.UnimplementedInsightServiceServer
health.UnimplementedHealthServer
srv *grpc.Server
}
func NewGrpc() *Server {
server := &Server{
srv: grpc.NewServer(),
}
proto.RegisterDirectoryServiceServer(server.srv, server)
iproto.RegisterInsightServiceServer(server.srv, server)
health.RegisterHealthServer(server.srv, server)
reflection.Register(server.srv)
return server
}
func (v *Server) Listen() error {
listener, err := net.Listen("tcp", viper.GetString("grpc_bind"))
if err != nil {
return err
}
return v.srv.Serve(listener)
}

View File

@ -0,0 +1,42 @@
package grpc
import (
"context"
"git.solsynth.dev/hypernet/nexus/pkg/nex"
"strconv"
"git.solsynth.dev/hypernet/nexus/pkg/proto"
"git.solsynth.dev/hypernet/insight/pkg/internal/database"
)
func (v *Server) BroadcastEvent(ctx context.Context, in *proto.EventInfo) (*proto.EventResponse, error) {
switch in.GetEvent() {
case "deletion":
data := nex.DecodeMap(in.GetData())
resType, ok := data["type"].(string)
if !ok {
break
}
switch resType {
case "account":
id, ok := data["id"].(string)
if !ok {
break
}
numericId, err := strconv.Atoi(id)
if err != nil {
break
}
tx := database.C.Begin()
for _, model := range database.AutoMaintainRange {
switch model.(type) {
default:
tx.Delete(model, "account_id = ?", numericId)
}
}
tx.Commit()
}
}
return &proto.EventResponse{}, nil
}

5
pkg/internal/meta.go Normal file
View File

@ -0,0 +1,5 @@
package pkg
const (
AppVersion = "1.0.0"
)

View File

@ -0,0 +1,19 @@
package api
import (
"git.solsynth.dev/hypernet/insight/pkg/internal/services"
"github.com/gofiber/fiber/v2"
)
func MapAPIs(app *fiber.App, baseURL string) {
api := app.Group(baseURL).Name("API")
{
api.Get("/status", func(c *fiber.Ctx) error {
err := services.PingOllama()
if err != nil {
return fiber.NewError(fiber.StatusServiceUnavailable, err.Error())
}
return c.SendStatus(fiber.StatusOK)
})
}
}

View File

@ -0,0 +1,18 @@
package exts
import (
"github.com/go-playground/validator/v10"
"github.com/gofiber/fiber/v2"
)
var validation = validator.New(validator.WithRequiredStructEnabled())
func BindAndValidate(c *fiber.Ctx, out any) error {
if err := c.BodyParser(out); err != nil {
return fiber.NewError(fiber.StatusBadRequest, err.Error())
} else if err := validation.Struct(out); err != nil {
return fiber.NewError(fiber.StatusBadRequest, err.Error())
}
return nil
}

View File

@ -0,0 +1,71 @@
package server
import (
"strings"
"git.solsynth.dev/hypernet/nexus/pkg/nex/sec"
"git.solsynth.dev/hypernet/insight/pkg/internal/server/api"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors"
"github.com/gofiber/fiber/v2/middleware/idempotency"
"github.com/gofiber/fiber/v2/middleware/logger"
jsoniter "github.com/json-iterator/go"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
)
var IReader *sec.InternalTokenReader
type App struct {
app *fiber.App
}
func NewServer() *App {
app := fiber.New(fiber.Config{
DisableStartupMessage: true,
EnableIPValidation: true,
ServerHeader: "Hypernet.Insight",
AppName: "Hypernet.Insight",
ProxyHeader: fiber.HeaderXForwardedFor,
JSONEncoder: jsoniter.ConfigCompatibleWithStandardLibrary.Marshal,
JSONDecoder: jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal,
BodyLimit: 512 * 1024 * 1024 * 1024, // 512 TiB
EnablePrintRoutes: viper.GetBool("debug.print_routes"),
})
app.Use(idempotency.New())
app.Use(cors.New(cors.Config{
AllowCredentials: true,
AllowMethods: strings.Join([]string{
fiber.MethodGet,
fiber.MethodPost,
fiber.MethodHead,
fiber.MethodOptions,
fiber.MethodPut,
fiber.MethodDelete,
fiber.MethodPatch,
}, ","),
AllowOriginsFunc: func(origin string) bool {
return true
},
}))
app.Use(logger.New(logger.Config{
Format: "${status} | ${latency} | ${method} ${path}\n",
Output: log.Logger,
}))
app.Use(sec.ContextMiddleware(IReader))
api.MapAPIs(app, "/api")
return &App{app}
}
func (v *App) Listen() {
if err := v.app.Listen(viper.GetString("bind")); err != nil {
log.Fatal().Err(err).Msg("An error occurred when starting server...")
}
}

View File

@ -0,0 +1,24 @@
package services
import (
database2 "git.solsynth.dev/hypernet/insight/pkg/internal/database"
"time"
"github.com/rs/zerolog/log"
)
func DoAutoDatabaseCleanup() {
deadline := time.Now().Add(60 * time.Minute)
log.Debug().Time("deadline", deadline).Msg("Now cleaning up entire database...")
var count int64
for _, model := range database2.AutoMaintainRange {
tx := database2.C.Unscoped().Delete(model, "deleted_at >= ?", deadline)
if tx.Error != nil {
log.Error().Err(tx.Error).Msg("An error occurred when running auth context cleanup...")
}
count += tx.RowsAffected
}
log.Debug().Int64("affected", count).Msg("Clean up entire database accomplished.")
}

View File

@ -0,0 +1,59 @@
package services
import (
"context"
"fmt"
"net/http"
"time"
"github.com/spf13/viper"
"github.com/tmc/langchaingo/llms"
"github.com/tmc/langchaingo/llms/ollama"
"github.com/tmc/langchaingo/prompts"
)
func PingOllama() error {
host := viper.GetString("ollama.url")
resp, err := http.Get(host + "/api/version")
if err != nil {
return fmt.Errorf("failed to ping ollama: %v", err)
}
if resp.StatusCode != 200 {
return fmt.Errorf("ollama returned status code %d", resp.StatusCode)
}
return nil
}
var LargeModel *ollama.LLM
func ConnectOllama() error {
model := viper.GetString("ollama.model")
llm, err := ollama.New(ollama.WithModel(model))
if err != nil {
return err
}
LargeModel = llm
return nil
}
func GenerateInsights(source string) (string, error) {
prompt := prompts.NewPromptTemplate(
"Summerize this post on Solar Network below: {{.content}}",
[]string{"content"},
)
inPrompt, err := prompt.Format(map[string]any{
"content": source,
})
if err != nil {
return "", fmt.Errorf("failed to format prompt: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()
completion, err := LargeModel.Call(ctx, inPrompt,
llms.WithTemperature(0.8),
)
return completion, err
}

View File

@ -0,0 +1,69 @@
package services
import (
"context"
"fmt"
"time"
"git.solsynth.dev/hypernet/insight/pkg/internal/gap"
wproto "git.solsynth.dev/hypernet/wallet/pkg/proto"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
)
// PlaceOrder create a transaction if needed for user
// Pricing is 128 words input cost 10 source points, round up.
func PlaceOrder(user uint, inputLength int) error {
amount := float64(inputLength+128-1) / 128
conn, err := gap.Nx.GetClientGrpcConn("wa")
if err != nil {
return fmt.Errorf("unable to connect wallet: %v", err)
}
wc := wproto.NewPaymentServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
resp, err := wc.MakeTransactionWithAccount(ctx, &wproto.MakeTransactionWithAccountRequest{
PayerAccountId: lo.ToPtr(uint64(user)),
Amount: amount,
Remark: "Insight Thinking Fee",
})
if err != nil {
return err
}
log.Info().
Uint64("transaction", resp.Id).Float64("amount", amount).
Msg("Order placed for charge insight thinking fee...")
return nil
}
// MakeRefund to user who got error in generating insight
func MakeRefund(user uint, inputLength int) error {
amount := float64(inputLength+128-1) / 128
conn, err := gap.Nx.GetClientGrpcConn("wa")
if err != nil {
return fmt.Errorf("unable to connect wallet: %v", err)
}
wc := wproto.NewPaymentServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
resp, err := wc.MakeTransactionWithAccount(ctx, &wproto.MakeTransactionWithAccountRequest{
PayeeAccountId: lo.ToPtr(uint64(user)),
Amount: amount,
Remark: "Insight Thinking Failed - Refund",
})
if err != nil {
return err
}
log.Info().
Uint64("transaction", resp.Id).Float64("amount", amount).
Msg("Refund placed for insight thinking fee...")
return nil
}