mirror of https://gitee.com/WuKongDev/WuKongIM.git
synced 2025-12-06 14:59:08 +08:00
fix: #310
@@ -3,8 +3,9 @@ addr: "tcp://0.0.0.0:5150" # TCP listen address
 httpAddr: "0.0.0.0:5005"
 wsAddr: "ws://0.0.0.0:5250"
 rootDir: "./wukongimdata/5"
-manager:
-  on: false
+manager: # admin console config
+  on: true
+  addr: "0.0.0.0:5305"
 demo:
   on: true
   addr: "0.0.0.0:5175"
exampleconfig/join_as_replica_2.yaml (new file, 28 lines)
@@ -0,0 +1,28 @@
+mode: "release"
+addr: "tcp://0.0.0.0:5160" # TCP listen address
+httpAddr: "0.0.0.0:5006"
+wsAddr: "ws://0.0.0.0:5260"
+rootDir: "./wukongimdata/6"
+manager: # admin console config
+  on: true
+  addr: "0.0.0.0:5306"
+demo:
+  on: true
+  addr: "0.0.0.0:5176"
+conversation:
+  on: true
+cluster:
+  nodeId: 6
+  addr: "tcp://127.0.0.1:10006" # cluster listen address
+  serverAddr: "127.0.0.1:10006" # node communication address
+  apiUrl: "http://127.0.0.1:5006"
+  seed: "1@127.0.0.1:10001" # any node already in the cluster
+
+auth:
+  kind: 'jwt' # auth mode; jwt: JWT auth, none: no auth
+  users:
+    - "admin:pwd:*"
+    - "guest:guest:[*:r]" # user guest, password guest, read access to all resources
+  jwt:
+    secret: "xxxxx"
+    expire: 30d
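
Note: the file above is plain YAML; a minimal Go sketch of reading its cluster section follows. The struct and field names here are hypothetical illustrations, not WuKongIM's actual option types, and gopkg.in/yaml.v3 is used only for demonstration:

package main

import (
	"fmt"
	"os"

	"gopkg.in/yaml.v3"
)

// hypothetical mirror of the cluster section; not WuKongIM's real config structs
type ClusterConfig struct {
	NodeId     uint64 `yaml:"nodeId"`
	Addr       string `yaml:"addr"`       // cluster listen address
	ServerAddr string `yaml:"serverAddr"` // node communication address
	ApiUrl     string `yaml:"apiUrl"`
	Seed       string `yaml:"seed"` // "nodeId@host:port" of any node already in the cluster
}

type Config struct {
	Mode    string        `yaml:"mode"`
	Cluster ClusterConfig `yaml:"cluster"`
}

func main() {
	data, err := os.ReadFile("exampleconfig/join_as_replica_2.yaml")
	if err != nil {
		panic(err)
	}
	var cfg Config
	if err := yaml.Unmarshal(data, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.Cluster.NodeId, cfg.Cluster.Seed) // 6 1@127.0.0.1:10001
}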
@@ -2,7 +2,7 @@ mode: "release"
 addr: "tcp://0.0.0.0:5100" # TCP listen address
 httpAddr: "0.0.0.0:5001" # HTTP API listen address
 wsAddr: "ws://0.0.0.0:5200" # WebSocket (ws) listen address
-rootDir: "./wukongimdata/1001" # data directory
+rootDir: "./wukongimdata/1" # data directory
 pprofOn: true
 stress: false # whether to enable stress-test mode
 logger:
@@ -15,7 +15,9 @@ demo:
 conversation:
   on: true
 cluster:
-  nodeId: 1001 # node ID
+  nodeId: 1 # node ID
   addr: "tcp://127.0.0.1:10001"
+  serverAddr: "127.0.0.1:10001" # node communication address
   apiUrl: "http://127.0.0.1:5001"
 
+# auth config, user:password:resource:permission; * is a wildcard; the resource part may also be written as [resourceId:permission]
@@ -188,6 +188,8 @@ func New(opts *options.Options) *Server {
 			clusterconfig.WithChannelMaxReplicaCount(uint32(s.opts.Cluster.ChannelReplicaCount)),
 			clusterconfig.WithSlotMaxReplicaCount(uint32(s.opts.Cluster.SlotReplicaCount)),
 			clusterconfig.WithPongMaxTick(s.opts.Cluster.PongMaxTick),
+			clusterconfig.WithServerAddr(s.opts.Cluster.ServerAddr),
+			clusterconfig.WithSeed(s.opts.Cluster.Seed),
 		)),
 		cluster.WithAddr(s.opts.Cluster.Addr),
 		cluster.WithDataDir(path.Join(opts.DataDir)),
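
Note: the two new options thread the YAML values cluster.serverAddr and cluster.seed into the cluster-config layer, following the package's existing functional-option pattern (WithServerAddr is defined later in this diff). In isolation each option is just a setter over *Options; a minimal sketch matching the definitions in this diff:

// each Option is a func(*Options); applying it mutates the options struct
o := &clusterconfig.Options{}
clusterconfig.WithServerAddr("127.0.0.1:10006")(o) // o.ServerAddr == "127.0.0.1:10006"
clusterconfig.WithSeed("1@127.0.0.1:10001")(o)     // o.Seed == "1@127.0.0.1:10001"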
@@ -58,6 +58,7 @@ func (s *Server) onNodeMessage(_ gnet.Conn, m *proto.Message) {
 		s.Error("onNodeMessage: unmarshal event failed", zap.Error(err))
 		return
 	}
 
+	if trace.GlobalTrace != nil && event.Type == rafttype.SyncReq {
 		trace.GlobalTrace.Metrics.Cluster().MsgSyncIncomingCountAdd(trace.ClusterKindConfig, 1)
 		trace.GlobalTrace.Metrics.Cluster().MsgSyncIncomingBytesAdd(trace.ClusterKindConfig, int64(m.Size()))
@@ -207,16 +207,7 @@ func (s *Server) Start() error {
 	if len(cfg.Nodes) == 0 {
 		if len(s.opts.ConfigOptions.InitNodes) > 0 {
 			s.addOrUpdateNodes(s.opts.ConfigOptions.InitNodes)
-		} else if strings.TrimSpace(s.opts.Seed) != "" {
-			nodeMap := make(map[uint64]string)
-			seedNodeId, addr, err := seedNode(s.opts.Seed)
-			if err != nil {
-				return err
-			}
-			nodeMap[seedNodeId] = addr
-			s.addOrUpdateNodes(nodeMap)
 		}
-
 	} else {
 		nodeMap := make(map[uint64]string)
 		for _, node := range cfg.Nodes {
@@ -225,6 +216,19 @@ func (s *Server) Start() error {
 		s.addOrUpdateNodes(nodeMap)
 	}
 
+	// if a seed node is configured but no connection to it exists yet, establish one
+	if strings.TrimSpace(s.opts.Seed) != "" {
+		nodeMap := make(map[uint64]string)
+		seedNodeId, addr, err := seedNode(s.opts.Seed)
+		if err != nil {
+			return err
+		}
+		if !s.nodeManager.exist(seedNodeId) {
+			nodeMap[seedNodeId] = addr
+			s.addOrUpdateNodes(nodeMap)
+		}
+	}
+
 	// if a node newly joined, run the join logic
 	join, err := s.needJoin()
 	if err != nil {
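
Note: this is the same seed logic the previous hunk removed from the `if len(cfg.Nodes) == 0` branch, so it now also runs when a node restarts with a non-empty saved config. Condensed (error handling elided for brevity), the guarantee after Start() is roughly:

// condensed sketch of the moved logic: a configured seed node always ends up
// in the node manager, whether or not cfg.Nodes was empty at startup
if strings.TrimSpace(s.opts.Seed) != "" {
	if seedNodeId, addr, err := seedNode(s.opts.Seed); err == nil && !s.nodeManager.exist(seedNodeId) {
		s.addOrUpdateNodes(map[uint64]string{seedNodeId: addr})
	}
}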
@@ -344,6 +348,10 @@ func (s *Server) uidToServerId(uid string) uint64 {
 // save the slot cluster config (event)
 func (s *Server) onSaveSlotConfig(slotId uint32, cfg rafttype.Config) error {
 
+	if s.cfgServer.LeaderId() == 0 {
+		return rafttype.ErrNotLeader
+	}
+
 	slot := s.cfgServer.Slot(slotId)
 	if slot == nil {
 		s.Error("slot not found", zap.Uint32("slotId", slotId))
@@ -356,7 +364,7 @@ func (s *Server) onSaveSlotConfig(slotId uint32, cfg rafttype.Config) error {
 	// propose the slot update (a slot update triggers a cluster-config event, and that event makes the slot apply the latest config itself, so proposing is enough; no explicit config switch is needed here)
 	err := s.cfgServer.ProposeSlots([]*types.Slot{cloneSlot})
 	if err != nil {
-		s.Error("onSaveSlotConfig: propose slot failed", zap.Uint32("slotId", slotId), zap.Error(err))
+		s.Error("onSaveSlotConfig: propose slot failed", zap.Any("oldSlot", slot), zap.Error(err))
 		return err
 	}
@@ -8,8 +8,8 @@ import (
 
 type Options struct {
 	// NodeId node id
-	NodeId uint64
-
+	NodeId        uint64
+	ServerAddr    string // cluster node communication address
 	ApiServerAddr string        // cluster API address (intranet)
 	TickInterval  time.Duration // cluster tick interval
 	// InitNodes initial node list; key is the node id, value is the cluster communication address
@@ -109,3 +109,9 @@ func WithSeed(seed string) Option {
 		o.Seed = seed
 	}
 }
+
+func WithServerAddr(serverAddr string) Option {
+	return func(o *Options) {
+		o.ServerAddr = serverAddr
+	}
+}
@@ -2,14 +2,18 @@ package clusterconfig
 
 import (
 	"context"
+	"errors"
 	"os"
 	"path"
+	"strconv"
 	"strings"
 
 	"github.com/WuKongIM/WuKongIM/pkg/cluster/node/types"
+	pb "github.com/WuKongIM/WuKongIM/pkg/cluster/node/types"
 	"github.com/WuKongIM/WuKongIM/pkg/raft/raft"
 	rafttypes "github.com/WuKongIM/WuKongIM/pkg/raft/types"
 	"github.com/WuKongIM/WuKongIM/pkg/wklog"
+	"github.com/WuKongIM/WuKongIM/pkg/wkutil"
 	"github.com/bwmarrin/snowflake"
 	"go.uber.org/zap"
 )
@@ -84,18 +88,33 @@ func (s *Server) initRaft() error {
 		raftConfig = s.configToRaftConfig(s.config)
 	} else {
 		replicas := make([]uint64, 0, len(s.opts.InitNodes))
+		learners := make([]uint64, 0)
 
 		var leader uint64
 		if len(s.opts.InitNodes) > 0 {
 			for nodeId := range s.opts.InitNodes {
 				replicas = append(replicas, nodeId)
 			}
-		} else { // single-node start
-			leader = s.opts.NodeId
-			replicas = append(replicas, s.opts.NodeId)
+		} else {
+
+			if strings.TrimSpace(s.opts.Seed) != "" {
+				// a seed node exists, so this node needs to join the cluster
+				seedNodeId, _, err := seedNode(s.opts.Seed)
+				if err != nil {
+					return err
+				}
+				replicas = append(replicas, seedNodeId)    // first start: the seed node acts as a replica
+				learners = append(learners, s.opts.NodeId) // first start: the current node acts as a learner
+			} else {
+				// single-node start
+				replicas = append(replicas, s.opts.NodeId)
+				leader = s.opts.NodeId
+			}
+
 		}
 		raftConfig = rafttypes.Config{
 			Replicas: replicas,
+			Learners: learners,
 			Term:     1,
 			Leader:   leader,
 		}
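
Note: concretely, for node 6 from join_as_replica_2.yaml above (seed "1@127.0.0.1:10001", no InitNodes), the first-start branch now yields an initial config in which the seed is the only replica and the joining node is a learner:

// first start of node 6 joining through seed node 1:
raftConfig = rafttypes.Config{
	Replicas: []uint64{1}, // the seed node id parsed from the seed string
	Learners: []uint64{6}, // the joining node itself
	Term:     1,
	Leader:   0, // var leader uint64 is never assigned on the seed path, so no leader yet
}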
@@ -292,9 +311,14 @@ func (s *Server) genConfigId() uint64 {
 
 func (s *Server) configToRaftConfig(cfg *Config) rafttypes.Config {
 
-	replicas := make([]uint64, 0, len(cfg.nodes()))
-	for _, node := range cfg.nodes() {
-		replicas = append(replicas, node.Id)
+	nodes := cfg.nodes()
+
+	replicas := make([]uint64, 0, len(nodes))
+	for _, node := range nodes {
+		if node.AllowVote && node.Role == pb.NodeRole_NodeRoleReplica && !wkutil.ArrayContainsUint64(cfg.cfg.Learners, node.Id) {
+			replicas = append(replicas, node.Id)
+			continue
+		}
 	}
 
 	var leader uint64
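
Note: the replica set is no longer every known node; the new filter keeps only voting replicas that are not still learning. Walked through for a hypothetical node set:

// node 1: AllowVote=true,  Role=Replica, not in cfg.cfg.Learners -> kept
// node 2: AllowVote=true,  Role=Replica, in cfg.cfg.Learners     -> skipped (still catching up)
// node 3: AllowVote=false, Role=Replica                          -> skipped (cannot vote)
// node 6: Role=Learner                                           -> skipped
// replicas == []uint64{1}; previously all four ids were included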
@@ -311,3 +335,17 @@ func (s *Server) configToRaftConfig(cfg *Config) rafttypes.Config {
 		// Term: cfg.cfg.Term, // no need to set the term; unset means the current term is used. The Config term is only the latest applied term, not the log term
 	}
 }
+
+func seedNode(seed string) (uint64, string, error) {
+	seedArray := strings.Split(seed, "@")
+	if len(seedArray) < 2 {
+		return 0, "", errors.New("seed format error")
+	}
+	seedNodeIDStr := seedArray[0]
+	seedAddr := seedArray[1]
+	seedNodeID, err := strconv.ParseUint(seedNodeIDStr, 10, 64)
+	if err != nil {
+		return 0, "", err
+	}
+	return seedNodeID, seedAddr, nil
+}
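
Note: seedNode just splits the nodeId@host:port string used by the seed option, for example:

id, addr, err := seedNode("1@127.0.0.1:10001")
// id == 1, addr == "127.0.0.1:10001", err == nil

_, _, err = seedNode("127.0.0.1:10001")
// err != nil: "seed format error" (missing the "nodeId@" prefix)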
@@ -35,6 +35,7 @@ func (s *Server) applyLog(log types.Log) error {
 	s.config.cfg.Term = log.Term
 	s.config.cfg.Version = log.Index
 
+	// fmt.Println("apply log", log.Index, log.Term, cmd.CmdType.String())
 	err = s.config.saveConfig()
 	if err != nil {
 		s.Error("save config err", zap.Error(err))
@@ -130,11 +130,12 @@ func (p *PebbleShardLogStorage) TruncateLogTo(index uint64) error {
 		p.Error("get max index error", zap.Error(err))
 		return err
 	}
-	if index <= appliedIdx {
+	if index < appliedIdx {
 		p.Error("index must be less than applied index", zap.Uint64("index", index), zap.Uint64("appliedIdx", appliedIdx))
 		return nil
 	}
-	keyData := key.NewLogKey(index)
+
+	keyData := key.NewLogKey(index + 1)
 	maxKeyData := key.NewLogKey(math.MaxUint64)
 	err = p.db.DeleteRange(keyData, maxKeyData, p.wo)
 	if err != nil {
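
Note: this fixes an off-by-one. A delete range starting at NewLogKey(index) removed the entry at index itself, while TruncateLogTo(index) is meant to keep everything up to and including index; the guard also loosens from <= to <, so truncating exactly at the applied index is now allowed. A toy model of the corrected semantics, as a self-contained sketch:

package main

import "fmt"

// keep logs [1..index], delete [index+1, ∞), mirroring key.NewLogKey(index + 1)
func truncateTo(logs []uint64, index uint64) []uint64 {
	kept := make([]uint64, 0, len(logs))
	for _, i := range logs {
		if i <= index { // entry at index survives; the delete range starts at index+1
			kept = append(kept, i)
		}
	}
	return kept
}

func main() {
	logs := []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	fmt.Println(truncateTo(logs, 7)) // [1 2 3 4 5 6 7]; the old NewLogKey(index) start dropped entry 7 too
}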
@@ -9,7 +9,6 @@ import (
 
 // cluster init event
 func (h *handler) handleClusterInit() {
-
 	cfg := &types.Config{
 		SlotCount:        h.cfgOptions.SlotCount,
 		SlotReplicaCount: h.cfgOptions.SlotMaxReplicaCount,
@@ -43,6 +42,7 @@ func (h *handler) handleClusterInit() {
 	} else { // no initNodes, so treat this as single-node mode
 		nodes = append(nodes, &types.Node{
 			Id:            opts.NodeId,
+			ClusterAddr:   h.s.cfgOptions.ServerAddr,
 			ApiServerAddr: opts.ApiServerAddr,
 			Online:        true,
 			AllowVote:     true,
@@ -7,7 +7,9 @@ import (
 )
 
 func (h *handler) handleCompare() {
-
+	if h.cfgServer.LeaderId() == 0 {
+		return
+	}
 	// if this node's apiServerAddr in the config is missing or different, propose a config update
 	if strings.TrimSpace(h.cfgOptions.ApiServerAddr) != "" {
 		localNode := h.cfgServer.Node(h.cfgOptions.NodeId)
@@ -52,6 +52,7 @@ func (s *Server) Stop() {
 }
 
 func (s *Server) Step(event types.Event) {
 
 	s.cfgServer.StepRaftEvent(event)
+
 	if s.cfgServer.IsLeader() && event.Type == types.SyncReq {
@@ -93,6 +94,7 @@ func (s *Server) handleEvents() {
 		}
 	}
+	// compare the old and new config
 
 	s.handler.handleCompare()
 
 	if s.cfgServer.IsLeader() {
@@ -1,11 +1 @@
 package raft
-
-import "errors"
-
-var (
-	ErrLogIndexNotContinuous = errors.New("log index is not continuous")
-	ErrStopped               = errors.New("raft is stopped")
-	ErrNotLeader             = errors.New("not leader")
-	ErrPaused                = errors.New("raft is paused")
-	ErrProposalDropped       = errors.New("proposal dropped")
-)
@@ -16,7 +16,7 @@ func (n *Node) BecomeCandidate() {
 	n.voteFor = n.opts.NodeId
 	n.cfg.Leader = 0
 	n.cfg.Role = types.RoleCandidate
-	n.Info("become candidate", zap.Uint32("term", n.cfg.Term), zap.Int("nextElectionTimeout", n.randomizedElectionTimeout))
+	n.Info("become candidate", zap.Uint32("term", n.cfg.Term), zap.Int("nextElectionTimeout", n.randomizedElectionTimeout), zap.Uint64s("replicas", n.cfg.Replicas))
 }
 
 func (n *Node) BecomeFollower(term uint32, leaderId uint64) {
@@ -27,7 +27,13 @@ func (n *Node) BecomeFollower(term uint32, leaderId uint64) {
 	n.voteFor = None
 	n.cfg.Leader = leaderId
 	n.cfg.Role = types.RoleFollower
-	n.Debug("become follower", zap.Uint32("term", n.cfg.Term), zap.Uint64("leaderId", leaderId))
+
+	if n.opts.Key == "clusterconfig" {
+		n.Info("become follower", zap.Uint32("term", n.cfg.Term), zap.Uint64("leaderId", leaderId), zap.Uint64s("replicas", n.cfg.Replicas))
+	} else {
+		n.Debug("become follower", zap.Uint32("term", n.cfg.Term), zap.Uint64("leaderId", leaderId), zap.Uint64s("replicas", n.cfg.Replicas))
+	}
+
 }
 
 func (n *Node) BecomeLeader(term uint32) {
@@ -37,7 +43,13 @@ func (n *Node) BecomeLeader(term uint32) {
 	n.tickFnc = n.tickLeader
 	n.cfg.Leader = n.opts.NodeId
 	n.cfg.Role = types.RoleLeader
-	n.Debug("become leader", zap.Uint32("term", n.cfg.Term))
+
+	if n.opts.Key == "clusterconfig" {
+		n.Info("become leader", zap.Uint32("term", n.cfg.Term), zap.Uint64s("replicas", n.cfg.Replicas))
+	} else {
+		n.Debug("become leader", zap.Uint32("term", n.cfg.Term), zap.Uint64s("replicas", n.cfg.Replicas))
+	}
+
 }
 
 func (n *Node) BecomeLearner(term uint32, leaderId uint64) {
@@ -48,7 +60,13 @@ func (n *Node) BecomeLearner(term uint32, leaderId uint64) {
 	n.voteFor = None
 	n.cfg.Leader = leaderId
 	n.cfg.Role = types.RoleLearner
-	n.Info("become learner", zap.Uint32("term", n.cfg.Term), zap.Uint64("leaderId", leaderId))
+
+	if n.opts.Key == "clusterconfig" {
+		n.Info("become learner", zap.Uint32("term", n.cfg.Term), zap.Uint64("leaderId", leaderId), zap.Uint64s("replicas", n.cfg.Replicas))
+	} else {
+		n.Debug("become learner", zap.Uint32("term", n.cfg.Term), zap.Uint64("leaderId", leaderId), zap.Uint64s("replicas", n.cfg.Replicas))
+	}
+
 }
 
 func (n *Node) reset() {
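
Note: BecomeFollower, BecomeLeader and BecomeLearner now repeat the same level-selection branch (the clusterconfig raft group logs role changes at Info, everything else at Debug). A possible consolidation, as a sketch assuming the Node's embedded logger keeps the Info/Debug signatures used above:

// hypothetical helper collapsing the repeated branching in the three Become* methods
func (n *Node) logRoleChange(msg string, fields ...zap.Field) {
	fields = append(fields, zap.Uint32("term", n.cfg.Term), zap.Uint64s("replicas", n.cfg.Replicas))
	if n.opts.Key == "clusterconfig" {
		n.Info(msg, fields...) // cluster-config role changes are rare and operationally interesting
	} else {
		n.Debug(msg, fields...) // per-channel/slot raft groups would be too noisy at Info
	}
}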
@@ -4,10 +4,12 @@ import (
 	"errors"
 
 	"github.com/WuKongIM/WuKongIM/pkg/raft/types"
+	"github.com/WuKongIM/WuKongIM/pkg/wkutil"
 	"go.uber.org/zap"
 )
 
 func (n *Node) switchConfig(newCfg types.Config) error {
+
 	oldCfg := n.cfg
 	if n.cfg.Version > newCfg.Version {
 		n.Error("config version is lower than current version", zap.Uint64("newVersion", newCfg.Version), zap.Uint64("currentVersion", oldCfg.Version))
@@ -51,7 +53,12 @@ func (n *Node) roleChangeIfNeed(oldCfg, newCfg types.Config) {
 		if newCfg.Leader != 0 && newCfg.Leader == n.opts.NodeId {
 			n.BecomeLeader(newCfg.Term)
 		} else {
-			n.BecomeFollower(newCfg.Term, newCfg.Leader)
+			if wkutil.ArrayContainsUint64(newCfg.Replicas, n.opts.NodeId) {
+				n.BecomeFollower(newCfg.Term, newCfg.Leader)
+			} else if wkutil.ArrayContainsUint64(newCfg.Learners, n.opts.NodeId) {
+				n.BecomeLearner(newCfg.Term, newCfg.Leader)
+			}
+
 		}
 	}
 }
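
Note: the effect of the new branching, walked through for a hypothetical config:

// newCfg := types.Config{Leader: 1, Replicas: []uint64{1, 2}, Learners: []uint64{3}}
//
// node 1: newCfg.Leader == NodeId -> BecomeLeader
// node 2: in Replicas             -> BecomeFollower (as before)
// node 3: in Learners             -> BecomeLearner  (new)
// node 4: in neither list         -> no transition  (previously it would have become a follower)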
@@ -40,29 +40,42 @@ func (n *Node) Step(e types.Event) error {
 	case types.Campaign:
 		n.campaign()
 	case types.VoteReq: // vote request
-		if n.canVote(e) {
-			if e.From == n.opts.NodeId {
-				n.sendVoteResp(types.LocalNode, types.ReasonOk)
-			} else {
-				n.sendVoteResp(e.From, types.ReasonOk)
-			}
-			n.voteFor = e.From
-			n.electionElapsed = 0
-			if e.From != n.opts.NodeId {
-				n.Info("agree vote", zap.Uint64("voteFor", e.From), zap.Uint32("term", e.Term), zap.Uint64("index", e.Index))
-			}
-		} else {
-			if n.voteFor != None {
-				n.Info("already vote for other", zap.Uint64("voteFor", n.voteFor))
-			} else if e.Index < n.queue.lastLogIndex {
-				n.Info("lower config version, reject vote")
-			} else if e.Term < n.cfg.Term {
-				n.Info("lower term, reject vote")
-			}
-			n.Info("reject vote", zap.Uint64("from", e.From), zap.Uint32("term", e.Term), zap.Uint64("index", e.Index))
-			n.sendVoteResp(e.From, types.ReasonError)
-		}
+		if n.cfg.Role != types.RoleLearner {
+			if n.canVote(e) {
+				if e.From == n.opts.NodeId {
+					n.sendVoteResp(types.LocalNode, types.ReasonOk)
+				} else {
+					n.sendVoteResp(e.From, types.ReasonOk)
+				}
+
+				n.voteFor = e.From
+				n.electionElapsed = 0
+				if e.From != n.opts.NodeId {
+					n.Info("agree vote", zap.Uint64("voteFor", e.From), zap.Uint32("term", e.Term), zap.Uint64("index", e.Index))
+				}
+			} else {
+				if n.voteFor != None {
+					n.Info("already vote for other", zap.Uint64("voteFor", n.voteFor))
+				} else if e.Index < n.queue.lastLogIndex {
+					n.Info("lower config version, reject vote")
+				} else if e.Term < n.cfg.Term {
+					n.Info("lower term, reject vote")
+				}
+				n.Info("reject vote", zap.Uint64("from", e.From), zap.Uint32("term", e.Term), zap.Uint64("index", e.Index))
+				n.sendVoteResp(e.From, types.ReasonError)
+			}
+		} else {
+			/**
+			If a learner receives a vote request, it switches its role to follower.
+			TODO: this logic does not feel entirely rigorous.
+			It mainly handles the following case:
+			with two nodes, one leader and one learner, once the learner has finished catching up,
+			the leader switches the learner's role to follower, which causes the leader itself to turn into a candidate.
+			The learner then cannot sync the config log, so the leader believes the learner became a follower while it is in fact still a learner.
+			**/
+			n.BecomeFollower(e.Term, e.From)
+		}
 
 	case types.ApplyResp: // apply response
 		if e.Reason == types.ReasonOk {
 			n.queue.appliedTo(e.Index)
@@ -84,7 +97,7 @@ func (n *Node) stepLeader(e types.Event) error {
 	case types.Propose: // proposal
 		if n.stopPropose { // proposing is stopped
 			n.Foucs("stop propose", zap.String("key", n.Key()))
-			return ErrProposalDropped
+			return types.ErrProposalDropped
 		}
 		n.idleTick = 0
 		err := n.queue.append(e.Logs...)
@@ -173,7 +186,9 @@ func (n *Node) stepLeader(e types.Event) error {
 		n.stopPropose = false
 		syncInfo := n.replicaSync[e.From]
 		if syncInfo == nil {
-			n.Error("role switch error,syncInfo not exist", zap.Uint64("from", e.From), zap.Uint64("to", e.To), zap.String("type", e.Type.String()))
+			if n.queue.lastLogIndex > 0 {
+				n.Warn("role switch error,syncInfo not exist", zap.Uint64("from", e.From), zap.Uint64("to", e.To), zap.String("type", e.Type.String()))
+			}
 			return nil
 		}
 		n.replicaSync[e.From].roleSwitching = false
@@ -298,6 +313,7 @@ func (n *Node) stepLearner(e types.Event) error {
 		if n.cfg.Leader == None {
 			n.BecomeLearner(e.Term, e.From)
 		}
+		n.updateFollowCommittedIndex(e.CommittedIndex) // update the committed index
 		n.idleTick = 0
 		// if the leader's config version is greater than the local one, request the config
 		if e.ConfigVersion > n.cfg.Version {
@@ -330,6 +346,7 @@ func (n *Node) stepLearner(e types.Event) error {
 				n.suspend = true
 			}
 		}
+		n.updateFollowCommittedIndex(e.CommittedIndex) // update the committed index
 
 	} else if e.Reason == types.ReasonTruncate {
 		// if n.Key() == "clusterconfig" {
@@ -529,6 +546,10 @@ func (n *Node) roleSwitchIfNeed(e types.Event) {
 		return
 	}
 
+	if n.cfg.MigrateTo != e.From { // only switch roles when the migration target is the node currently syncing
+		return
+	}
+
 	syncInfo := n.replicaSync[e.From]
 	if syncInfo.roleSwitching {
 		return
@@ -549,10 +570,15 @@ func (n *Node) roleSwitchIfNeed(e types.Event) {
 
 		} else { // learner to follower
 			// if the learner's log has caught up with the followers' logs, switch the learner to follower
-			if e.Index+n.opts.LearnerToFollowerMinLogGap > n.queue.lastLogIndex {
+			if len(n.cfg.Replicas) == 1 && e.Index > n.queue.lastLogIndex { // with a single replica the learner must fully catch up with the leader's log first (the switched follower triggers a re-election)
 				syncInfo.roleSwitching = true
 				// send the learner-to-follower request
 				n.sendLearnerToFollowerReq(e.From)
+			} else {
+				if e.Index+n.opts.LearnerToFollowerMinLogGap > n.queue.lastLogIndex {
+					syncInfo.roleSwitching = true
+					// send the learner-to-follower request
+					n.sendLearnerToFollowerReq(e.From)
+				}
 			}
 		}
 	} else if n.cfg.MigrateFrom == n.cfg.Leader && n.cfg.MigrateTo == e.From { // follower to leader
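
Note: numerically, with LearnerToFollowerMinLogGap = 100 and the leader's lastLogIndex = 1000, a learner syncing from index 901 already qualifies in a multi-replica cluster (901 + 100 = 1001 > 1000), but with a single replica it must request index 1001, i.e. be fully caught up, before the switch is sent:

// multi-replica:  e.Index + 100 > 1000  ->  e.Index >= 901 triggers the switch
// single replica: e.Index > 1000        ->  only a fully caught-up learner (1001) triggers it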
@@ -116,7 +116,7 @@ func (r *Raft) Step(e types.Event) {
 func (r *Raft) StepWait(ctx context.Context, e types.Event) error {
 	if r.pause.Load() {
 		r.Info("raft is paused, ignore event", zap.String("event", e.String()))
-		return ErrPaused
+		return types.ErrPaused
 	}
 	// handle proposals forwarded from other replicas
 	if e.Type == types.SendPropose {
@@ -133,7 +133,7 @@ func (r *Raft) StepWait(ctx context.Context, e types.Event) error {
 	case <-ctx.Done():
 		return ctx.Err()
 	case <-r.stopper.ShouldStop():
-		return ErrStopped
+		return types.ErrStopped
 	}
 
 	select {
@@ -142,7 +142,7 @@ func (r *Raft) StepWait(ctx context.Context, e types.Event) error {
 	case <-ctx.Done():
 		return ctx.Err()
 	case <-r.stopper.ShouldStop():
-		return ErrStopped
+		return types.ErrStopped
 	}
 }
@@ -59,6 +59,11 @@ func (r *Raft) ProposeBatchUntilAppliedTimeout(ctx context.Context, reqs []types
 		err      error
 		needWait = true
 	)
+
+	if r.node.LeaderId() == 0 {
+		return nil, types.ErrNotLeader
+	}
+
 	if !r.node.IsLeader() {
 		// not the leader, so forward to the leader
 		resps, err = r.fowardPropose(ctx, reqs)
@@ -97,7 +102,7 @@ func (r *Raft) ProposeBatchUntilAppliedTimeout(ctx context.Context, reqs []types
 		case <-ctx.Done():
 			return nil, ctx.Err()
 		case <-r.stopper.ShouldStop():
-			return nil, ErrStopped
+			return nil, types.ErrStopped
 		}
 	} else {
 		return resps, nil
@@ -261,6 +266,6 @@ func (r *Raft) fowardPropose(ctx context.Context, reqs types.ProposeReqSet) ([]*
 	case <-ctx.Done():
 		return nil, ctx.Err()
 	case <-r.stopper.ShouldStop():
-		return nil, ErrStopped
+		return nil, types.ErrStopped
 	}
 }
@@ -293,7 +293,9 @@ func (rg *RaftGroup) handleRoleChangeReq(r IRaft, e types.Event) {
 
 	err = rg.opts.Storage.SaveConfig(r.Key(), newCfg)
 	if err != nil {
-		rg.Error("change role failed", zap.Error(err))
+		if err != types.ErrNotLeader {
+			rg.Error("change role failed", zap.Error(err), zap.String("key", r.Key()), zap.String("cfg", newCfg.String()))
+		}
 		rg.AddEvent(r.Key(), types.Event{
 			Type: respEventType,
 			From: e.From,
pkg/raft/types/errros.go (new file, 11 lines)
@@ -0,0 +1,11 @@
+package types
+
+import "errors"
+
+var (
+	ErrLogIndexNotContinuous = errors.New("log index is not continuous")
+	ErrStopped               = errors.New("raft is stopped")
+	ErrNotLeader             = errors.New("not leader")
+	ErrPaused                = errors.New("raft is paused")
+	ErrProposalDropped       = errors.New("proposal dropped")
+)
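
Note: moving the sentinel errors from pkg/raft into pkg/raft/types lets other packages (the cluster code above already returns types.ErrNotLeader) share them without importing the raft package itself, presumably to avoid an import cycle. A caller-side sketch, with a hypothetical helper name:

import (
	"errors"

	rafttypes "github.com/WuKongIM/WuKongIM/pkg/raft/types"
)

// hypothetical caller-side check against the shared sentinels
func shouldRetryElsewhere(err error) bool {
	return errors.Is(err, rafttypes.ErrNotLeader) || errors.Is(err, rafttypes.ErrStopped)
}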
@@ -641,10 +641,20 @@ func IsEmptyLog(v Log) bool {
 	return v.Id == 0 && v.Index == 0 && v.Term == 0 && len(v.Data) == 0
 }
 
+/**
+
+## A new node joins
+MigrateFrom and MigrateTo are both set to the new node's id; Learners contains the new node's id, Replicas does not.
+
+## follower -> leader
+MigrateFrom is the leader node's id, MigrateTo is the follower's id; Learners does not need to contain the follower's id.
+
+**/
+
 type Config struct {
-	MigrateFrom uint64   // migration source node
+	MigrateFrom uint64   // migration source node; if it is the leader's id this means learner -> leader, if it is a follower's id this means learner -> follower
 	MigrateTo   uint64   // migration target node
-	Replicas    []uint64 // replica set (not including the node itself)
+	Replicas    []uint64 // replica set (including the node itself: leader + followers; learners are not included, they live in Learners)
 	Learners    []uint64 // learner node set
 	Role        Role     // node role
 	Term        uint32   // leader term
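
Note: per the new doc comment, the two migration shapes look like this (node ids hypothetical; written from inside the types package, so Config is unqualified):

// a new node 3 joining a cluster of nodes 1 and 2
joinCfg := Config{
	MigrateFrom: 3,              // the new node's id
	MigrateTo:   3,              // the new node's id
	Replicas:    []uint64{1, 2}, // does not contain the new node
	Learners:    []uint64{3},    // contains the new node
}

// follower 2 taking over leadership from leader 1
handoverCfg := Config{
	MigrateFrom: 1, // the leader's id
	MigrateTo:   2, // the follower's id
	Replicas:    []uint64{1, 2},
}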