refactor: cluster

This commit is contained in:
tangtaoit
2024-07-18 20:34:19 +08:00
parent 8faf28f774
commit a4d42180e3
25 changed files with 347 additions and 150 deletions

2
.gitignore vendored
View File

@@ -41,3 +41,5 @@ wukongimdata
wukongim*_data/
external_ip.txt

View File

@@ -49,7 +49,7 @@ demo:
# addr: "" # 数据源地址
# channelInfoOn: false # 是否开启频道信息数据源的获取
conversation: # 最近会话配置
on: true # 是否开启最近会话
on: true # 是否开启最近会话
# cacheExpire: 1d # 最近会话缓存过期时间 默认为1天注意这里指清除内存里的最近会话缓存并不表示清除最近会话
# syncInterval: 5m # 最近会话保存间隔,每隔指定的时间进行保存一次 默认为5分钟
# syncOnce: 100 # 最近会话同步保存一次的数量 超过指定未保存的数量 将进行保存 默认为100
@@ -60,4 +60,45 @@ conversation: # 最近会话配置
# maxCount: 5 # 消息最大重试次数, 服务端持有用户的连接但是给此用户发送消息后在指定的间隔内没有收到ack将会重新发送直到超过maxCount配置的数量后将不再发送这种情况很少出现如果出现这种情况此消息只能去离线接口去拉取
#userMsgQueueMaxSize: 0 # 用户消息队列最大大小超过此大小此用户将被限速0为不限制
#deadlockCheck: false # 是否开启死锁检测
#pprofOn: false # 是否开启pprof
#pprofOn: false # 是否开启pprof
# # 认证配置
# auth:
# kind: 'jwt' # 认证方式 jwt: jwt认证 none: 无需认证
# # 用户配置
# #用户名:密码:资源:权限 *表示通配符 资源格式也可以是[资源ID:权限]
# # 例如: - "admin:pwd:[clusterchannel:rw]" 表示admin用户密码为pwd对clusterchannel资源有读写权限,
# # - "admin:pwd:*" 表示admin用户密码为pwd对所有资源有读写权限
# users:
# - "admin:pwd:*"
# - "guest:guest:[*:r]" # guest用户密码为guest对所有资源有读权限
# jwt: # jwt配置
# secret: "" # jwt密钥这个配置比较重要需要自己生成一个随机字符串建议随机的32位字符串用于jwt的加密
# expire: 30d # jwt过期时间 默认为30天
# trace: # 数据追踪
# prometheusApiUrl: "http://xx.xx.xx.xx:9090" # prometheus的内网地址,用于获取监控数据
# # 集群配置
# cluster:
# nodeId: 1001 # 节点ID
# addr: "tcp://0.0.0.0:11110" # 分布式监听地址
# serverAddr: "" # 节点之间能访问到的内网通讯地址 例如xx.xx.xx.xx:11110
# apiUrl: "" # 节点的http地址 内网地址,节点之间需要能访问到 格式: http://ip:port 例如http://xx.xx.xx.xx:5001
# slotCount: 64 # 槽位分区数量默认是64个
# slotReplicaCount: 3 # 槽位分区副本数量默认是3个
# channelReplicaCount: 3 # 频道副本数量默认是3个
# # 初始节点列表 格式 nodeId@ip:port分布式初始化时的节点列表列表包含本节点自己
# # 例如:
# # initNodes:
# # - "1001@192.168.1.12:11110"
# # - "1002@192.168.1.13:11110"
# # - "1003@192.168.1.14:11110"
# initNodes:
# - ""
# # 集群种子节点地址 格式 nodeId@ip:port
# # 例如:
# # seed:
# # - "1001@192.168.1.12:11110"
# seed:
# - ""

View File

@@ -3,7 +3,7 @@ addr: "tcp://0.0.0.0:5110" # tcp监听地址
httpAddr: "0.0.0.0:5001"
wsAddr: "ws://0.0.0.0:5210"
rootDir: "./wukongimdata/1"
monitor:
manager:
on: true
addr: "0.0.0.0:5301"
demo:
@@ -16,7 +16,8 @@ cluster:
slotCount: 64
slotReplicaCount: 3
channelReplicaCount: 3
nodes:
apiUrl: "http://127.0.0.1:5001"
initNodes:
- "1001@127.0.0.1:10001"
- "1002@127.0.0.1:10002"
- "1003@127.0.0.1:10003"

View File

@@ -3,7 +3,7 @@ addr: "tcp://0.0.0.0:5120" # tcp监听地址
httpAddr: "0.0.0.0:5002"
wsAddr: "ws://0.0.0.0:5220"
rootDir: "./wukongimdata/2"
monitor:
manager:
on: true
addr: "0.0.0.0:5302"
demo:
@@ -17,7 +17,8 @@ cluster:
slotCount: 64
slotReplicaCount: 3
channelReplicaCount: 3
nodes:
apiUrl: "http://127.0.0.1:5002"
initNodes:
- "1001@127.0.0.1:10001"
- "1002@127.0.0.1:10002"
- "1003@127.0.0.1:10003"

View File

