♻️ Refactored process manager

This commit is contained in:
LittleSheep 2024-01-17 14:34:08 +08:00
parent 3f434bfe46
commit 7ad17d9417
17 changed files with 3038 additions and 173 deletions

View File

@ -2,13 +2,14 @@ package deploy
import ( import (
"fmt" "fmt"
jsoniter "github.com/json-iterator/go"
"io" "io"
"os" "os"
"strings" "strings"
jsoniter "github.com/json-iterator/go"
"code.smartsheep.studio/goatworks/roadsign/pkg/cmd/rds/conn" "code.smartsheep.studio/goatworks/roadsign/pkg/cmd/rds/conn"
"code.smartsheep.studio/goatworks/roadsign/pkg/sign" "code.smartsheep.studio/goatworks/roadsign/pkg/navi"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
@ -72,7 +73,7 @@ var DeployCommands = []*cli.Command{
return fmt.Errorf("couldn't connect server: %s", err.Error()) return fmt.Errorf("couldn't connect server: %s", err.Error())
} }
var site sign.SiteConfig var site navi.SiteConfig
if file, err := os.Open(ctx.Args().Get(2)); err != nil { if file, err := os.Open(ctx.Args().Get(2)); err != nil {
return err return err
} else { } else {

View File

@ -8,6 +8,7 @@ import (
roadsign "code.smartsheep.studio/goatworks/roadsign/pkg" roadsign "code.smartsheep.studio/goatworks/roadsign/pkg"
"code.smartsheep.studio/goatworks/roadsign/pkg/hypertext" "code.smartsheep.studio/goatworks/roadsign/pkg/hypertext"
"code.smartsheep.studio/goatworks/roadsign/pkg/navi"
"code.smartsheep.studio/goatworks/roadsign/pkg/sideload" "code.smartsheep.studio/goatworks/roadsign/pkg/sideload"
"code.smartsheep.studio/goatworks/roadsign/pkg/sign" "code.smartsheep.studio/goatworks/roadsign/pkg/sign"
"github.com/google/uuid" "github.com/google/uuid"
@ -44,7 +45,7 @@ func main() {
} }
// Load & init sign // Load & init sign
if err := sign.ReadInConfig(viper.GetString("paths.configs")); err != nil { if err := navi.ReadInConfig(viper.GetString("paths.configs")); err != nil {
log.Panic().Err(err).Msg("An error occurred when loading configurations.") log.Panic().Err(err).Msg("An error occurred when loading configurations.")
} else { } else {
log.Info().Int("count", len(sign.App.Sites)).Msg("All configuration has been loaded.") log.Info().Int("count", len(sign.App.Sites)).Msg("All configuration has been loaded.")

View File

@ -3,7 +3,7 @@ package hypertext
import ( import (
"regexp" "regexp"
"code.smartsheep.studio/goatworks/roadsign/pkg/sign" "code.smartsheep.studio/goatworks/roadnavi/pkg/navi"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/samber/lo" "github.com/samber/lo"
) )
@ -16,7 +16,7 @@ func UseProxies(app *fiber.App) {
headers := ctx.GetReqHeaders() headers := ctx.GetReqHeaders()
// Filtering sites // Filtering sites
for _, site := range sign.App.Sites { for _, site := range navi.App.Sites {
// Matching rules // Matching rules
for _, rule := range site.Rules { for _, rule := range site.Rules {
if !lo.Contains(rule.Host, host) { if !lo.Contains(rule.Host, host) {
@ -89,7 +89,7 @@ func UseProxies(app *fiber.App) {
}) })
} }
func makeResponse(ctx *fiber.Ctx, site *sign.SiteConfig) error { func makeResponse(ctx *fiber.Ctx, site *navi.SiteConfig) error {
// Modify request // Modify request
for _, transformer := range site.Transformers { for _, transformer := range site.Transformers {
if err := transformer.TransformRequest(ctx); err != nil { if err := transformer.TransformRequest(ctx); err != nil {
@ -98,7 +98,7 @@ func makeResponse(ctx *fiber.Ctx, site *sign.SiteConfig) error {
} }
// Forward // Forward
err := sign.App.Forward(ctx, site) err := navi.App.Forward(ctx, site)
// Modify response // Modify response
for _, transformer := range site.Transformers { for _, transformer := range site.Transformers {

View File

@ -1,4 +1,4 @@
package sign package navi
import ( import (
"io" "io"

View File

@ -1,4 +1,4 @@
package sign package navi
import ( import (
"errors" "errors"

View File

@ -1,12 +1,12 @@
package sign package navi
import ( import (
"code.smartsheep.studio/goatworks/roadsign/pkg/sign/transformers"
"errors" "errors"
"math/rand" "math/rand"
"code.smartsheep.studio/goatworks/roadsign/pkg/navi/transformers"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/rs/zerolog/log"
) )
type RoadApp struct { type RoadApp struct {
@ -18,14 +18,6 @@ func (v *RoadApp) Forward(ctx *fiber.Ctx, site *SiteConfig) error {
return errors.New("invalid configuration") return errors.New("invalid configuration")
} }
// Boot processes
for _, process := range site.Processes {
if err := process.BootProcess(); err != nil {
log.Warn().Err(err).Msgf("An error occurred when booting process (%s) for %s", process.ID, site.ID)
return fiber.ErrBadGateway
}
}
// Do forward // Do forward
idx := rand.Intn(len(site.Upstreams)) idx := rand.Intn(len(site.Upstreams))
upstream := site.Upstreams[idx] upstream := site.Upstreams[idx]
@ -47,7 +39,6 @@ type SiteConfig struct {
Rules []*RouterRule `json:"rules" yaml:"rules"` Rules []*RouterRule `json:"rules" yaml:"rules"`
Transformers []*RequestTransformerConfig `json:"transformers" yaml:"transformers"` Transformers []*RequestTransformerConfig `json:"transformers" yaml:"transformers"`
Upstreams []*UpstreamInstance `json:"upstreams" yaml:"upstreams"` Upstreams []*UpstreamInstance `json:"upstreams" yaml:"upstreams"`
Processes []*ProcessInstance `json:"processes" yaml:"processes"`
} }
type RouterRule struct { type RouterRule struct {

View File

@ -1,4 +1,4 @@
package sign package navi
import ( import (
"fmt" "fmt"

View File

@ -5,7 +5,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"code.smartsheep.studio/goatworks/roadsign/pkg/sign" "code.smartsheep.studio/goatworks/roadsign/pkg/navi"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/samber/lo" "github.com/samber/lo"
@ -17,7 +17,7 @@ func doPublish(c *fiber.Ctx) error {
var site *sign.SiteConfig var site *sign.SiteConfig
var upstream *sign.UpstreamInstance var upstream *sign.UpstreamInstance
var process *sign.ProcessInstance var process *sign.ProcessInstance
for _, item := range sign.App.Sites { for _, item := range navi.App.Sites {
if item.ID == c.Params("site") { if item.ID == c.Params("site") {
site = item site = item
for _, stream := range item.Upstreams { for _, stream := range item.Upstreams {
@ -40,7 +40,7 @@ func doPublish(c *fiber.Ctx) error {
if upstream == nil && process == nil { if upstream == nil && process == nil {
return fiber.ErrNotFound return fiber.ErrNotFound
} else if upstream != nil && upstream.GetType() != sign.UpstreamTypeFile { } else if upstream != nil && upstream.GetType() != navi.UpstreamTypeFile {
return fiber.ErrUnprocessableEntity return fiber.ErrUnprocessableEntity
} }

View File

@ -5,6 +5,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"code.smartsheep.studio/goatworks/roadsign/pkg/navi"
"code.smartsheep.studio/goatworks/roadsign/pkg/sign" "code.smartsheep.studio/goatworks/roadsign/pkg/sign"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/samber/lo" "github.com/samber/lo"
@ -31,7 +32,7 @@ func getSiteConfig(c *fiber.Ctx) error {
} }
func doSyncSite(c *fiber.Ctx) error { func doSyncSite(c *fiber.Ctx) error {
var req sign.SiteConfig var req navi.SiteConfig
if err := c.BodyParser(&req); err != nil { if err := c.BodyParser(&req); err != nil {
return err return err

View File

@ -1,7 +1,6 @@
package sideload package sideload
import ( import (
"code.smartsheep.studio/goatworks/roadsign/pkg/sign"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/samber/lo" "github.com/samber/lo"
) )
@ -15,7 +14,7 @@ func getStatistics(c *fiber.Ctx) error {
}) })
unhealthy := lo.FlatMap(sign.App.Sites, func(item *sign.SiteConfig, idx int) []*sign.ProcessInstance { unhealthy := lo.FlatMap(sign.App.Sites, func(item *sign.SiteConfig, idx int) []*sign.ProcessInstance {
return lo.Filter(item.Processes, func(item *sign.ProcessInstance, idx int) bool { return lo.Filter(item.Processes, func(item *sign.ProcessInstance, idx int) bool {
return item.Status != sign.ProcessStarted return item.Status != navi.ProcessStarted
}) })
}) })

2907
pkg/sideload/view/yarn.lock Normal file

File diff suppressed because it is too large Load Diff

View File

@ -1,144 +0,0 @@
package sign
import (
"fmt"
"github.com/samber/lo"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
)
type ProcessStatus = int8
const (
ProcessCreated = ProcessStatus(iota)
ProcessStarting
ProcessStarted
ProcessExited
ProcessFailure
)
type ProcessInstance struct {
ID string `json:"id" yaml:"id"`
Workdir string `json:"workdir" yaml:"workdir"`
Command []string `json:"command" yaml:"command"`
Environment []string `json:"environment" yaml:"environment"`
Prepares [][]string `json:"prepares" yaml:"prepares"`
Preheat bool `json:"preheat" yaml:"preheat"`
Cmd *exec.Cmd `json:"-"`
Logger strings.Builder `json:"-"`
Status ProcessStatus `json:"status"`
}
func (v *ProcessInstance) BootProcess() error {
if v.Cmd != nil {
return nil
}
if err := v.PrepareProcess(); err != nil {
return err
}
if v.Cmd == nil {
return v.StartProcess()
}
if v.Cmd.Process == nil || v.Cmd.ProcessState == nil {
return v.StartProcess()
}
if v.Cmd.ProcessState.Exited() {
return v.StartProcess()
} else if v.Cmd.ProcessState.Exited() {
return fmt.Errorf("process already dead")
}
if v.Cmd.ProcessState.Exited() {
return fmt.Errorf("cannot start process")
} else {
return nil
}
}
func (v *ProcessInstance) PrepareProcess() error {
for _, script := range v.Prepares {
if len(script) <= 0 {
continue
}
cmd := exec.Command(script[0], script[1:]...)
cmd.Dir = filepath.Join(v.Workdir)
if err := cmd.Run(); err != nil {
return err
}
}
return nil
}
func (v *ProcessInstance) StartProcess() error {
if len(v.Command) <= 0 {
return fmt.Errorf("you need set the command for %s to enable process manager", v.ID)
}
v.Cmd = exec.Command(v.Command[0], v.Command[1:]...)
v.Cmd.Dir = filepath.Join(v.Workdir)
v.Cmd.Env = append(v.Cmd.Env, v.Environment...)
v.Cmd.Stdout = &v.Logger
v.Cmd.Stderr = &v.Logger
// Monitor
go func() {
for {
if v.Cmd.Process == nil || v.Cmd.ProcessState == nil {
v.Status = ProcessStarting
} else if !v.Cmd.ProcessState.Exited() {
v.Status = ProcessStarted
} else {
v.Status = lo.Ternary(v.Cmd.ProcessState.Success(), ProcessExited, ProcessFailure)
return
}
time.Sleep(100 * time.Millisecond)
}
}()
return v.Cmd.Start()
}
func (v *ProcessInstance) StopProcess() error {
if v.Cmd != nil && v.Cmd.Process != nil {
if err := v.Cmd.Process.Signal(os.Interrupt); err != nil {
v.Cmd.Process.Kill()
return err
} else {
v.Cmd = nil
}
}
return nil
}
func (v *ProcessInstance) GetLogs() string {
return v.Logger.String()
}
func (v *RoadApp) PreheatProcesses(callbacks ...func(total int, success int)) {
var processes []*ProcessInstance
for _, site := range v.Sites {
for _, process := range site.Processes {
if process.Preheat {
processes = append(processes, process)
}
}
}
success := 0
for _, process := range processes {
if process.BootProcess() == nil {
success++
}
}
if len(callbacks) > 0 {
for _, callback := range callbacks {
callback(len(processes), success)
}
}
}

101
pkg/warden/executor.go Normal file
View File

@ -0,0 +1,101 @@
package warden
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"github.com/samber/lo"
)
type AppStatus = int8
const (
AppCreated = AppStatus(iota)
AppStarting
AppStarted
AppExited
AppFailure
)
type WardenInstance struct {
Manifest WardenApplication `json:"manifest"`
Cmd *exec.Cmd `json:"-"`
Logger strings.Builder `json:"-"`
Status AppStatus `json:"status"`
}
func (v *WardenInstance) Wake() error {
if v.Cmd != nil {
return nil
}
if v.Cmd == nil {
return v.Start()
}
if v.Cmd.Process == nil || v.Cmd.ProcessState == nil {
return v.Start()
}
if v.Cmd.ProcessState.Exited() {
return v.Start()
} else if v.Cmd.ProcessState.Exited() {
return fmt.Errorf("process already dead")
}
if v.Cmd.ProcessState.Exited() {
return fmt.Errorf("cannot start process")
} else {
return nil
}
}
func (v *WardenInstance) Start() error {
manifest := v.Manifest
if len(manifest.Command) <= 0 {
return fmt.Errorf("you need set the command for %s to enable process manager", manifest.ID)
}
v.Cmd = exec.Command(manifest.Command[0], manifest.Command[1:]...)
v.Cmd.Dir = filepath.Join(manifest.Workdir)
v.Cmd.Env = append(v.Cmd.Env, manifest.Environment...)
v.Cmd.Stdout = &v.Logger
v.Cmd.Stderr = &v.Logger
// Monitor
go func() {
for {
if v.Cmd.Process == nil || v.Cmd.ProcessState == nil {
v.Status = AppStarting
} else if !v.Cmd.ProcessState.Exited() {
v.Status = AppStarted
} else {
v.Status = lo.Ternary(v.Cmd.ProcessState.Success(), AppExited, AppFailure)
return
}
time.Sleep(100 * time.Millisecond)
}
}()
return v.Cmd.Start()
}
func (v *WardenInstance) Stop() error {
if v.Cmd != nil && v.Cmd.Process != nil {
if err := v.Cmd.Process.Signal(os.Interrupt); err != nil {
v.Cmd.Process.Kill()
return err
} else {
v.Cmd = nil
}
}
return nil
}
func (v *WardenInstance) Logs() string {
return v.Logger.String()
}

8
pkg/warden/manifest.go Normal file
View File

@ -0,0 +1,8 @@
package warden
type WardenApplication struct {
ID string `json:"id" yaml:"id"`
Workdir string `json:"workdir" yaml:"workdir"`
Command []string `json:"command" yaml:"command"`
Environment []string `json:"environment" yaml:"environment"`
}