From ab77a8383d383b15a349abcb2adc2ed341a809bb Mon Sep 17 00:00:00 2001 From: tangtaoit Date: Tue, 24 Sep 2024 22:14:24 +0800 Subject: [PATCH] refactor: backend services --- .gitignore | 4 +- cmd/root.go | 196 ++++++++++++++++++++++++++------ cmd/stop.go | 17 +-- internal/server/server.go | 4 + main.go | 10 ++ pkg/cluster/icluster/cluster.go | 3 + 6 files changed, 190 insertions(+), 44 deletions(-) diff --git a/.gitignore b/.gitignore index 4bb6cd4..6d9c4f4 100644 --- a/.gitignore +++ b/.gitignore @@ -42,4 +42,6 @@ wukongimdata wukongim*_data/ -external_ip.txt \ No newline at end of file +external_ip.txt + +wukongim.pid \ No newline at end of file diff --git a/cmd/root.go b/cmd/root.go index 2561675..3c9d237 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -1,8 +1,13 @@ package cmd import ( + "bytes" + "crypto/rand" + "errors" "fmt" + "io" "log" + "net" "os" "os/exec" "path" @@ -12,10 +17,9 @@ import ( "github.com/WuKongIM/WuKongIM/internal/server" "github.com/WuKongIM/WuKongIM/pkg/wklog" - "github.com/judwhite/go-svc" "github.com/spf13/cobra" "github.com/spf13/viper" - _ "go.uber.org/automaxprocs/maxprocs" + "go.uber.org/zap" ) @@ -25,7 +29,8 @@ var ( serverOpts = server.NewOptions() mode string daemon bool - pidfile string = "wukongimpid" + pidfile string = "wukongim.pid" + pingback string // pingback地址 installDir string initialed bool // 是否已经初始化成功 rootCmd = &cobra.Command{ @@ -35,8 +40,8 @@ var ( CompletionOptions: cobra.CompletionOptions{ DisableDefaultCmd: true, }, - Run: func(cmd *cobra.Command, args []string) { - initServer() + RunE: func(cmd *cobra.Command, args []string) error { + return cmdRun() }, } ) @@ -51,6 +56,7 @@ func init() { rootCmd.PersistentFlags().StringVar(&mode, "mode", "debug", "mode") // 后台运行 rootCmd.PersistentFlags().BoolVarP(&daemon, "daemon", "d", false, "run in daemon mode") + rootCmd.PersistentFlags().StringVar(&pingback, "pingback", "", "pingback address") } @@ -82,9 +88,9 @@ func initConfig() { initialed = true } -func initServer() { +func cmdRun() error { if !initialed { - return + return nil } logOpts := wklog.NewOptions() logOpts.Level = serverOpts.Logger.Level @@ -93,40 +99,158 @@ func initServer() { wklog.Configure(logOpts) s := server.New(serverOpts) - if daemon { - filePath, _ := filepath.Abs(os.Args[0]) - args := os.Args[1:] - newArgs := make([]string, 0) - for _, arg := range args { - if arg == "-d" || arg == "--daemon" { - continue - } - newArgs = append(newArgs, arg) - } - cmd := exec.Command(filePath, newArgs...) - // 将其他命令传入生成出的进程 - // cmd.Stdin = os.Stdin // 给新进程设置文件描述符,可以重定向到文件中 - // cmd.Stdout = os.Stdout - // cmd.Stderr = os.Stderr - fmt.Println("Root Dir:", serverOpts.RootDir) - fmt.Println("Config File:", cfgFile) - fmt.Println("WuKongIM is running in daemon mode") - err := cmd.Start() // 开始执行新进程,不等待新进程退出 - if err != nil { - log.Fatal(err) - } - err = os.WriteFile(path.Join(serverOpts.RootDir, pidfile), []byte(strconv.Itoa(cmd.Process.Pid)), 0644) - if err != nil { - log.Fatal(err) - } - os.Exit(0) + if daemon { // 后台运行 + // 以子进程方式启动 + fmt.Println("start as child process") + startAsChildProcess() } else { - if err := svc.Run(s); err != nil { - log.Fatal(err) + + err := s.Start() + if err != nil { + wklog.Error("start server error", zap.Error(err)) + return err } + // 等待集群准备好 + s.MustWaitAllSlotsReady() + + if pingback != "" { + confirmationBytes, err := io.ReadAll(os.Stdin) + if err != nil { + wklog.Error("read confirmation error from stdin", zap.Error(err)) + return err + } + conn, err := net.Dial("tcp", pingback) + if err != nil { + wklog.Error("dialing confirmation address", zap.Error(err)) + return err + } + defer conn.Close() + + _, err = conn.Write(confirmationBytes) + if err != nil { + wklog.Error("write confirmation error", zap.Error(err)) + return err + } + err = os.WriteFile(path.Join(".", pidfile), []byte(strconv.Itoa(os.Getpid())), 0o600) + if err != nil { + log.Fatal(err) + } + } + + select {} + + } + return nil +} + +func newRunCmd(listener net.Listener) (*exec.Cmd, error) { + filePath, _ := filepath.Abs(os.Args[0]) + args := os.Args[1:] + newArgs := make([]string, 0) + if listener != nil { + newArgs = append(newArgs, "--pingback", listener.Addr().String()) + } + for _, arg := range args { + if arg == "-d" || arg == "--daemon" { + continue + } + newArgs = append(newArgs, arg) } + cmd := exec.Command(filePath, newArgs...) + + // 允许相对路径运行 + if errors.Is(cmd.Err, exec.ErrDot) { + cmd.Err = nil + } + + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + return cmd, nil } + +func startAsChildProcess() { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + wklog.Panic("listen error", zap.Error(err)) + } + defer ln.Close() + + cmd, err := newRunCmd(ln) + if err != nil { + wklog.Panic("new cmd failed", zap.Error(err)) + } + + stdinPipe, err := cmd.StdinPipe() + if err != nil { + wklog.Panic("get stdin pipe failed", zap.Error(err)) + } + + // 生成一个随机的32字节的数据,用于验证子进程是否启动成功 + expect := make([]byte, 32) + _, err = rand.Read(expect) + if err != nil { + wklog.Panic("rand read error", zap.Error(err)) + } + + go func() { + _, _ = stdinPipe.Write(expect) + stdinPipe.Close() + }() + + // start the process + err = cmd.Start() + if err != nil { + wklog.Panic("starting WuKongIM process failed", zap.Error(err)) + } + + success, exit := make(chan struct{}), make(chan error) + + // 开启一个goroutine监听子进程是否启动成功 + go func() { + for { + conn, err := ln.Accept() + if err != nil { + if !errors.Is(err, net.ErrClosed) { + log.Println(err) + } + break + } + err = handlePingbackConn(conn, expect) + if err == nil { + close(success) + break + } + log.Println(err) + } + }() + + go func() { + err := cmd.Wait() + exit <- err + }() + + select { + case <-success: + wklog.Info("Successfully started WuKongIM - is running in the background", zap.Int("pid", cmd.Process.Pid)) + case err := <-exit: + wklog.Error("WuKongIM process exited with error", zap.Error(err)) + } +} + +func handlePingbackConn(conn net.Conn, expect []byte) error { + defer conn.Close() + confirmationBytes, err := io.ReadAll(io.LimitReader(conn, 32)) + if err != nil { + return err + } + if !bytes.Equal(confirmationBytes, expect) { + return fmt.Errorf("wrong confirmation: %x", confirmationBytes) + } + return nil +} + func addCommand(cmd CMD) { rootCmd.AddCommand(cmd.CMD()) } diff --git a/cmd/stop.go b/cmd/stop.go index c813917..bad4b65 100644 --- a/cmd/stop.go +++ b/cmd/stop.go @@ -3,9 +3,9 @@ package cmd import ( "fmt" "os" - "os/exec" "path" + "github.com/WuKongIM/WuKongIM/pkg/wkutil" "github.com/spf13/cobra" ) @@ -29,16 +29,19 @@ func (s *stopCMD) CMD() *cobra.Command { } func (s *stopCMD) run(cmd *cobra.Command, args []string) error { - strb, _ := os.ReadFile(path.Join(installDir, pidfile)) - command := exec.Command("kill", string(strb)) - err := command.Start() + strb, _ := os.ReadFile(path.Join(".", pidfile)) + + pid := wkutil.ParseInt(string(strb)) + process, err := os.FindProcess(pid) if err != nil { - fmt.Println("Error: ", err) return err } - err = command.Wait() + if process == nil { + return nil + } + + err = process.Kill() if err != nil { - fmt.Println("Error: ", err) return err } fmt.Println("WuKongIM server stopped") diff --git a/internal/server/server.go b/internal/server/server.go index e11e247..fc82bbb 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -361,6 +361,10 @@ func (s *Server) MustWaitClusterReady() { s.cluster.MustWaitClusterReady() } +func (s *Server) MustWaitAllSlotsReady() { + s.cluster.MustWaitAllSlotsReady() +} + // 提案频道分布式 func (s *Server) ProposeChannelClusterConfig(ctx context.Context, cfg wkdb.ChannelClusterConfig) error { return s.clusterServer.ProposeChannelClusterConfig(ctx, cfg) diff --git a/main.go b/main.go index ac6df17..fcd593f 100644 --- a/main.go +++ b/main.go @@ -4,7 +4,10 @@ import ( "embed" "github.com/WuKongIM/WuKongIM/cmd" + "github.com/WuKongIM/WuKongIM/pkg/wklog" "github.com/WuKongIM/WuKongIM/version" + "go.uber.org/automaxprocs/maxprocs" + "go.uber.org/zap" ) //go:embed web/dist @@ -36,5 +39,12 @@ func main() { // 将进程标准出错重定向至文件,进程崩溃时运行时将向该文件记录协程调用栈信息 // syscall.Dup2(int(logFile.Fd()), int(os.Stderr.Fd())) + undo, err := maxprocs.Set() + defer undo() + if err != nil { + wklog.Warn("maxprocs set error", zap.Error(err)) + } + cmd.Execute() + } diff --git a/pkg/cluster/icluster/cluster.go b/pkg/cluster/icluster/cluster.go index 0013225..321de03 100644 --- a/pkg/cluster/icluster/cluster.go +++ b/pkg/cluster/icluster/cluster.go @@ -51,6 +51,9 @@ type Cluster interface { // 等待集群准备好 MustWaitClusterReady() + + // 等待所有槽准备好 + MustWaitAllSlotsReady() // Monitor 获取监控信息 // Monitor() IMonitor }