@@ -3,7 +3,7 @@ addr: "tcp://0.0.0.0:5130" # tcp监听地址
httpAddr: "0.0.0.0:5003"
wsAddr: "ws://0.0.0.0:5230"
rootDir: "./wukongimdata/3"
monitor:
manager:
on: true
addr: "0.0.0.0:5303"
demo:
@@ -16,7 +16,8 @@ cluster:
slotCount: 64
slotReplicaCount: 3
channelReplicaCount: 3
nodes:
apiUrl: "http://127.0.0.1:5003"
initNodes:
- "1001@127.0.0.1:10001"
- "1002@127.0.0.1:10002"
- "1003@127.0.0.1:10003"

View File

@@ -3,7 +3,7 @@ addr: "tcp://0.0.0.0:5150" # tcp监听地址
httpAddr: "0.0.0.0:5005"
wsAddr: "ws://0.0.0.0:5250"
rootDir: "./wukongimdata/5"
monitor:
manager:
on: false
demo:
on: true
@@ -14,4 +14,14 @@ cluster:
nodeId: 1005
addr: "tcp://127.0.0.1:10005" # 分布式监听地址
serverAddr: "127.0.0.1:10005" # 节点通讯地址
seed: "1001@127.0.0.1:10001" # 其他任意集群内的节点
apiUrl: "http://127.0.0.1:5005"
seed: "1001@127.0.0.1:10001" # 其他任意集群内的节点
auth:
kind: 'jwt' # 认证方式 jwt: jwt认证 none: 无需认证
users:
- "admin:pwd:*"
- "guest:guest:[*:r]" # guest用户密码为guest对所有资源有读权限
jwt:
secret: "xxxxx"
expire: 30d

View File

@@ -0,0 +1,27 @@
mode: "debug"
addr: "tcp://0.0.0.0:5120" # tcp监听地址
httpAddr: "0.0.0.0:5002"
wsAddr: "ws://0.0.0.0:5220"
rootDir: "./wukongimdata/2"
manager:
on: false
demo:
on: true
addr: "0.0.0.0:5173"
conversation:
on: true
cluster:
nodeId: 1005
addr: "tcp://127.0.0.1:11111" # 分布式监听地址
serverAddr: "127.0.0.1:11111" # 节点通讯地址
apiUrl: "http://127.0.0.1:5002"
seed: "1001@127.0.0.1:11110" # 其他任意集群内的节点
auth:
kind: 'jwt' # 认证方式 jwt: jwt认证 none: 无需认证
users:
- "admin:pwd:*"
- "guest:guest:[*:r]" # guest用户密码为guest对所有资源有读权限
jwt:
secret: "xxxxx"
expire: 30d

View File

@@ -0,0 +1,31 @@
mode: "debug"
addr: "tcp://0.0.0.0:5110" # tcp监听地址
httpAddr: "0.0.0.0:5001"
wsAddr: "ws://0.0.0.0:5210"
rootDir: "./wukongimdata/1"
manager:
on: true
addr: "0.0.0.0:5301"
demo:
on: true
conversation:
on: true
cluster:
nodeId: 1001
addr: "tcp://127.0.0.1:11110"
slotCount: 64
slotReplicaCount: 3
channelReplicaCount: 3
apiUrl: "http://127.0.0.1:5001"
# 认证配置 用户名:密码:资源:权限 *表示通配符 资源格式也可以是[资源ID:权限]
# 例如: - "admin:pwd:[clusterchannel:rw]" 表示admin用户密码为pwd对clusterchannel资源有读写权限,
# - "admin:pwd:*" 表示admin用户密码为pwd对所有资源有读写权限
auth:
kind: 'jwt' # 认证方式 jwt: jwt认证 none: 无需认证
users:
- "admin:pwd:*"
- "guest:guest:[*:r]" # guest用户密码为guest对所有资源有读权限
jwt:
secret: "xxxxx"
expire: 30d

View File

