refactor: backend services

This commit is contained in:
tangtaoit
2024-09-24 22:14:24 +08:00
parent 387d2dc7cc
commit ab77a8383d
6 changed files with 190 additions and 44 deletions

4
.gitignore vendored
View File

@@ -42,4 +42,6 @@ wukongimdata
wukongim*_data/
external_ip.txt
external_ip.txt
wukongim.pid

View File

@@ -1,8 +1,13 @@
package cmd
import (
"bytes"
"crypto/rand"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"os/exec"
"path"
@@ -12,10 +17,9 @@ import (
"github.com/WuKongIM/WuKongIM/internal/server"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/judwhite/go-svc"
"github.com/spf13/cobra"
"github.com/spf13/viper"
_ "go.uber.org/automaxprocs/maxprocs"
"go.uber.org/zap"
)
@@ -25,7 +29,8 @@ var (
serverOpts = server.NewOptions()
mode string
daemon bool
pidfile string = "wukongimpid"
pidfile string = "wukongim.pid"
pingback string // pingback地址
installDir string
initialed bool // 是否已经初始化成功
rootCmd = &cobra.Command{
@@ -35,8 +40,8 @@ var (
CompletionOptions: cobra.CompletionOptions{
DisableDefaultCmd: true,
},
Run: func(cmd *cobra.Command, args []string) {
initServer()
RunE: func(cmd *cobra.Command, args []string) error {
return cmdRun()
},
}
)
@@ -51,6 +56,7 @@ func init() {
rootCmd.PersistentFlags().StringVar(&mode, "mode", "debug", "mode")
// 后台运行
rootCmd.PersistentFlags().BoolVarP(&daemon, "daemon", "d", false, "run in daemon mode")
rootCmd.PersistentFlags().StringVar(&pingback, "pingback", "", "pingback address")
}
@@ -82,9 +88,9 @@ func initConfig() {
initialed = true
}
func initServer() {
func cmdRun() error {
if !initialed {
return
return nil
}
logOpts := wklog.NewOptions()
logOpts.Level = serverOpts.Logger.Level
@@ -93,40 +99,158 @@ func initServer() {
wklog.Configure(logOpts)
s := server.New(serverOpts)
if daemon {
filePath, _ := filepath.Abs(os.Args[0])
args := os.Args[1:]
newArgs := make([]string, 0)
for _, arg := range args {
if arg == "-d" || arg == "--daemon" {
continue
}
newArgs = append(newArgs, arg)
}
cmd := exec.Command(filePath, newArgs...)
// 将其他命令传入生成出的进程
// cmd.Stdin = os.Stdin // 给新进程设置文件描述符,可以重定向到文件中
// cmd.Stdout = os.Stdout
// cmd.Stderr = os.Stderr
fmt.Println("Root Dir:", serverOpts.RootDir)
fmt.Println("Config File:", cfgFile)
fmt.Println("WuKongIM is running in daemon mode")
err := cmd.Start() // 开始执行新进程,不等待新进程退出
if err != nil {
log.Fatal(err)
}
err = os.WriteFile(path.Join(serverOpts.RootDir, pidfile), []byte(strconv.Itoa(cmd.Process.Pid)), 0644)
if err != nil {
log.Fatal(err)
}
os.Exit(0)
if daemon { // 后台运行
// 以子进程方式启动
fmt.Println("start as child process")
startAsChildProcess()
} else {
if err := svc.Run(s); err != nil {
log.Fatal(err)
err := s.Start()
if err != nil {
wklog.Error("start server error", zap.Error(err))
return err
}
// 等待集群准备好
s.MustWaitAllSlotsReady()
if pingback != "" {
confirmationBytes, err := io.ReadAll(os.Stdin)
if err != nil {
wklog.Error("read confirmation error from stdin", zap.Error(err))
return err
}
conn, err := net.Dial("tcp", pingback)
if err != nil {
wklog.Error("dialing confirmation address", zap.Error(err))
return err
}
defer conn.Close()
_, err = conn.Write(confirmationBytes)
if err != nil {
wklog.Error("write confirmation error", zap.Error(err))
return err
}
err = os.WriteFile(path.Join(".", pidfile), []byte(strconv.Itoa(os.Getpid())), 0o600)
if err != nil {
log.Fatal(err)
}
}
select {}
}
return nil
}
func newRunCmd(listener net.Listener) (*exec.Cmd, error) {
filePath, _ := filepath.Abs(os.Args[0])
args := os.Args[1:]
newArgs := make([]string, 0)
if listener != nil {
newArgs = append(newArgs, "--pingback", listener.Addr().String())
}
for _, arg := range args {
if arg == "-d" || arg == "--daemon" {
continue
}
newArgs = append(newArgs, arg)
}
cmd := exec.Command(filePath, newArgs...)
// 允许相对路径运行
if errors.Is(cmd.Err, exec.ErrDot) {
cmd.Err = nil
}
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
return cmd, nil
}
func startAsChildProcess() {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
wklog.Panic("listen error", zap.Error(err))
}
defer ln.Close()
cmd, err := newRunCmd(ln)
if err != nil {
wklog.Panic("new cmd failed", zap.Error(err))
}
stdinPipe, err := cmd.StdinPipe()
if err != nil {
wklog.Panic("get stdin pipe failed", zap.Error(err))
}
// 生成一个随机的32字节的数据用于验证子进程是否启动成功
expect := make([]byte, 32)
_, err = rand.Read(expect)
if err != nil {
wklog.Panic("rand read error", zap.Error(err))
}
go func() {
_, _ = stdinPipe.Write(expect)
stdinPipe.Close()
}()
// start the process
err = cmd.Start()
if err != nil {
wklog.Panic("starting WuKongIM process failed", zap.Error(err))
}
success, exit := make(chan struct{}), make(chan error)
// 开启一个goroutine监听子进程是否启动成功
go func() {
for {
conn, err := ln.Accept()
if err != nil {
if !errors.Is(err, net.ErrClosed) {
log.Println(err)
}
break
}
err = handlePingbackConn(conn, expect)
if err == nil {
close(success)
break
}
log.Println(err)
}
}()
go func() {
err := cmd.Wait()
exit <- err
}()
select {
case <-success:
wklog.Info("Successfully started WuKongIM - is running in the background", zap.Int("pid", cmd.Process.Pid))
case err := <-exit:
wklog.Error("WuKongIM process exited with error", zap.Error(err))
}
}
func handlePingbackConn(conn net.Conn, expect []byte) error {
defer conn.Close()
confirmationBytes, err := io.ReadAll(io.LimitReader(conn, 32))
if err != nil {
return err
}
if !bytes.Equal(confirmationBytes, expect) {
return fmt.Errorf("wrong confirmation: %x", confirmationBytes)
}
return nil
}
func addCommand(cmd CMD) {
rootCmd.AddCommand(cmd.CMD())
}

View File

@@ -3,9 +3,9 @@ package cmd
import (
"fmt"
"os"
"os/exec"
"path"
"github.com/WuKongIM/WuKongIM/pkg/wkutil"
"github.com/spf13/cobra"
)
@@ -29,16 +29,19 @@ func (s *stopCMD) CMD() *cobra.Command {
}
func (s *stopCMD) run(cmd *cobra.Command, args []string) error {
strb, _ := os.ReadFile(path.Join(installDir, pidfile))
command := exec.Command("kill", string(strb))
err := command.Start()
strb, _ := os.ReadFile(path.Join(".", pidfile))
pid := wkutil.ParseInt(string(strb))
process, err := os.FindProcess(pid)
if err != nil {
fmt.Println("Error: ", err)
return err
}
err = command.Wait()
if process == nil {
return nil
}
err = process.Kill()
if err != nil {
fmt.Println("Error: ", err)
return err
}
fmt.Println("WuKongIM server stopped")

View File

@@ -361,6 +361,10 @@ func (s *Server) MustWaitClusterReady() {
s.cluster.MustWaitClusterReady()
}
func (s *Server) MustWaitAllSlotsReady() {
s.cluster.MustWaitAllSlotsReady()
}
// 提案频道分布式
func (s *Server) ProposeChannelClusterConfig(ctx context.Context, cfg wkdb.ChannelClusterConfig) error {
return s.clusterServer.ProposeChannelClusterConfig(ctx, cfg)

10
main.go
View File

@@ -4,7 +4,10 @@ import (
"embed"
"github.com/WuKongIM/WuKongIM/cmd"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/WuKongIM/WuKongIM/version"
"go.uber.org/automaxprocs/maxprocs"
"go.uber.org/zap"
)
//go:embed web/dist
@@ -36,5 +39,12 @@ func main() {
// 将进程标准出错重定向至文件,进程崩溃时运行时将向该文件记录协程调用栈信息
// syscall.Dup2(int(logFile.Fd()), int(os.Stderr.Fd()))
undo, err := maxprocs.Set()
defer undo()
if err != nil {
wklog.Warn("maxprocs set error", zap.Error(err))
}
cmd.Execute()
}

View File

@@ -51,6 +51,9 @@ type Cluster interface {
// 等待集群准备好
MustWaitClusterReady()
// 等待所有槽准备好
MustWaitAllSlotsReady()
// Monitor 获取监控信息
// Monitor() IMonitor
}