refactor: cluster

tangtaoit
2024-07-16 22:24:11 +08:00
parent 2b5df8c174
commit fd3bce6c45
17 changed files with 230 additions and 185 deletions

.gitignore
View File

@@ -40,4 +40,4 @@ benchmarks/results/
wukongimdata
wukongim*_data/

View File

@@ -1,7 +1,7 @@
FROM golang:1.22 as build
ENV GOPROXY https://goproxy.cn,direct
ENV GO111MODULE on
ENV GOPROXY=https://goproxy.cn,direct
ENV GO111MODULE=on
# Install Node.js and Yarn
RUN curl -sS https://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add -

View File

@@ -17,28 +17,16 @@ deploy-arm:
docker push wukongim/wukongim:latest-arm64
deploy-v2-dev:
docker build -t wukongim .
docker tag wukongim registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.0.0-beta-20240428-dev
docker push registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.0.0-beta-20240428-dev
docker tag wukongim registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.0.1-beta-20240715-dev
docker push registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.0.1-beta-20240715-dev
deploy-v2:
docker build -t wukongim .
docker tag wukongim registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.0.0-beta-20240428
docker tag wukongim wukongim/wukongim:v2.0.0-beta-20240428
docker push registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.0.0-beta-20240428
docker push wukongim/wukongim:v2.0.0-beta-20240428
docker tag wukongim registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.0.1-beta-20240715
docker tag wukongim wukongim/wukongim:v2.0.1-beta-20240715
docker push registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.0.1-beta-20240715
docker push wukongim/wukongim:v2.0.1-beta-20240715
deploy-latest:
docker build -t wukongim .
<<<<<<< HEAD
docker tag wukongim registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:latest
docker tag wukongim wukongim/wukongim:latest
docker push registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:latest
docker push wukongim/wukongim:latest
# deploy-v1.2.x:
# docker build -t wukongim .
# docker tag wukongim registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v1.2.2
# docker tag wukongim wukongim/wukongim:v1.2.2
# docker push registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v1.2.2
# docker push wukongim/wukongim:v1.2.2
=======
docker tag wukongim registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v1.2
docker tag wukongim wukongim/wukongim:v1.2
docker push registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v1.2
@@ -49,7 +37,6 @@ deploy-v1.2.x:
docker tag wukongim wukongim/wukongim:v1.2.5
docker push registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v1.2.5
docker push wukongim/wukongim:v1.2.5
>>>>>>> main
# docker push registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v1.2
# docker push registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v1.2-dev

View File

@@ -1,6 +1,8 @@
package cmd
import "github.com/spf13/cobra"
import (
"github.com/spf13/cobra"
)
type WuKongIMContext struct {
}

View File

@@ -13,7 +13,6 @@ import (
"github.com/WuKongIM/WuKongIM/internal/server"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/judwhite/go-svc"
"github.com/sasha-s/go-deadlock"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"
@@ -25,8 +24,9 @@ var (
serverOpts = server.NewOptions()
mode string
daemon bool
pidfile string = "WukongimPID"
pidfile string = "wukongimpid"
installDir string
initialed bool // whether initialization has completed successfully
rootCmd = &cobra.Command{
Use: "wk",
Short: "WuKongIM, a sleek and high-performance instant messaging platform.",
@@ -42,12 +42,6 @@ var (
func init() {
homeDir, err := server.GetHomeDir()
if err != nil {
log.Fatal(err)
}
installDir = homeDir
cobra.OnInitialize(initConfig)
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file")
@@ -64,9 +58,11 @@ func initConfig() {
if strings.TrimSpace(cfgFile) != "" {
vp.SetConfigFile(cfgFile)
if err := vp.ReadInConfig(); err != nil {
wklog.Error("read config file error", zap.Error(err))
if !ignoreMissingConfig {
fmt.Println("read config file error: ", err)
panic(fmt.Errorf("read config file error: %s", err))
} else {
wklog.Error("read config file error", zap.Error(err))
}
}
}
@@ -79,17 +75,22 @@ func initConfig() {
serverOpts.Mode = server.Mode(mode)
}
serverOpts.ConfigureWithViper(vp)
installDir = serverOpts.RootDir
initialed = true
}
func initServer() {
if !initialed {
return
}
logOpts := wklog.NewOptions()
logOpts.Level = serverOpts.Logger.Level
logOpts.LogDir = serverOpts.Logger.Dir
logOpts.LineNum = serverOpts.Logger.LineNum
wklog.Configure(logOpts)
deadlock.Opts.Disable = true // disable deadlock detection
s := server.New(serverOpts)
if daemon {
filePath, _ := filepath.Abs(os.Args[0])
@@ -106,11 +107,13 @@ func initServer() {
// cmd.Stdin = os.Stdin // set file descriptors for the new process; output can be redirected to a file
// cmd.Stdout = os.Stdout
// cmd.Stderr = os.Stderr
fmt.Println("root dir:", serverOpts.RootDir)
fmt.Println("config file:", cfgFile)
err := cmd.Start() // 开始执行新进程,不等待新进程退出
if err != nil {
log.Fatal(err)
}
err = os.WriteFile(path.Join(installDir, pidfile), []byte(strconv.Itoa(cmd.Process.Pid)), 0644)
err = os.WriteFile(path.Join(serverOpts.RootDir, pidfile), []byte(strconv.Itoa(cmd.Process.Pid)), 0644)
if err != nil {
log.Fatal(err)
}

View File

@@ -1,6 +1,7 @@
package cmd
import (
"fmt"
"os"
"os/exec"
"path"
@@ -9,10 +10,13 @@ import (
)
type stopCMD struct {
ctx *WuKongIMContext
}
func newStopCMD(ctx *WuKongIMContext) *stopCMD {
return &stopCMD{}
return &stopCMD{
ctx: ctx,
}
}
func (s *stopCMD) CMD() *cobra.Command {
@@ -29,7 +33,14 @@ func (s *stopCMD) run(cmd *cobra.Command, args []string) error {
command := exec.Command("kill", string(strb))
err := command.Start()
if err != nil {
fmt.Println("Error: ", err)
return err
}
return command.Wait()
err = command.Wait()
if err != nil {
fmt.Println("Error: ", err)
return err
}
fmt.Println("WuKongIM server stopped")
return nil
}

View File

@@ -1,67 +1,80 @@
version: '3'
version: '3.7'
x-wukongim-common: &wukongim-common # shared configuration
image: wukongim/wukongim:v2.0.1-beta-20240715
healthcheck:
test: "wget -q -Y off -O /dev/null http://localhost:5001/health > /dev/null 2>&1"
interval: 10s
timeout: 10s
retries: 3
services:
wukongim1:
image: wukongim/wukongim:v2.0.0-beta-20240428
<<: *wukongim-common
container_name: wukongim1
environment:
- "WK_CLUSTER_NODEID=1001" # 节点ID必须数字
- "WK_EXTERNAL_APIURL=http://node1.tgo.local:5001" # 节点对外API地址
- "WK_EXTERNAL_WSADDR=ws://127.0.0.1:15200" # 节点对外WS地址 这里IP视情况修改
- "WK_CLUSTER_NODES=1001@node1.tgo.local 1002@node2.tgo.local 1003@node3.tgo.local" # 集群节点列表
healthcheck:
test: "wget -q -Y off -O /dev/null http://localhost:5001/health > /dev/null 2>&1"
interval: 10s
timeout: 10s
retries: 3
- "WK_CLUSTER_NODEID=1001" # 节点ID必须数字
- "WK_EXTERNAL_WSADDR=ws://127.0.0.1:15200" # 节点对外WS地址 这里IP视情况修改
- "WK_CLUSTER_NODES=1001@node1.wk.local 1002@node2.wk.local 1003@node3.wk.local" # 集群节点列表
- "WK_CLUSTER_APIURL=http://node1.wk.local:5001" # 节点对外API地址
networks:
wukongim-bridge:
aliases:
- node1.tgo.local # internal Docker alias
- node1.wk.local # internal Docker alias
volumes:
- $PWD/wukongim1_data:/root/wukongim
- $PWD/wukongim1_data:/root/wukongim # data directory mounted on the host machine
wukongim2:
image: wukongim/wukongim:v2.0.0-beta-20240428
<<: *wukongim-common
container_name: wukongim2
environment:
- "WK_CLUSTER_NODEID=1002"
- "WK_EXTERNAL_APIURL=http://node2.tgo.local:5001"
- "WK_EXTERNAL_WSADDR=ws://127.0.0.1:15200"
- "WK_CLUSTER_NODES=1001@node1.tgo.local 1002@node2.tgo.local 1003@node3.tgo.local"
healthcheck:
test: "wget -q -Y off -O /dev/null http://localhost:5001/health > /dev/null 2>&1"
interval: 10s
timeout: 10s
retries: 3
- "WK_EXTERNAL_WSADDR=ws://127.0.0.1:15200" # 节点对外WS地址 这里IP视情况修改
- "WK_CLUSTER_NODES=1001@node1.wk.local 1002@node2.wk.local 1003@node3.wk.local" # 集群节点列表
- "WK_CLUSTER_APIURL=http://node2.wk.local:5001"
networks:
wukongim-bridge:
aliases:
- node2.tgo.local
- node2.wk.local
volumes:
- $PWD/wukongim2_data:/root/wukongim
wukongim3:
image: wukongim/wukongim:v2.0.0-beta-20240428
<<: *wukongim-common
container_name: wukongim3
environment:
- "WK_CLUSTER_NODEID=1003"
- "WK_EXTERNAL_APIURL=http://node3.tgo.local:5001"
- "WK_EXTERNAL_WSADDR=ws://127.0.0.1:15200"
- "WK_CLUSTER_NODES=1001@node1.tgo.local 1002@node2.tgo.local 1003@node3.tgo.local"
healthcheck:
test: "wget -q -Y off -O /dev/null http://localhost:5001/health > /dev/null 2>&1"
interval: 10s
timeout: 10s
retries: 3
- "WK_EXTERNAL_WSADDR=ws://127.0.0.1:15200" # 节点对外WS地址 这里IP视情况修改
- "WK_CLUSTER_NODES=1001@node1.wk.local 1002@node2.wk.local 1003@node3.wk.local" # 集群节点列表
- "WK_CLUSTER_APIURL=http://node3.wk.local:5001"
networks:
wukongim-bridge:
aliases:
- node3.tgo.local
- node3.wk.local
volumes:
- $PWD/wukongim3_data:/root/wukongim
prometheus:
image: prom/prometheus:v2.53.1
volumes:
- "./prometheus.yml:/etc/prometheus/prometheus.yml"
ports:
- "9090:9090"
# node-exporter:
# image: prom/node-exporter:v1.8.0
# container_name: node_exporter
# privileged: true
# # command:
# # - '--path.rootfs=/host'
# pid: host
# restart: unless-stopped
# environment:
# - TZ=Asia/Shanghai
# # volumes:
# # - '/:/host:ro,rslave'
# ports:
# - "9100:9100"
nginx:
image: nginx:1.19.2-alpine
image: nginx:1.27.0
container_name: apigateway
hostname: apigateway
volumes:
@@ -69,7 +82,11 @@ services:
networks:
wukongim-bridge:
aliases:
- apigateway.tgo.local
- apigateway.wk.local
depends_on:
- wukongim1
- wukongim2
- wukongim3
ports:
- "15001:5001"
- "15200:5200"

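The WK_* environment variables above map onto the configuration keys that Viper reads in options.go below (for example, WK_CLUSTER_APIURL now feeds cluster.apiUrl, replacing the old WK_EXTERNAL_APIURL). A minimal Go sketch of that mapping, assuming a Viper binding with a "WK" prefix and a dot-to-underscore key replacer (the actual binding code is not part of this diff):

package main

import (
	"fmt"
	"os"
	"strings"

	"github.com/spf13/viper"
)

func main() {
	// Simulate the container environment from docker-compose.yml above.
	os.Setenv("WK_CLUSTER_NODEID", "1001")
	os.Setenv("WK_CLUSTER_APIURL", "http://node1.wk.local:5001")

	vp := viper.New()
	vp.SetEnvPrefix("wk")                               // assumed prefix: WK_*
	vp.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) // cluster.apiUrl -> CLUSTER_APIURL
	vp.AutomaticEnv()

	// These keys correspond to the options read in ConfigureWithViper.
	fmt.Println("cluster.nodeId:", vp.GetUint64("cluster.nodeId"))
	fmt.Println("cluster.apiUrl:", vp.GetString("cluster.apiUrl"))
}
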
View File

@@ -8,68 +8,60 @@ gzip_buffers 16 8k;
gzip_http_version 1.1;
gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript;
# define API load-balancing servers
# API load balancing
upstream wukongimapi {
server node1.tgo.local:5001;
server node2.tgo.local:5001;
server node3.tgo.local:5001;
server node1.wk.local:5001;
}
# demo
# demo load balancing
upstream wukongimdemo {
server node1.tgo.local:5172;
server node2.tgo.local:5172;
server node3.tgo.local:5172;
server node1.wk.local:5172;
}
# monitor
upstream wukongiminitor {
server node1.tgo.local:5300;
server node2.tgo.local:5300;
server node3.tgo.local:5300;
# manager load balancing
upstream wukongimanager {
server node1.wk.local:5300;
}
# ws
# WS load balancing
upstream wukongimws {
server node1.tgo.local:5200;
server node2.tgo.local:5200;
server node3.tgo.local:5200;
server node1.wk.local:5200;
}
# TCP load balancing
upstream wukongimtcp {
server node1.wk.local:5100;
}
# HTTP API forwarding
server {
listen 5001;
location / {
proxy_pass http://wukongimapi;
proxy_connect_timeout 60s;
proxy_connect_timeout 20s;
proxy_read_timeout 60s;
}
}
# demo
server {
listen 5172;
location / {
proxy_pass http://wukongimdemo;
proxy_connect_timeout 60s;
proxy_connect_timeout 20s;
proxy_read_timeout 60s;
}
location /login {
rewrite ^ /chatdemo?apiurl=http://127.0.0.1:15001;
proxy_pass http://wukongimdemo;
proxy_connect_timeout 60s;
proxy_connect_timeout 20s;
proxy_read_timeout 60s;
}
}
# manager
server {
listen 5300;
location / {
proxy_pass http://wukongiminitor;
proxy_pass http://wukongimanager;
proxy_connect_timeout 60s;
proxy_read_timeout 60s;
}
}
# ws
server {
listen 5200;
location / {
@@ -78,7 +70,7 @@ server {
proxy_http_version 1.1;
# timeout for nginx to receive data from the upstream server; default 120s; if no byte is received within a continuous 120s, the connection is closed
proxy_read_timeout 120s;
# timeout for nginx to send data to the upstream server; default 60s; if no byte is sent within a continuous 120s, the connection is closed
# timeout for nginx to send data to the upstream server; default 120s; if no byte is sent within a continuous 120s, the connection is closed
proxy_send_timeout 120s;
# connection timeout between nginx and the upstream server
proxy_connect_timeout 4s;
@@ -86,4 +78,11 @@ server {
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
# tcp
server {
listen 5100;
proxy_connect_timeout 4s;
proxy_timeout 120s;
proxy_pass wukongimtcp;
}

View File

@@ -0,0 +1,24 @@
global:
scrape_interval: 10s # Set the scrape interval to every 10 seconds. Default is every 1 minute.
evaluation_interval: 10s # Evaluate rules every 10 seconds. The default is every 1 minute.
# scrape_timeout is set to the global default (10s).
scrape_configs:
# - job_name: node-exporter
# static_configs:
# - targets: ['node-exporter:9100']
- job_name: wukongim-trace-metrics-1
static_configs:
- targets: ['wukongim1:5300']
labels:
id: "1001"
- job_name: wukongim-trace-metrics-2
static_configs:
- targets: ['wukongim2:5300']
labels:
id: "1002"
- job_name: wukongim-trace-metrics-3
static_configs:
- targets: ['wukongim3:5300']
labels:
id: "1003"

View File

@@ -1,22 +1,18 @@
mode: "release"
addr: "tcp://0.0.0.0:5110" # tcp监听地址
httpAddr: "0.0.0.0:5001"
wsAddr: "ws://0.0.0.0:5210"
rootDir: "./wukongimdata/1"
monitor:
addr: "tcp://0.0.0.0:5100" # tcp监听地址
httpAddr: "0.0.0.0:5001" # http api监听地址
wsAddr: "ws://0.0.0.0:5200" # websocket ws 监听地址
rootDir: "./wukongimdata/1001" # 数据存储目录
manager: # 管理端配置
on: true
addr: "0.0.0.0:5301"
addr: "0.0.0.0:5300"
demo:
on: true
conversation:
on: true
cluster:
nodeId: 1
addr: "tcp://127.0.0.1:10001"
slotCount: 128
replicaCount: 2
slotReplicaCount: 2
channelReplicaCount: 2
nodeId: 1001 # node ID
apiUrl: "http://127.0.0.1:5001"
# auth configuration, format username:password:resource:permission; * is a wildcard; a resource can also be written as [resourceID:permission]
# e.g. - "admin:pwd:[clusterchannel:rw]" means user admin with password pwd has read/write permission on the clusterchannel resource
@@ -27,6 +23,6 @@ auth:
users:
- "admin:pwd:*"
- "guest:guest:[*:r]" # guest用户密码为guest对所有资源有读权限
# jwt:
# secret: "xxxxx"
# expire: 30d
jwt: ## JWT authentication
secret: "xxxxx" # JWT secret
expire: 30d # JWT expiration time

go.mod
View File

@@ -126,4 +126,4 @@ require (
)
// replace github.com/WuKongIM/WuKongIMGoSDK => ../../WuKongIMGoSDK
replace github.com/WuKongIM/WuKongIMGoProto => ../../WuKongIMGoProto
// replace github.com/WuKongIM/WuKongIMGoProto => ../../WuKongIMGoProto

View File

@@ -69,7 +69,7 @@ type Options struct {
Level zapcore.Level
LineNum bool // whether to show code line numbers
}
Monitor struct {
Manager struct {
On bool // whether monitoring is enabled
Addr string // monitoring address, defaults to 0.0.0.0:5300
}
@@ -83,7 +83,7 @@ type Options struct {
TCPAddr string // node's TCP address, exposed publicly for app long-lived connections, format: ip:port
WSAddr string // node's wsAddr address, exposed publicly for web long-lived connections, format: ws://ip:port
WSSAddr string // node's wssAddr address, exposed publicly for web long-lived connections, format: wss://ip:port
MonitorAddr string // externally accessible monitoring address
ManagerAddr string // externally accessible manager address
APIUrl string // externally accessible API base URL, format: http://ip:port
}
Channel struct { // channel configuration
@@ -158,8 +158,8 @@ type Options struct {
Cluster struct {
NodeId uint64 // node ID
Addr string // node listen address, e.g. tcp://0.0.0.0:11110
GRPCAddr string // node gRPC listen address, e.g. 0.0.0.0:11111
ServerAddr string // node server address, e.g. 127.0.0.1:11110
APIUrl string // API URL reachable between nodes
ReqTimeout time.Duration // request timeout
Role Role // node role: replica or proxy
Seed string // seed node
@@ -333,7 +333,7 @@ func NewOptions(op ...Option) *Options {
MsgNotifyEventCountPerPush: 100,
MsgNotifyEventRetryMaxCount: 5,
},
Monitor: struct {
Manager: struct {
On bool
Addr string
}{
@@ -350,8 +350,8 @@ func NewOptions(op ...Option) *Options {
Cluster: struct {
NodeId uint64
Addr string
GRPCAddr string
ServerAddr string
APIUrl string
ReqTimeout time.Duration
Role Role
Seed string
@@ -368,9 +368,8 @@ func NewOptions(op ...Option) *Options {
SlotReactorSubCount int
PongMaxTick int
}{
NodeId: 1,
NodeId: 1001,
Addr: "tcp://0.0.0.0:11110",
GRPCAddr: "0.0.0.0:11111",
ServerAddr: "",
ReqTimeout: time.Second * 10,
Role: RoleReplica,
@@ -501,11 +500,11 @@ func (o *Options) ConfigureWithViper(vp *viper.Viper) {
o.External.TCPAddr = o.getString("external.tcpAddr", o.External.TCPAddr)
o.External.WSAddr = o.getString("external.wsAddr", o.External.WSAddr)
o.External.WSSAddr = o.getString("external.wssAddr", o.External.WSSAddr)
o.External.MonitorAddr = o.getString("external.monitorAddr", o.External.MonitorAddr)
o.External.ManagerAddr = o.getString("external.managerAddr", o.External.ManagerAddr)
o.External.APIUrl = o.getString("external.apiUrl", o.External.APIUrl)
o.Monitor.On = o.getBool("monitor.on", o.Monitor.On)
o.Monitor.Addr = o.getString("monitor.addr", o.Monitor.Addr)
o.Manager.On = o.getBool("manager.on", o.Manager.On)
o.Manager.Addr = o.getString("manager.addr", o.Manager.Addr)
o.Demo.On = o.getBool("demo.on", o.Demo.On)
o.Demo.Addr = o.getString("demo.addr", o.Demo.Addr)
@@ -600,10 +599,10 @@ func (o *Options) ConfigureWithViper(vp *viper.Viper) {
o.External.WSSAddr = fmt.Sprintf("%s://%s:%d", addrPairs[0], ip, portInt64)
}
if strings.TrimSpace(o.External.MonitorAddr) == "" {
addrPairs := strings.Split(o.Monitor.Addr, ":")
if strings.TrimSpace(o.External.ManagerAddr) == "" {
addrPairs := strings.Split(o.Manager.Addr, ":")
portInt64, _ := strconv.ParseInt(addrPairs[len(addrPairs)-1], 10, 64)
o.External.MonitorAddr = fmt.Sprintf("%s:%d", ip, portInt64)
o.External.ManagerAddr = fmt.Sprintf("%s:%d", ip, portInt64)
}
if strings.TrimSpace(o.External.APIUrl) == "" {
@@ -629,7 +628,6 @@ func (o *Options) ConfigureWithViper(vp *viper.Viper) {
default:
wklog.Panic("cluster.role must be proxy or replica, but got " + role)
}
o.Cluster.GRPCAddr = o.getString("cluster.grpcAddr", o.Cluster.GRPCAddr)
o.Cluster.SlotReplicaCount = o.getInt("cluster.slotReplicaCount", o.Cluster.SlotReplicaCount)
o.Cluster.ChannelReplicaCount = o.getInt("cluster.channelReplicaCount", o.Cluster.ChannelReplicaCount)
o.Cluster.PeerRPCMsgTimeout = o.getDuration("cluster.peerRPCMsgTimeout", o.Cluster.PeerRPCMsgTimeout)
@@ -669,6 +667,7 @@ func (o *Options) ConfigureWithViper(vp *viper.Viper) {
o.Cluster.HeartbeatIntervalTick = o.getInt("cluster.heartbeatIntervalTick", o.Cluster.HeartbeatIntervalTick)
o.Cluster.ChannelReactorSubCount = o.getInt("cluster.channelReactorSubCount", o.Cluster.ChannelReactorSubCount)
o.Cluster.SlotReactorSubCount = o.getInt("cluster.slotReactorSubCount", o.Cluster.SlotReactorSubCount)
o.Cluster.APIUrl = o.getString("cluster.apiUrl", o.Cluster.APIUrl)
// =================== trace ===================
o.Trace.Endpoint = o.getString("trace.endpoint", o.Trace.Endpoint)
@@ -826,6 +825,15 @@ func (o *Options) ConfigureDataDir() {
}
}
// Check verifies that the configuration is valid
func (o *Options) Check() error {
if o.Cluster.NodeId == 0 {
return errors.New("cluster.nodeId must be set")
}
return nil
}
func (o *Options) ClusterOn() bool {
return o.Cluster.NodeId != 0
}
@@ -1068,15 +1076,15 @@ func WithLoggerLineNum(lineNum bool) Option {
}
}
func WithMonitorOn(on bool) Option {
func WithManagerOn(on bool) Option {
return func(opts *Options) {
opts.Monitor.On = on
opts.Manager.On = on
}
}
func WithMonitorAddr(addr string) Option {
func WithManagerAddr(addr string) Option {
return func(opts *Options) {
opts.Monitor.Addr = addr
opts.Manager.Addr = addr
}
}
@@ -1116,9 +1124,9 @@ func WithExternalWSSAddr(wssAddr string) Option {
}
}
func WithExternalMonitorAddr(monitorAddr string) Option {
func WithExternalManagerAddr(managerAddr string) Option {
return func(opts *Options) {
opts.External.MonitorAddr = monitorAddr
opts.External.ManagerAddr = managerAddr
}
}
@@ -1338,12 +1346,6 @@ func WithClusterAddr(addr string) Option {
}
}
func WithClusterGRPCAddr(grpcAddr string) Option {
return func(opts *Options) {
opts.Cluster.GRPCAddr = grpcAddr
}
}
func WithClusterServerAddr(serverAddr string) Option {
return func(opts *Options) {
opts.Cluster.ServerAddr = serverAddr

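Taken together, this file renames the Monitor options to Manager and moves the inter-node API address into Cluster.APIUrl, with the new Check guarding against a missing node ID. A hypothetical snippet inside internal/server showing the renamed functional options (mirroring the test changes further down; values are illustrative only):

package server // hypothetical example file inside internal/server

// exampleOptions sketches the renamed options: WithMonitorOn/WithMonitorAddr
// are now WithManagerOn/WithManagerAddr, and Check rejects a zero nodeId.
func exampleOptions() (*Options, error) {
	opts := NewOptions(
		WithClusterNodeId(1001),                // required: Check returns an error when nodeId == 0
		WithClusterAddr("tcp://0.0.0.0:11110"), // inter-node listen address
		WithManagerOn(true),                    // was WithMonitorOn
		WithManagerAddr("0.0.0.0:5300"),        // was WithMonitorAddr
	)
	if err := opts.Check(); err != nil {
		return nil, err
	}
	return opts, nil
}
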
View File

@@ -7,7 +7,6 @@ import (
"path"
"path/filepath"
"runtime"
"runtime/debug"
"strings"
"time"
@@ -27,17 +26,16 @@ import (
wkproto "github.com/WuKongIM/WuKongIMGoProto"
"github.com/gin-gonic/gin"
"github.com/judwhite/go-svc"
"github.com/panjf2000/ants/v2"
"github.com/pkg/errors"
"go.etcd.io/etcd/pkg/v3/idutil"
"go.uber.org/zap"
)
type Server struct {
opts *Options // configuration
wklog.Log // logging
cluster icluster.Cluster // distributed (cluster)
clusterServer *cluster.Server
opts *Options // configuration
wklog.Log // logging
cluster icluster.Cluster // distributed interface
clusterServer *cluster.Server // distributed service implementation
reqIDGen *idutil.Generator // request ID generator
ctx context.Context
cancel context.CancelFunc
@@ -45,20 +43,19 @@ type Server struct {
start time.Time // server start time
store *clusterstore.Store // storage-related interface
engine *wknet.Engine // long-lived connection engine
authPool *ants.Pool // goroutine pool for authentication
userReactor *userReactor // user reactor
channelReactor *channelReactor // channel reactor
userReactor *userReactor // user reactor, handles user behavior logic
channelReactor *channelReactor // channel reactor, handles channel behavior logic
webhook *webhook // webhook
trace *trace.Trace // monitoring
demoServer *DemoServer // demo server
apiServer *APIServer // API service
managerServer *ManagerServer // monitoring service
managerServer *ManagerServer // manager API service
systemUIDManager *SystemUIDManager // system account management
tagManager *tagManager // tag management
tagManager *tagManager // tag management, manages channel subscriber tags for quickly locating the nodes where subscribers reside
deliverManager *deliverManager // message delivery management
retryManager *retryManager // message retry management
@@ -75,8 +72,15 @@ func New(opts *Options) *Server {
start: now,
}
// config check
err := opts.Check()
if err != nil {
s.Panic("config check error", zap.Error(err))
}
s.ctx, s.cancel = context.WithCancel(context.Background())
// initialize monitoring/tracing
s.trace = trace.New(
s.ctx,
trace.NewOptions(
@@ -86,17 +90,9 @@ func New(opts *Options) *Server {
))
trace.SetGlobalTrace(s.trace)
var err error
s.authPool, err = ants.NewPool(opts.Process.AuthPoolSize, ants.WithPanicHandler(func(i interface{}) {
stack := debug.Stack()
s.Panic("authPool panic", zap.String("stack", string(stack)))
}))
if err != nil {
s.Panic("create auth pool error", zap.Error(err))
}
gin.SetMode(opts.GinMode)
// initialize storage
storeOpts := clusterstore.NewOptions(s.opts.Cluster.NodeId)
storeOpts.DataDir = path.Join(s.opts.DataDir, "db")
storeOpts.SlotCount = uint32(s.opts.Cluster.SlotCount)
@@ -104,8 +100,11 @@ func New(opts *Options) *Server {
storeOpts.IsCmdChannel = opts.IsCmdChannel
storeOpts.Db.ShardNum = s.opts.Db.ShardNum
s.store = clusterstore.NewStore(storeOpts)
// initialize tag management
s.tagManager = newTagManager(s)
// initialize the long-lived connection engine
s.engine = wknet.NewEngine(
wknet.WithAddr(s.opts.Addr),
wknet.WithWSAddr(s.opts.WSAddr),
@@ -118,25 +117,24 @@ func New(opts *Options) *Server {
trace.GlobalTrace.Metrics.System().ExtranetOutgoingAdd(int64(n))
}),
)
s.webhook = newWebhook(s)
s.channelReactor = newChannelReactor(s, opts)
s.userReactor = newUserReactor(s)
s.demoServer = NewDemoServer(s)
s.systemUIDManager = NewSystemUIDManager(s)
s.apiServer = NewAPIServer(s)
s.managerServer = NewManagerServer(s)
s.retryManager = newRetryManager(s)
s.conversationManager = NewConversationManager(s)
s.webhook = newWebhook(s) // webhook
s.channelReactor = newChannelReactor(s, opts) // channel reactor
s.userReactor = newUserReactor(s) // user reactor
s.demoServer = NewDemoServer(s) // demo server
s.systemUIDManager = NewSystemUIDManager(s) // system account management
s.apiServer = NewAPIServer(s) // API service
s.managerServer = NewManagerServer(s) // manager API service
s.retryManager = newRetryManager(s) // message retry management
s.conversationManager = NewConversationManager(s) // conversation management
// initialize the distributed (cluster) service
initNodes := make(map[uint64]string)
if len(s.opts.Cluster.Nodes) > 0 {
for _, node := range s.opts.Cluster.Nodes {
serverAddr := strings.ReplaceAll(node.ServerAddr, "tcp://", "")
initNodes[node.Id] = serverAddr
}
}
role := pb.NodeRole_NodeRoleReplica
if s.opts.Cluster.Role == RoleProxy {
role = pb.NodeRole_NodeRoleProxy
@@ -152,7 +150,7 @@ func New(opts *Options) *Server {
cluster.WithRole(role),
cluster.WithServerAddr(s.opts.Cluster.ServerAddr),
cluster.WithMessageLogStorage(s.store.GetMessageShardLogStorage()),
cluster.WithApiServerAddr(s.opts.External.APIUrl),
cluster.WithApiServerAddr(s.opts.Cluster.APIUrl),
cluster.WithChannelMaxReplicaCount(s.opts.Cluster.ChannelReplicaCount),
cluster.WithSlotMaxReplicaCount(uint32(s.opts.Cluster.SlotReplicaCount)),
cluster.WithLogLevel(s.opts.Logger.Level),
@@ -185,6 +183,7 @@ func New(opts *Options) *Server {
s.handleClusterMessage(fromNodeId, msg)
})
// message delivery manager
s.deliverManager = newDeliverManager(s)
return s
@@ -225,8 +224,8 @@ func (s *Server) Start() error {
}
s.Info(fmt.Sprintf("Listening for Manager http api on %s", fmt.Sprintf("http://%s", s.opts.HTTPAddr)))
if s.opts.Monitor.On {
s.Info(fmt.Sprintf("Listening for Monitor on %s", s.opts.Monitor.Addr))
if s.opts.Manager.On {
s.Info(fmt.Sprintf("Listening for Manager on %s", s.opts.Manager.Addr))
}
defer s.Info("Server is ready")

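The cluster bootstrap above now takes its API address from Cluster.APIUrl and builds initNodes from the configured Cluster.Nodes entries. For reference, the node-list format used in docker-compose.yml (WK_CLUSTER_NODES=1001@node1.wk.local ...) pairs a numeric node ID with its address; an illustrative Go sketch of splitting such an entry (the real parsing lives in the options code and is not shown in this diff):

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseNode splits an entry such as "1001@node1.wk.local" into its id and address.
func parseNode(entry string) (uint64, string, error) {
	parts := strings.SplitN(entry, "@", 2)
	if len(parts) != 2 {
		return 0, "", fmt.Errorf("invalid node entry: %q", entry)
	}
	id, err := strconv.ParseUint(parts[0], 10, 64)
	if err != nil {
		return 0, "", fmt.Errorf("node id must be numeric: %w", err)
	}
	return id, parts[1], nil
}

func main() {
	for _, e := range strings.Fields("1001@node1.wk.local 1002@node2.wk.local 1003@node3.wk.local") {
		id, addr, err := parseNode(e)
		if err != nil {
			panic(err)
		}
		fmt.Println(id, addr)
	}
}
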
View File

@@ -75,6 +75,11 @@ func (s *APIServer) Stop() {
}
func (s *APIServer) setRoutes() {
s.r.GET("/health", func(c *wkhttp.Context) {
c.JSON(http.StatusOK, gin.H{"status": "ok"})
})
connz := NewConnzAPI(s.s)
connz.Route(s.r)

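This new /health route is the endpoint the docker-compose healthchecks above probe with wget on port 5001. A minimal Go equivalent of that probe (sketch only; just the path and the 200 response with {"status": "ok"} come from this diff):

package main

import (
	"fmt"
	"net/http"
	"time"
)

func main() {
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Get("http://localhost:5001/health") // same address the compose healthcheck uses
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		panic(fmt.Sprintf("unhealthy: status %d", resp.StatusCode))
	}
	fmt.Println("healthy")
}
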
View File

@@ -27,7 +27,7 @@ func NewManagerServer(s *Server) *ManagerServer {
r := wkhttp.New()
return &ManagerServer{
addr: s.opts.Monitor.Addr,
addr: s.opts.Manager.Addr,
s: s,
r: r,
Log: wklog.NewWKLog("ManagerServer"),

View File

@@ -228,7 +228,7 @@ func TestClusterNodeJoin(t *testing.T) {
assert.Equal(t, 2, len(cfg.Nodes))
// new server
s3 := NewTestServer(t, WithDemoOn(false), WithClusterSeed("1001@127.0.0.1:11110"), WithClusterServerAddr("0.0.0.0:11115"), WithWSAddr("ws://0.0.0.0:5250"), WithMonitorAddr("0.0.0.0:5350"), WithAddr("tcp://0.0.0.0:5150"), WithHTTPAddr("0.0.0.0:5005"), WithClusterAddr("tcp://0.0.0.0:11115"), WithClusterNodeId(1005))
s3 := NewTestServer(t, WithDemoOn(false), WithClusterSeed("1001@127.0.0.1:11110"), WithClusterServerAddr("0.0.0.0:11115"), WithWSAddr("ws://0.0.0.0:5250"), WithManagerAddr("0.0.0.0:5350"), WithAddr("tcp://0.0.0.0:5150"), WithHTTPAddr("0.0.0.0:5005"), WithClusterAddr("tcp://0.0.0.0:11115"), WithClusterNodeId(1005))
err = s3.Start()
assert.Nil(t, err)
defer s3.StopNoErr()

View File

@@ -60,8 +60,8 @@ func NewTestClusterServerTwoNode(t *testing.T, opt ...Option) (*Server, *Server)
ServerAddr: "0.0.0.0:11111",
})
s1 := NewTestServer(t, WithDemoOn(false), WithWSAddr("ws://0.0.0.0:5210"), WithMonitorAddr("0.0.0.0:5310"), WithAddr("tcp://0.0.0.0:5110"), WithHTTPAddr("0.0.0.0:5001"), WithClusterAddr("tcp://0.0.0.0:11110"), WithClusterNodeId(1001), WithClusterNodes(nodes), WithOpts(opt...))
s2 := NewTestServer(t, WithDemoOn(false), WithWSAddr("ws://0.0.0.0:5220"), WithMonitorAddr("0.0.0.0:5320"), WithAddr("tcp://0.0.0.0:5120"), WithHTTPAddr("0.0.0.0:5002"), WithClusterAddr("tcp://0.0.0.0:11111"), WithClusterNodeId(1002), WithClusterNodes(nodes), WithOpts(opt...))
s1 := NewTestServer(t, WithDemoOn(false), WithWSAddr("ws://0.0.0.0:5210"), WithManagerAddr("0.0.0.0:5310"), WithAddr("tcp://0.0.0.0:5110"), WithHTTPAddr("0.0.0.0:5001"), WithClusterAddr("tcp://0.0.0.0:11110"), WithClusterNodeId(1001), WithClusterNodes(nodes), WithOpts(opt...))
s2 := NewTestServer(t, WithDemoOn(false), WithWSAddr("ws://0.0.0.0:5220"), WithManagerAddr("0.0.0.0:5320"), WithAddr("tcp://0.0.0.0:5120"), WithHTTPAddr("0.0.0.0:5002"), WithClusterAddr("tcp://0.0.0.0:11111"), WithClusterNodeId(1002), WithClusterNodes(nodes), WithOpts(opt...))
return s1, s2
}
@@ -87,7 +87,7 @@ func NewTestClusterServerTreeNode(t testing.TB, opt ...Option) (*Server, *Server
WithClusterSlotReplicaCount(3),
WithClusterChannelReplicaCount(3),
WithWSAddr("ws://0.0.0.0:5210"),
WithMonitorAddr("0.0.0.0:5310"),
WithManagerAddr("0.0.0.0:5310"),
WithAddr("tcp://0.0.0.0:5110"),
WithHTTPAddr("0.0.0.0:5001"),
WithClusterAddr("tcp://0.0.0.0:11110"),
@@ -103,7 +103,7 @@ func NewTestClusterServerTreeNode(t testing.TB, opt ...Option) (*Server, *Server
WithClusterSlotReplicaCount(3),
WithClusterChannelReplicaCount(3),
WithWSAddr("ws://0.0.0.0:5220"),
WithMonitorAddr("0.0.0.0:5320"),
WithManagerAddr("0.0.0.0:5320"),
WithAddr("tcp://0.0.0.0:5120"),
WithHTTPAddr("0.0.0.0:5002"),
WithClusterAddr("tcp://0.0.0.0:11111"),
@@ -119,7 +119,7 @@ func NewTestClusterServerTreeNode(t testing.TB, opt ...Option) (*Server, *Server
WithClusterSlotReplicaCount(3),
WithClusterChannelReplicaCount(3),
WithWSAddr("ws://0.0.0.0:5230"),
WithMonitorAddr("0.0.0.0:5330"),
WithManagerAddr("0.0.0.0:5330"),
WithAddr("tcp://0.0.0.0:5130"),
WithHTTPAddr("0.0.0.0:5003"),
WithClusterAddr("tcp://0.0.0.0:11112"),