@@ -79,12 +79,13 @@ type Options struct {
Addr string // demo服务地址 默认为 0.0.0.0:5172
}
External struct {
IP string // 外网IP
TCPAddr string // 节点的TCP地址 对外公开APP端长连接通讯 格式: ip:port
WSAddr string // 节点的wsAdd地址 对外公开 WEB端长连接通讯 格式: ws://ip:port
WSSAddr string // 节点的wssAddr地址 对外公开 WEB端长连接通讯 格式: wss://ip:port
ManagerAddr string // 对外访问的管理地址
APIUrl string // 对外访问的API基地址 格式: http://ip:port
IP string // 外网IP
TCPAddr string // 节点的TCP地址 对外公开APP端长连接通讯 格式: ip:port
WSAddr string // 节点的wsAdd地址 对外公开 WEB端长连接通讯 格式: ws://ip:port
WSSAddr string // 节点的wssAddr地址 对外公开 WEB端长连接通讯 格式: wss://ip:port
ManagerAddr string // 对外访问的管理地址
APIUrl string // 对外访问的API基地址 格式: http://ip:port
AutoGetExternalIP bool // 是否自动获取外网IP
}
Channel struct { // 频道配置
CacheCount int // 频道缓存数量
@@ -156,19 +157,17 @@ type Options struct {
}
Cluster struct {
NodeId uint64 // 节点ID
Addr string // 节点监听地址 例如tcp://0.0.0.0:11110
ServerAddr string // 节点服务地址 例如 127.0.0.1:11110
APIUrl string // 节点之间可访问的api地址
ReqTimeout time.Duration // 请求超时时间
Role Role // 节点角色 replica, proxy
Seed string // 种子节点
SlotReplicaCount int // 每个槽的副本数量
ChannelReplicaCount int // 每个频道的副本数量
SlotCount int // 槽数量
Nodes []*Node // 集群节点地址
PeerRPCMsgTimeout time.Duration // 节点之间rpc消息超时时间
PeerRPCTimeoutScanInterval time.Duration // 节点之间rpc消息超时时间扫描间隔
NodeId uint64 // 节点ID
Addr string // 节点监听地址 例如tcp://0.0.0.0:11110
ServerAddr string // 节点之间能访问到的内网通讯地址 例如 127.0.0.1:11110
APIUrl string // 节点之间可访问的api地址
ReqTimeout time.Duration // 请求超时时间
Role Role // 节点角色 replica, proxy
Seed string // 种子节点
SlotReplicaCount int // 每个槽的副本数量
ChannelReplicaCount int // 每个频道的副本数量
SlotCount int // 槽数量
InitNodes []*Node // 集群初始节点地址
TickInterval time.Duration // 分布式tick间隔
@@ -348,42 +347,38 @@ func NewOptions(op ...Option) *Options {
Addr: "0.0.0.0:5172",
},
Cluster: struct {
NodeId uint64
Addr string
ServerAddr string
APIUrl string
ReqTimeout time.Duration
Role Role
Seed string
SlotReplicaCount int
ChannelReplicaCount int
SlotCount int
Nodes []*Node
PeerRPCMsgTimeout time.Duration
PeerRPCTimeoutScanInterval time.Duration
TickInterval time.Duration
HeartbeatIntervalTick int
ElectionIntervalTick int
ChannelReactorSubCount int
SlotReactorSubCount int
PongMaxTick int
NodeId uint64
Addr string
ServerAddr string
APIUrl string
ReqTimeout time.Duration
Role Role
Seed string
SlotReplicaCount int
ChannelReplicaCount int
SlotCount int
InitNodes []*Node
TickInterval time.Duration
HeartbeatIntervalTick int
ElectionIntervalTick int
ChannelReactorSubCount int
SlotReactorSubCount int
PongMaxTick int
}{
NodeId: 1001,
Addr: "tcp://0.0.0.0:11110",
ServerAddr: "",
ReqTimeout: time.Second * 10,
Role: RoleReplica,
SlotCount: 64,
SlotReplicaCount: 3,
ChannelReplicaCount: 3,
PeerRPCMsgTimeout: time.Second * 20,
PeerRPCTimeoutScanInterval: time.Second * 1,
TickInterval: time.Millisecond * 150,
HeartbeatIntervalTick: 1,
ElectionIntervalTick: 10,
ChannelReactorSubCount: 64,
SlotReactorSubCount: 64,
PongMaxTick: 30,
NodeId: 1001,
Addr: "tcp://0.0.0.0:11110",
ServerAddr: "",
ReqTimeout: time.Second * 10,
Role: RoleReplica,
SlotCount: 64,
SlotReplicaCount: 3,
ChannelReplicaCount: 3,
TickInterval: time.Millisecond * 150,
HeartbeatIntervalTick: 1,
ElectionIntervalTick: 10,
ChannelReactorSubCount: 64,
SlotReactorSubCount: 64,
PongMaxTick: 30,
},
Trace: struct {
Endpoint string
@@ -502,6 +497,7 @@ func (o *Options) ConfigureWithViper(vp *viper.Viper) {
o.External.WSSAddr = o.getString("external.wssAddr", o.External.WSSAddr)
o.External.ManagerAddr = o.getString("external.managerAddr", o.External.ManagerAddr)
o.External.APIUrl = o.getString("external.apiUrl", o.External.APIUrl)
o.External.AutoGetExternalIP = o.getBool("external.autoGetExternalIP", o.External.AutoGetExternalIP)
o.Manager.On = o.getBool("manager.on", o.Manager.On)
o.Manager.Addr = o.getString("manager.addr", o.Manager.Addr)
@@ -578,37 +574,46 @@ func (o *Options) ConfigureWithViper(vp *viper.Viper) {
o.ConfigureDataDir() // 数据目录
o.configureLog(vp) // 日志配置
ip := o.External.IP
if strings.TrimSpace(ip) == "" {
ip = getIntranetIP()
externalIp := o.External.IP
var err error
if strings.TrimSpace(externalIp) == "" && o.External.AutoGetExternalIP { // 开启了自动获取外网ip并且没有配置外网ip
externalIp, err = GetExternalIP() // 获取外网IP
if err != nil {
wklog.Panic("get external ip failed", zap.Error(err))
}
}
if strings.TrimSpace(externalIp) == "" {
externalIp = getIntranetIP() // 默认自动获取内网地址 (方便源码启动)
}
if strings.TrimSpace(o.External.TCPAddr) == "" {
addrPairs := strings.Split(o.Addr, ":")
portInt64, _ := strconv.ParseInt(addrPairs[len(addrPairs)-1], 10, 64)
o.External.TCPAddr = fmt.Sprintf("%s:%d", ip, portInt64)
o.External.TCPAddr = fmt.Sprintf("%s:%d", externalIp, portInt64)
}
if strings.TrimSpace(o.External.WSAddr) == "" {
addrPairs := strings.Split(o.WSAddr, ":")
portInt64, _ := strconv.ParseInt(addrPairs[len(addrPairs)-1], 10, 64)
o.External.WSAddr = fmt.Sprintf("%s://%s:%d", addrPairs[0], ip, portInt64)
o.External.WSAddr = fmt.Sprintf("%s://%s:%d", addrPairs[0], externalIp, portInt64)
}
if strings.TrimSpace(o.WSSAddr) != "" && strings.TrimSpace(o.External.WSSAddr) == "" {
addrPairs := strings.Split(o.WSSAddr, ":")
portInt64, _ := strconv.ParseInt(addrPairs[len(addrPairs)-1], 10, 64)
o.External.WSSAddr = fmt.Sprintf("%s://%s:%d", addrPairs[0], ip, portInt64)
o.External.WSSAddr = fmt.Sprintf("%s://%s:%d", addrPairs[0], externalIp, portInt64)
}
if strings.TrimSpace(o.External.ManagerAddr) == "" {
addrPairs := strings.Split(o.Manager.Addr, ":")
portInt64, _ := strconv.ParseInt(addrPairs[len(addrPairs)-1], 10, 64)
o.External.ManagerAddr = fmt.Sprintf("%s:%d", ip, portInt64)
o.External.ManagerAddr = fmt.Sprintf("%s:%d", externalIp, portInt64)
}
if strings.TrimSpace(o.External.APIUrl) == "" {
addrPairs := strings.Split(o.HTTPAddr, ":")
portInt64, _ := strconv.ParseInt(addrPairs[len(addrPairs)-1], 10, 64)
o.External.APIUrl = fmt.Sprintf("http://%s:%d", ip, portInt64)
o.External.APIUrl = fmt.Sprintf("http://%s:%d", externalIp, portInt64)
}
// =================== cluster ===================
@@ -630,15 +635,13 @@ func (o *Options) ConfigureWithViper(vp *viper.Viper) {
}
o.Cluster.SlotReplicaCount = o.getInt("cluster.slotReplicaCount", o.Cluster.SlotReplicaCount)
o.Cluster.ChannelReplicaCount = o.getInt("cluster.channelReplicaCount", o.Cluster.ChannelReplicaCount)
o.Cluster.PeerRPCMsgTimeout = o.getDuration("cluster.peerRPCMsgTimeout", o.Cluster.PeerRPCMsgTimeout)
o.Cluster.ServerAddr = o.getString("cluster.serverAddr", o.Cluster.ServerAddr)
o.Cluster.PeerRPCTimeoutScanInterval = o.getDuration("cluster.peerRPCTimeoutScanInterval", o.Cluster.PeerRPCTimeoutScanInterval)
o.Cluster.PongMaxTick = o.getInt("cluster.pongMaxTick", o.Cluster.PongMaxTick)
o.Cluster.ReqTimeout = o.getDuration("cluster.reqTimeout", o.Cluster.ReqTimeout)
o.Cluster.Seed = o.getString("cluster.seed", o.Cluster.Seed)
o.Cluster.SlotCount = o.getInt("cluster.slotCount", o.Cluster.SlotCount)
nodes := o.getStringSlice("cluster.nodes") // 格式为: nodeID@addr 例如 1@localhost:11110
nodes := o.getStringSlice("cluster.initNodes") // 格式为: nodeID@addr 例如 1@localhost:11110
if len(nodes) > 0 {
for _, nodeStr := range nodes {
if !strings.Contains(nodeStr, "@") {
@@ -656,7 +659,7 @@ func (o *Options) ConfigureWithViper(vp *viper.Viper) {
addr = fmt.Sprintf("%s:%s", addr, defaultPort)
}
o.Cluster.Nodes = append(o.Cluster.Nodes, &Node{
o.Cluster.InitNodes = append(o.Cluster.InitNodes, &Node{
Id: nodeID,
ServerAddr: addr,
})
@@ -673,6 +676,7 @@ func (o *Options) ConfigureWithViper(vp *viper.Viper) {
o.Trace.Endpoint = o.getString("trace.endpoint", o.Trace.Endpoint)
o.Trace.ServiceName = o.getString("trace.serviceName", o.Trace.ServiceName)
o.Trace.ServiceHostName = o.getString("trace.serviceHostName", fmt.Sprintf("%s[%d]", o.Trace.ServiceName, o.Cluster.NodeId))
o.Trace.PrometheusApiUrl = o.getString("trace.prometheusApiUrl", o.Trace.PrometheusApiUrl)
// =================== deliver ===================
o.Deliver.DeliverrCount = o.getInt("deliver.deliverrCount", o.Deliver.DeliverrCount)
@@ -889,7 +893,7 @@ func (o *Options) ConfigFileUsed() string {
// 是否是单机模式
func (o *Options) IsSingleNode() bool {
return len(o.Cluster.Nodes) == 0
return len(o.Cluster.InitNodes) == 0
}
func (o *Options) getString(key string, defaultValue string) string {
@@ -1010,6 +1014,31 @@ func getIntranetIP() string {
return ""
}
// 获取外网地址并保存到本地文件
func GetExternalIP() (string, error) {
externalIPBytes, err := os.ReadFile("external_ip.txt")
if err != nil {
if !os.IsNotExist(err) {
wklog.Warn("read external_ip.txt error", zap.Error(err))
}
} else if len(externalIPBytes) > 0 {
return string(externalIPBytes), nil
}
externalIP, err := wkutil.GetExternalIP()
if err != nil {
return "", err
}
if externalIP != "" {
err := os.WriteFile("external_ip.txt", []byte(externalIP), 0755)
if err != nil {
return "", err
}
}
return externalIP, nil
}
type Node struct {
Id uint64
ServerAddr string
@@ -1402,21 +1431,9 @@ func WithClusterSlotCount(slotCount int) Option {
}
}
func WithClusterNodes(nodes []*Node) Option {
func WithClusterInitNodes(nodes []*Node) Option {
return func(opts *Options) {
opts.Cluster.Nodes = nodes
}
}
func WithClusterPeerRPCMsgTimeout(peerRPCMsgTimeout time.Duration) Option {
return func(opts *Options) {
opts.Cluster.PeerRPCMsgTimeout = peerRPCMsgTimeout
}
}
func WithClusterPeerRPCTimeoutScanInterval(peerRPCTimeoutScanInterval time.Duration) Option {
return func(opts *Options) {
opts.Cluster.PeerRPCTimeoutScanInterval = peerRPCTimeoutScanInterval
opts.Cluster.InitNodes = nodes
}
}

View File

@@ -87,6 +87,7 @@ func New(opts *Options) *Server {
trace.WithEndpoint(s.opts.Trace.Endpoint),
trace.WithServiceName(s.opts.Trace.ServiceName),
trace.WithServiceHostName(s.opts.Trace.ServiceHostName),
trace.WithPrometheusApiUrl(s.opts.Trace.PrometheusApiUrl),
))
trace.SetGlobalTrace(s.trace)
@@ -129,8 +130,8 @@ func New(opts *Options) *Server {
// 初始化分布式服务
initNodes := make(map[uint64]string)
if len(s.opts.Cluster.Nodes) > 0 {
for _, node := range s.opts.Cluster.Nodes {
if len(s.opts.Cluster.InitNodes) > 0 {
for _, node := range s.opts.Cluster.InitNodes {
serverAddr := strings.ReplaceAll(node.ServerAddr, "tcp://", "")
initNodes[node.Id] = serverAddr
}

View File

@@ -42,6 +42,13 @@ func (s *DemoServer) Start() {
st, _ := fs.Sub(version.DemoFs, "demo/chatdemo/dist")
s.r.GetGinRoute().NoRoute(func(c *gin.Context) {
if c.Request.URL.Path == "" || c.Request.URL.Path == "/" {
c.Redirect(http.StatusFound, fmt.Sprintf("/chatdemo?apiurl=%s", s.s.opts.External.APIUrl))
c.Abort()
return
}
if strings.HasPrefix(c.Request.URL.Path, "/chatdemo") {
c.FileFromFS("./index.html", http.FS(st))
return

View File

@@ -41,28 +41,6 @@ func (m *ManagerServer) Start() {
// jwt和token认证中间件
m.r.Use(m.jwtAndTokenAuthMiddleware())
m.r.Use(func(c *wkhttp.Context) { // 管理者权限判断
if c.FullPath() == "/api/varz" {
c.Next()
return
}
if strings.TrimSpace(m.s.opts.ManagerToken) == "" {
c.Next()
return
}
if strings.HasPrefix(c.FullPath(), "/web") {
c.Next()
return
}
managerToken := c.GetHeader("token")
if managerToken != m.s.opts.ManagerToken {
c.AbortWithStatus(http.StatusUnauthorized)
return
}
c.Next()
})
m.r.GetGinRoute().Use(gzip.Gzip(gzip.DefaultCompression, gzip.WithExcludedPaths([]string{"/metrics"})))
st, _ := fs.Sub(version.WebFs, "web/dist")
@@ -138,6 +116,10 @@ func (m *ManagerServer) jwtAndTokenAuthMiddleware() wkhttp.HandlerFunc {
c.Next()
return
}
if strings.HasPrefix(fpath, "/metrics") {
c.Next()
return
}
// 管理token认证
token := c.GetHeader("token")

View File

@@ -60,8 +60,8 @@ func NewTestClusterServerTwoNode(t *testing.T, opt ...Option) (*Server, *Server)
ServerAddr: "0.0.0.0:11111",
})
s1 := NewTestServer(t, WithDemoOn(false), WithWSAddr("ws://0.0.0.0:5210"), WithManagerAddr("0.0.0.0:5310"), WithAddr("tcp://0.0.0.0:5110"), WithHTTPAddr("0.0.0.0:5001"), WithClusterAddr("tcp://0.0.0.0:11110"), WithClusterNodeId(1001), WithClusterNodes(nodes), WithOpts(opt...))
s2 := NewTestServer(t, WithDemoOn(false), WithWSAddr("ws://0.0.0.0:5220"), WithManagerAddr("0.0.0.0:5320"), WithAddr("tcp://0.0.0.0:5120"), WithHTTPAddr("0.0.0.0:5002"), WithClusterAddr("tcp://0.0.0.0:11111"), WithClusterNodeId(1002), WithClusterNodes(nodes), WithOpts(opt...))
s1 := NewTestServer(t, WithDemoOn(false), WithWSAddr("ws://0.0.0.0:5210"), WithManagerAddr("0.0.0.0:5310"), WithAddr("tcp://0.0.0.0:5110"), WithHTTPAddr("0.0.0.0:5001"), WithClusterAddr("tcp://0.0.0.0:11110"), WithClusterNodeId(1001), WithClusterInitNodes(nodes), WithOpts(opt...))
s2 := NewTestServer(t, WithDemoOn(false), WithWSAddr("ws://0.0.0.0:5220"), WithManagerAddr("0.0.0.0:5320"), WithAddr("tcp://0.0.0.0:5120"), WithHTTPAddr("0.0.0.0:5002"), WithClusterAddr("tcp://0.0.0.0:11111"), WithClusterNodeId(1002), WithClusterInitNodes(nodes), WithOpts(opt...))
return s1, s2
}
@@ -92,7 +92,7 @@ func NewTestClusterServerTreeNode(t testing.TB, opt ...Option) (*Server, *Server
WithHTTPAddr("0.0.0.0:5001"),
WithClusterAddr("tcp://0.0.0.0:11110"),
WithClusterNodeId(1001),
WithClusterNodes(nodes),
WithClusterInitNodes(nodes),
WithClusterTickInterval(time.Millisecond*50),
WithOpts(opt...),
)
@@ -108,7 +108,7 @@ func NewTestClusterServerTreeNode(t testing.TB, opt ...Option) (*Server, *Server
WithHTTPAddr("0.0.0.0:5002"),
WithClusterAddr("tcp://0.0.0.0:11111"),
WithClusterNodeId(1002),
WithClusterNodes(nodes),
WithClusterInitNodes(nodes),
WithClusterTickInterval(time.Millisecond*50),
WithOpts(opt...),
)
@@ -124,7 +124,7 @@ func NewTestClusterServerTreeNode(t testing.TB, opt ...Option) (*Server, *Server
WithHTTPAddr("0.0.0.0:5003"),
WithClusterAddr("tcp://0.0.0.0:11112"),
WithClusterNodeId(1003),
WithClusterNodes(nodes),
WithClusterInitNodes(nodes),
WithClusterTickInterval(time.Millisecond*50),
WithOpts(opt...),
)

View File

@@ -129,7 +129,6 @@ func (s *Server) Start() error {
if strings.TrimSpace(s.opts.Seed) != "" {
learners = []uint64{s.opts.NodeId}
}
fmt.Println("replicas--------->:", s.opts.NodeId, replicas, learners)
s.configReactor.Step(s.handlerKey, replica.Message{
MsgType: replica.MsgConfigResp,

View File

@@ -47,6 +47,7 @@ func (s *Server) handleLeaderTermStartIndex(c *wkserver.Context) {
}
binary.BigEndian.PutUint64(resultBytes, lastIndex)
}
c.Write(resultBytes)
}

View File

@@ -110,6 +110,9 @@ func (s *Server) handleNodeJoin(cmd *CMD) error {
// 将新节点加入学习者列表
if !wkutil.ArrayContainsUint64(s.cfg.cfg.Learners, newNode.Id) {
s.cfg.cfg.Learners = append(s.cfg.cfg.Learners, newNode.Id)
// 如果是新加入的节点,就是从自己迁移到自己
s.cfg.cfg.MigrateFrom = newNode.Id
s.cfg.cfg.MigrateTo = newNode.Id
}
return s.SwitchConfig(s.cfg.cfg)

View File

@@ -52,6 +52,13 @@ func (s *Server) checkClusterConfig() error {
return err
}
// 检查和均衡槽领导
err = s.handleSlotLeaderAutoBalance()
if err != nil {
s.Error("handleSlotLeaderAutoBalance failed", zap.Error(err))
return err
}
}
// ================== 处理槽领导选举 ==================
@@ -350,6 +357,8 @@ func (s *Server) handleNodeJoining() error {
if uint32(len(firstSlot.Replicas)) < s.cfgServer.SlotReplicaCount() { // 如果当前槽的副本数量小于配置的副本数量,则可以将新节点直接加入到学习节点中
for _, slot := range slots {
newSlot := slot.Clone()
newSlot.MigrateFrom = joiningNode.Id
newSlot.MigrateTo = joiningNode.Id
newSlot.Learners = append(slot.Learners, joiningNode.Id)
migrateSlots = append(migrateSlots, newSlot)
}
@@ -619,3 +628,35 @@ func (s *Server) handleSlotLeaderElection() error {
return s.opts.OnSlotElection(electionSlots)
}
func (s *Server) handleSlotLeaderAutoBalance() error {
cfg := s.cfgServer.Config()
// 有未加入的节点或者有槽正在迁移,则不进行自动均衡
for _, node := range cfg.Nodes {
if node.Status != pb.NodeStatus_NodeStatusJoined {
return nil
}
}
for _, slot := range cfg.Slots {
if slot.MigrateFrom != 0 || slot.MigrateTo != 0 {
return nil
}
if slot.Status == pb.SlotStatus_SlotStatusCandidate {
return nil
}
}
// 自动均衡槽领导
changedSlots := s.autoBalanceSlotLeaders(cfg)
if len(changedSlots) == 0 {
return nil
}
err := s.ProposeSlots(changedSlots)
if err != nil {
s.Error("handleSlotLeaderAutoBalance failed,ProposeSlots failed", zap.Error(err))
return err
}
return nil
}

View File

@@ -109,7 +109,7 @@ func NewOptions(opt ...Option) *Options {
LazyFreeCycle: 1,
InitialTaskQueueCap: 24,
LogSyncLimitSizeOfEach: 1024 * 1024 * 20, // 20M
Addr: "tcp://127.0.0.1:10001",
Addr: "tcp://127.0.0.1:11110",
ChannelElectionPoolSize: 10,
MaxChannelElectionBatchLen: 100,
ChannelMaxReplicaCount: 3,

View File

@@ -103,7 +103,12 @@ func New(opts *Options) *Server {
}
initNodes[seedNodeID] = seedAddr
}
initNodes[s.opts.NodeId] = strings.ReplaceAll(s.opts.Addr, "tcp://", "")
if s.opts.ServerAddr != "" {
initNodes[s.opts.NodeId] = strings.ReplaceAll(s.opts.ServerAddr, "tcp://", "")
} else {
initNodes[s.opts.NodeId] = strings.ReplaceAll(s.opts.Addr, "tcp://", "")
}
}
opts.Send = s.send
@@ -527,7 +532,7 @@ func (s *Server) joinLoop() {
case <-time.After(time.Second * 2):
resp, err := s.nodeManager.requestClusterJoin(seedNodeId, req)
if err != nil {
s.Error("requestClusterJoin failed", zap.Error(err))
s.Error("requestClusterJoin failed", zap.Error(err), zap.Uint64("seedNodeId", seedNodeId))
continue
}
if len(resp.Nodes) > 0 {

View File

@@ -264,6 +264,7 @@ func (s *slot) FollowerToLeader(followerId uint64) error {
func (s *slot) learnerTo(learnerId uint64) error {
fmt.Println("learnerTo----->", learnerId)
s.learnerToLock.Lock()
defer s.learnerToLock.Unlock()
@@ -288,7 +289,9 @@ func (s *slot) learnerTo(learnerId uint64) error {
if !wkutil.ArrayContainsUint64(slot.Replicas, slot.MigrateTo) {
slot.Replicas = append(slot.Replicas, slot.MigrateTo)
}
slot.Replicas = wkutil.RemoveUint64(slot.Replicas, slot.MigrateFrom)
if slot.MigrateFrom != slot.MigrateTo {
slot.Replicas = wkutil.RemoveUint64(slot.Replicas, slot.MigrateFrom)
}
if slot.Leader == slot.MigrateFrom {
slot.Leader = learnerId

View File

@@ -88,13 +88,13 @@ func (r *Replica) HasReady() bool {
return true
}
if r.status == StatusLogCoflictCheck && isFollower {
return r.leader != 0
return r.leader != 0 && r.logConflictCheckTick >= r.opts.RequestTimeoutTick
}
return false
}
if isFollower && r.leader != 0 && r.logConflictCheckTick >= r.opts.RequestTimeoutTick {
if isFollower && r.leader != 0 {
if r.syncTick >= r.syncIntervalTick && !r.syncing {
return true
}
@@ -147,8 +147,8 @@ func (r *Replica) Ready() Ready {
}
}
// ==================== 日志冲突检查 ====================
if r.status == StatusLogCoflictCheck && isFollower && r.logConflictCheckTick >= r.opts.RequestTimeoutTick {
if r.leader != 0 {
if r.status == StatusLogCoflictCheck && isFollower {
if r.leader != 0 && r.logConflictCheckTick >= r.opts.RequestTimeoutTick {
r.logConflictCheckTick = 0
r.msgs = append(r.msgs, r.newMsgLogConflictCheck())
rd.Messages = r.msgs
@@ -194,19 +194,20 @@ func (r *Replica) hardStateChange() bool {
func (r *Replica) Tick() {
if r.role == RoleFollower || r.role == RoleLearner {
r.syncTick++
if r.syncTick > r.syncIntervalTick*2 { // 同步超时 一直没有返回
r.send(r.newSyncTimeoutMsg()) // 同步超时
// 重置同步状态,从而可以重新发起同步
r.syncing = false
r.syncTick = 0
}
if r.status == StatusReady {
r.syncTick++
if r.syncTick > r.syncIntervalTick*2 && r.status == StatusReady { // 同步超时 一直没有返回
r.send(r.newSyncTimeoutMsg()) // 同步超时
// 日志冲突检查超时,重新发起
if r.status == StatusLogCoflictCheck {
// 重置同步状态,从而可以重新发起同步
r.syncing = false
r.syncTick = 0
}
} else if r.status == StatusLogCoflictCheck { // 日志冲突检查超时,重新发起
r.logConflictCheckTick++
}
}
if r.tickFnc != nil {

View File

@@ -253,11 +253,15 @@ func (r *Replica) stepFollower(m Message) error {
r.logConflictCheckTick = r.opts.RequestTimeoutTick // 可以进行下次请求
if m.Index != NoConflict && m.Index > 0 {
r.replicaLog.updateLastIndex(m.Index - 1)
if m.Index >= r.replicaLog.unstable.offset {
r.replicaLog.unstable.truncateLogTo(m.Index)
truncateLogIndex := m.Index
if truncateLogIndex > r.replicaLog.lastLogIndex+1 {
truncateLogIndex = r.replicaLog.lastLogIndex + 1
}
if truncateLogIndex >= r.replicaLog.unstable.offset {
r.replicaLog.unstable.truncateLogTo(truncateLogIndex)
}
r.replicaLog.updateLastIndex(truncateLogIndex - 1)
}
}
@@ -312,11 +316,16 @@ func (r *Replica) stepLearner(m Message) error {
r.status = StatusReady
if m.Index != NoConflict && m.Index > 0 {
r.replicaLog.updateLastIndex(m.Index - 1)
if m.Index >= r.replicaLog.unstable.offset {
r.replicaLog.unstable.truncateLogTo(m.Index)
truncateLogIndex := m.Index
if truncateLogIndex > r.replicaLog.lastLogIndex+1 {
truncateLogIndex = r.replicaLog.lastLogIndex + 1
}
if truncateLogIndex >= r.replicaLog.unstable.offset {
r.replicaLog.unstable.truncateLogTo(truncateLogIndex)
}
r.replicaLog.updateLastIndex(truncateLogIndex - 1)
}
}
case MsgSyncResp: // 同步日志返回
@@ -337,6 +346,7 @@ func (r *Replica) stepLearner(m Message) error {
} else {
r.syncTick = 0
}
r.updateFollowCommittedIndex(m.CommittedIndex) // 更新提交索引
}
return nil

View File

@@ -42,6 +42,9 @@ func (u *unstable) appliedTo(index uint64) {
return
}
num := int(index + 1 - u.offset)
fmt.Println("appliedTo", num, u.offset, index, len(u.logs))
u.logs = u.logs[num:]
u.offset = index + 1
u.offsetInProgress = max(u.offsetInProgress, u.offset)

View File

@@ -297,7 +297,7 @@ func (d *metrics) requestAndFillAppMetrics(label string, filterId string, rg v1.
}
func (d *metrics) requestAndFillClusterMetrics(label string, rg v1.Range, rate bool, resps *[]*clusterMetricsResp) {
query := `rate(` + label + `[10s])`
query := `rate(` + label + `[1m])`
if !rate {
query = label
}
@@ -314,12 +314,12 @@ func (d *metrics) requestAndFillClusterMetrics(label string, rg v1.Range, rate b
}
func (d *metrics) requestAndFillSystemMetrics(label string, rg v1.Range, rate bool, resps *[]*systemMetricsResp) {
query := `rate(` + label + `[10s])`
query := `rate(` + label + `[1m])`
if !rate {
query = label
}
if label == "go_gc_duration_seconds_count" {
query = `increase(` + label + `[10s])`
query = `increase(` + label + `[1m])`
}
countValue, err := d.opts.requestPrometheus(query, rg)
if err != nil {
@@ -530,9 +530,9 @@ func getLabelByFilterId(label string, filterId string) string {
func getRateByLabelFilterId(label string, filterId string) string {
if filterId != "" {
return `rate(` + label + `{id="` + filterId + `"}[10s])`
return `rate(` + label + `{id="` + filterId + `"}[1m])`
}
return `sum(rate(` + label + `[10s]))`
return `sum(rate(` + label + `[1m]))`
}
type appMetricsResp struct {

View File

@@ -1,6 +1,7 @@
package wkutil
import (
"errors"
"io"
"net"
"net/http"
@@ -20,6 +21,15 @@ func GetExternalIP() (string, error) {
if err != nil {
return "", err
}
if resp.StatusCode != http.StatusOK {
return "", errors.New("get external ip failed")
}
resultStr := string(resultBytes)
if len(resultStr) > 15 {
return "", errors.New("get external ip failed")
}
return strings.TrimSpace(string(resultBytes)), nil
}