diff --git a/internal/api/server.go b/internal/api/server.go
index 2e744df..0e0bd54 100644
--- a/internal/api/server.go
+++ b/internal/api/server.go
@@ -39,7 +39,11 @@ func (s *Server) Start() error {
 	s.timingWheel.Start()
 	s.apiServer.start()
-	s.managerServer.start()
+
+	if options.G.Manager.On {
+		s.managerServer.start()
+	}
+
 	// Check whether the migration task should run
 	if strings.TrimSpace(options.G.OldV1Api) != "" {
 		s.migrateTask.Run()
@@ -51,7 +55,9 @@ func (s *Server) Stop() {
 	s.timingWheel.Stop()
 	s.apiServer.stop()
-	s.managerServer.stop()
+	if options.G.Manager.On {
+		s.managerServer.stop()
+	}
 }
 
 // Schedule runs delayed tasks
diff --git a/pkg/cluster2/channel/server.go b/pkg/cluster2/channel/server.go
index 3b4ddf1..82f2ad4 100644
--- a/pkg/cluster2/channel/server.go
+++ b/pkg/cluster2/channel/server.go
@@ -3,8 +3,10 @@ package channel
 import (
 	"sync"
 
+	"github.com/WuKongIM/WuKongIM/pkg/fasthash"
 	"github.com/WuKongIM/WuKongIM/pkg/raft/raftgroup"
 	rafttype "github.com/WuKongIM/WuKongIM/pkg/raft/types"
+	"github.com/WuKongIM/WuKongIM/pkg/ringlock"
 	"github.com/WuKongIM/WuKongIM/pkg/wkdb"
 	"github.com/WuKongIM/WuKongIM/pkg/wklog"
 	"github.com/WuKongIM/WuKongIM/pkg/wkutil"
@@ -23,13 +25,14 @@ type Server struct {
 		channels map[string]bool
 	}
 
-	wakeLeaderLock sync.Mutex
+	wakeLeaderLock *ringlock.RingLock
 }
 
 func NewServer(opts *Options) *Server {
 	s := &Server{
-		opts: opts,
-		Log:  wklog.NewWKLog("channel.Server"),
+		opts:           opts,
+		Log:            wklog.NewWKLog("channel.Server"),
+		wakeLeaderLock: ringlock.NewRingLock(1024),
 	}
 	s.storage = newStorage(opts.DB, s)
 	for i := 0; i < opts.GroupCount; i++ {
@@ -61,8 +64,8 @@ func (s *Server) Stop() {
 
 // WakeLeaderIfNeed wakes the channel's leader if needed
 func (s *Server) WakeLeaderIfNeed(clusterConfig wkdb.ChannelClusterConfig) error {
-	s.wakeLeaderLock.Lock()
-	defer s.wakeLeaderLock.Unlock()
+	s.wakeLeaderLock.Lock(clusterConfig.ChannelId)
+	defer s.wakeLeaderLock.Unlock(clusterConfig.ChannelId)
 
 	channelKey := wkutil.ChannelToKey(clusterConfig.ChannelId, clusterConfig.ChannelType)
 	rg := s.getRaftGroup(channelKey)
@@ -179,23 +182,10 @@ func (s *Server) AddEvent(channelKey string, e rafttype.Event) {
 }
 
 func (s *Server) getRaftGroup(channelKey string) *raftgroup.RaftGroup {
-	index := int(fnv32(channelKey) % uint32(s.opts.GroupCount))
+	index := int(fasthash.Hash(channelKey) % uint32(s.opts.GroupCount))
 	return s.raftGroups[index]
 }
 
-func fnv32(key string) uint32 {
-	const (
-		offset32 = 2166136261
-		prime32  = 16777619
-	)
-	hash := offset32
-	for i := 0; i < len(key); i++ {
-		hash ^= int(key[i])
-		hash *= prime32
-	}
-	return uint32(hash)
-}
-
 func (s *Server) ChannelCount() int {
 	var count int
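Replacing the single sync.Mutex with a keyed RingLock (defined at the end of this diff) means WakeLeaderIfNeed calls for different channels no longer serialize behind one global lock; only keys that hash to the same ring position contend. A minimal sketch of the semantics, with hypothetical channel ids:

lock := ringlock.NewRingLock(1024)

go func() {
	lock.Lock("channelA")
	defer lock.Unlock("channelA")
	// wake channelA's leader
}()

go func() {
	// almost always a different mutex than channelA's, so this does not block on it
	lock.Lock("channelB")
	defer lock.Unlock("channelB")
	// wake channelB's leader
}()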
diff --git a/pkg/cluster2/cluster/iserver.go b/pkg/cluster2/cluster/iserver.go
index 349e878..1e03fb3 100644
--- a/pkg/cluster2/cluster/iserver.go
+++ b/pkg/cluster2/cluster/iserver.go
@@ -32,7 +32,7 @@ func (s *Server) GetOrCreateChannelClusterConfigFromSlotLeader(channelId string,
 func (s *Server) SlotLeaderOfChannel(channelId string, channelType uint8) (*types.Node, error) {
 	slotId := s.getSlotId(channelId)
-	slotLeaderId := s.slotServer.SlotLeaderId(slotId)
+	slotLeaderId := s.cfgServer.SlotLeaderId(slotId)
 	if slotLeaderId == 0 {
 		return nil, fmt.Errorf("slot[%d] leader not found", slotId)
 	}
@@ -45,7 +45,7 @@ func (s *Server) SlotLeaderIdOfChannel(channelId string, channelType uint8) (nodeID uint64, err error) {
 	slotId := s.getSlotId(channelId)
-	slotLeaderId := s.slotServer.SlotLeaderId(slotId)
+	slotLeaderId := s.cfgServer.SlotLeaderId(slotId)
 	if slotLeaderId == 0 {
 		return 0, fmt.Errorf("slot[%d] leader not found", slotId)
 	}
@@ -54,7 +54,7 @@ func (s *Server) LoadOnlyChannelClusterConfig(channelId string, channelType uint8) (wkdb.ChannelClusterConfig, error) {
 	slotId := s.getSlotId(channelId)
-	slotLeaderId := s.slotServer.SlotLeaderId(slotId)
+	slotLeaderId := s.cfgServer.SlotLeaderId(slotId)
 	if slotLeaderId == 0 {
 		return wkdb.EmptyChannelClusterConfig, fmt.Errorf("slot[%d] leader not found", slotId)
 	}
@@ -106,7 +106,7 @@ func (s *Server) GetSlotId(v string) uint32 {
 }
 
 func (s *Server) SlotLeaderId(slotId uint32) uint64 {
-	return s.slotServer.SlotLeaderId(slotId)
+	return s.cfgServer.SlotLeaderId(slotId)
 }
 
 func (s *Server) NodeIsOnline(nodeId uint64) bool {
diff --git a/pkg/cluster2/cluster/rpc_client.go b/pkg/cluster2/cluster/rpc_client.go
index 9f7ce83..19636b1 100644
--- a/pkg/cluster2/cluster/rpc_client.go
+++ b/pkg/cluster2/cluster/rpc_client.go
@@ -151,6 +151,23 @@ func (r *rpcClient) RequestChannelLastLogInfo(nodeId uint64, channelId string, c
 	return resp, nil
 }
 
+// RequestClusterJoin asks the target node to admit this node into the cluster
+func (r *rpcClient) RequestClusterJoin(nodeId uint64, req *ClusterJoinReq) (*ClusterJoinResp, error) {
+	data, err := req.Marshal()
+	if err != nil {
+		return nil, err
+	}
+	body, err := r.request(nodeId, "/rpc/cluster/join", data)
+	if err != nil {
+		return nil, err
+	}
+	resp := &ClusterJoinResp{}
+	if err := resp.Unmarshal(body); err != nil {
+		return nil, err
+	}
+	return resp, nil
+}
+
 func (r *rpcClient) request(nodeId uint64, path string, body []byte) ([]byte, error) {
 	node := r.s.nodeManager.node(nodeId)
diff --git a/pkg/cluster2/cluster/rpc_server.go b/pkg/cluster2/cluster/rpc_server.go
index 709b63e..02116a7 100644
--- a/pkg/cluster2/cluster/rpc_server.go
+++ b/pkg/cluster2/cluster/rpc_server.go
@@ -4,7 +4,8 @@ import (
 	"context"
 	"time"
 
-	"github.com/WuKongIM/WuKongIM/pkg/raft/types"
+	"github.com/WuKongIM/WuKongIM/pkg/cluster2/node/types"
+	rafttypes "github.com/WuKongIM/WuKongIM/pkg/raft/types"
 	"github.com/WuKongIM/WuKongIM/pkg/wkdb"
 	"github.com/WuKongIM/WuKongIM/pkg/wklog"
 	"github.com/WuKongIM/WuKongIM/pkg/wkserver"
@@ -44,6 +45,9 @@ func (r *rpcServer) setRoutes() {
 
 	// Fetch a channel's latest log info
 	r.s.netServer.Route("/rpc/channel/lastLogInfo", r.handleChannelLastLogInfo)
+
+	// Node join
+	r.s.netServer.Route("/rpc/cluster/join", r.handleClusterJoin)
 }
 
 func (r *rpcServer) handleChannelPropose(c *wkserver.Context) {
@@ -164,7 +168,7 @@ func (r *rpcServer) slotInfos(slotIds []uint32) ([]SlotInfo, error) {
 type channelProposeReq struct {
 	channelId   string
 	channelType uint8
-	reqs        types.ProposeReqSet
+	reqs        rafttypes.ProposeReqSet
 }
 
 func (ch *channelProposeReq) encode() ([]byte, error) {
@@ -280,3 +284,69 @@ func (s *Server) getChannelLastLogInfo(channelId string, channelType uint8) (*Ch
 	}
 	return resp, nil
 }
+
+func (r *rpcServer) handleClusterJoin(c *wkserver.Context) {
+	req := &ClusterJoinReq{}
+	if err := req.Unmarshal(c.Body()); err != nil {
+		r.Error("unmarshal ClusterJoinReq failed", zap.Error(err))
+		c.WriteErr(err)
+		return
+	}
+
+	// Not the config leader: forward the join request to the leader and relay the response
+	if !r.s.cfgServer.IsLeader() {
+		resp, err := r.s.rpcClient.RequestClusterJoin(r.s.cfgServer.LeaderId(), req)
+		if err != nil {
+			r.Error("requestClusterJoin failed", zap.Error(err))
+			c.WriteErr(err)
+			return
+		}
+		data, err := resp.Marshal()
+		if err != nil {
+			r.Error("marshal ClusterJoinResp failed", zap.Error(err))
+			c.WriteErr(err)
+			return
+		}
+		c.Write(data)
+		return
+	}
+
+	allowVote := false
+	if req.Role == types.NodeRole_NodeRoleReplica {
+		allowVote = true
+	}
+
+	resp := ClusterJoinResp{}
+
+	nodeInfos := make([]*NodeInfo, 0, len(r.s.cfgServer.Nodes()))
+	for _, node := range r.s.cfgServer.Nodes() {
+		nodeInfos = append(nodeInfos, &NodeInfo{
+			NodeId:     node.Id,
+			ServerAddr: node.ClusterAddr,
+		})
+	}
+	resp.Nodes = nodeInfos
+
+	err := r.s.cfgServer.ProposeJoin(&types.Node{
+		Id:          req.NodeId,
+		ClusterAddr: req.ServerAddr,
+		Join:        true,
+		Online:      true,
+		Role:        req.Role,
+		AllowVote:   allowVote,
+		CreatedAt:   time.Now().Unix(),
+		Status:      types.NodeStatus_NodeStatusWillJoin,
+	})
+	if err != nil {
+		r.Error("proposeJoin failed", zap.Error(err))
+		c.WriteErr(err)
+		return
+	}
+
+	result, err := resp.Marshal()
+	if err != nil {
+		r.Error("marshal ClusterJoinResp failed", zap.Error(err))
+		c.WriteErr(err)
+		return
+	}
+	c.Write(result)
+}
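Pulling the client and server halves together: a hedged sketch of the join round-trip as the joining node drives it (the real caller is joinLoop further down this diff; ids, addresses, and the connectTo helper are hypothetical):

req := &ClusterJoinReq{
	NodeId:     1003,             // the node that wants in
	ServerAddr: "10.0.0.3:11110", // its cluster address
	Role:       types.NodeRole_NodeRoleReplica,
}
// Any reachable member works as the target: a non-leader simply
// proxies the request to the current config leader (see handleClusterJoin).
resp, err := rpcClient.RequestClusterJoin(seedNodeId, req)
if err == nil {
	for _, n := range resp.Nodes {
		connectTo(n.NodeId, n.ServerAddr) // hypothetical helper: dial every existing member
	}
}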
diff --git a/pkg/cluster2/cluster/server.go b/pkg/cluster2/cluster/server.go
index 0fb9613..77fffff 100644
--- a/pkg/cluster2/cluster/server.go
+++ b/pkg/cluster2/cluster/server.go
@@ -6,6 +6,8 @@ import (
 	"fmt"
 	"path"
 	"strconv"
+	"strings"
+	"sync"
 	"time"
 
 	"github.com/WuKongIM/WuKongIM/pkg/cluster2/channel"
@@ -21,6 +23,7 @@ import (
 	"github.com/WuKongIM/WuKongIM/pkg/wkserver"
 	"github.com/WuKongIM/WuKongIM/pkg/wkserver/proto"
 	"github.com/WuKongIM/WuKongIM/pkg/wkutil"
+	"github.com/lni/goutils/syncutil"
 	"github.com/panjf2000/gnet/v2"
 	"go.uber.org/zap"
 )
@@ -53,6 +56,9 @@ type Server struct {
 	cancelFnc    context.CancelFunc
 	onMessageFnc func(fromNodeId uint64, msg *proto.Message)
 	uptime       time.Time // server start time
+	stopper      *syncutil.Stopper
+
+	sync.Mutex
 }
 
 func New(opts *Options) *Server {
@@ -61,6 +67,7 @@ func New(opts *Options) *Server {
 		nodeManager: newNodeManager(opts),
 		Log:         wklog.NewWKLog("cluster"),
 		uptime:      time.Now(),
+		stopper:     syncutil.NewStopper(),
 	}
 	s.cancelCtx, s.cancelFnc = context.WithCancel(context.Background())
 	// Initialize the transport layer
@@ -181,7 +188,18 @@ func (s *Server) Start() error {
 	// Add or update nodes
 	cfg := s.cfgServer.GetClusterConfig()
 	if len(cfg.Nodes) == 0 {
-		s.addOrUpdateNodes(s.opts.ConfigOptions.InitNodes)
+		if len(s.opts.ConfigOptions.InitNodes) > 0 {
+			s.addOrUpdateNodes(s.opts.ConfigOptions.InitNodes)
+		} else if strings.TrimSpace(s.opts.Seed) != "" {
+			nodeMap := make(map[uint64]string)
+			seedNodeId, addr, err := seedNode(s.opts.Seed)
+			if err != nil {
+				return err
+			}
+			nodeMap[seedNodeId] = addr
+			s.addOrUpdateNodes(nodeMap)
+		}
+
 	} else {
 		nodeMap := make(map[uint64]string)
 		for _, node := range cfg.Nodes {
@@ -190,6 +208,15 @@
 		s.addOrUpdateNodes(nodeMap)
 	}
 
+	// If this node is newly added, run the join logic
+	join, err := s.needJoin()
+	if err != nil {
+		return err
+	}
+	if join { // needs to join the cluster
+		s.stopper.RunWorker(s.joinLoop)
+	}
+
 	return nil
 }
@@ -248,6 +275,8 @@ func (s *Server) slotApplyLogs(slotId uint32, logs []rafttype.Log) error {
 }
 
 func (s *Server) addOrUpdateNodes(nodeMap map[uint64]string) {
+	s.Lock()
+	defer s.Unlock()
 
 	if len(nodeMap) == 0 {
 		return
diff --git a/pkg/cluster2/cluster/server_join.go b/pkg/cluster2/cluster/server_join.go
new file mode 100644
index 0000000..f49ce5a
--- /dev/null
+++ b/pkg/cluster2/cluster/server_join.go
@@ -0,0 +1,63 @@
+package cluster
+
+import (
+	"errors"
+	"strconv"
+	"strings"
+	"time"
+
+	"go.uber.org/zap"
+)
+
+// joinLoop retries the cluster-join request against the seed node every two
+// seconds until it succeeds or the server stops.
+func (s *Server) joinLoop() {
+	seedNodeId, _, _ := seedNode(s.opts.Seed)
+	req := &ClusterJoinReq{
+		NodeId:     s.opts.ConfigOptions.NodeId,
+		ServerAddr: s.opts.ServerAddr,
+		Role:       s.opts.Role,
+	}
+	for {
+		select {
+		case <-time.After(time.Second * 2):
+			resp, err := s.rpcClient.RequestClusterJoin(seedNodeId, req)
+			if err != nil {
+				s.Error("requestClusterJoin failed", zap.Error(err), zap.Uint64("seedNodeId", seedNodeId))
+				continue
+			}
+			if len(resp.Nodes) > 0 {
+				nodeMap := make(map[uint64]string)
+				for _, n := range resp.Nodes {
+					nodeMap[n.NodeId] = n.ServerAddr
+				}
+				s.addOrUpdateNodes(nodeMap)
+			}
+			return
+		case <-s.stopper.ShouldStop():
+			return
+		}
+	}
+}
+
+// needJoin reports whether this node still has to join the cluster
+func (s *Server) needJoin() (bool, error) {
+	if strings.TrimSpace(s.opts.Seed) == "" {
+		return false, nil
+	}
+	seedNodeId, _, err := seedNode(s.opts.Seed)
+	node := s.cfgServer.Node(seedNodeId)
+	return node == nil, err
+}
+
+// seedNode parses a seed string of the form nodeId@ip:port
+func seedNode(seed string) (uint64, string, error) {
+	seedArray := strings.Split(seed, "@")
+	if len(seedArray) < 2 {
+		return 0, "", errors.New("seed format error")
+	}
+	seedNodeIDStr := seedArray[0]
+	seedAddr := seedArray[1]
+	seedNodeID, err := strconv.ParseUint(seedNodeIDStr, 10, 64)
+	if err != nil {
+		return 0, "", err
+	}
+	return seedNodeID, seedAddr, nil
+}
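For reference, a quick sketch of what seedNode accepts (values hypothetical):

id, addr, err := seedNode("1001@10.0.0.1:11110")
// id == 1001, addr == "10.0.0.1:11110", err == nil

_, _, err = seedNode("10.0.0.1:11110") // missing the "nodeId@" prefix
// err: "seed format error"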
diff --git a/pkg/cluster2/node/clusterconfig/config.go b/pkg/cluster2/node/clusterconfig/config.go
index 58a9fb8..d44f110 100644
--- a/pkg/cluster2/node/clusterconfig/config.go
+++ b/pkg/cluster2/node/clusterconfig/config.go
@@ -207,6 +207,10 @@ func (c *Config) updateNodeJoining(nodeId uint64) {
 		}
 	}
 	c.cfg.Learners = wkutil.RemoveUint64(c.cfg.Learners, nodeId)
+	if c.cfg.MigrateTo == nodeId {
+		c.cfg.MigrateTo = 0
+		c.cfg.MigrateFrom = 0
+	}
 }
 
 func (c *Config) updateNodeJoined(nodeId uint64, slots []*types.Slot) {
@@ -431,6 +435,18 @@ func (c *Config) hasWillJoinNode() bool {
 	return false
 }
 
+func (c *Config) willJoinNodes() []*types.Node {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	var nodes []*types.Node
+	for _, n := range c.cfg.Nodes {
+		if n.Status == types.NodeStatus_NodeStatusWillJoin {
+			nodes = append(nodes, n)
+		}
+	}
+	return nodes
+}
+
 func (c *Config) saveConfig() error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
diff --git a/pkg/cluster2/node/clusterconfig/options.go b/pkg/cluster2/node/clusterconfig/options.go
index c249138..b6d92f3 100644
--- a/pkg/cluster2/node/clusterconfig/options.go
+++ b/pkg/cluster2/node/clusterconfig/options.go
@@ -23,6 +23,8 @@ type Options struct {
 
 	PongMaxTick int // how many ticks a node may miss heartbeat replies before it is considered offline
 
+	// Seed is a seed node that can bootstrap a new node into the cluster, format: nodeId@ip:port (nodeId is the seed node's id)
+	Seed string
 }
 
 func NewOptions(opts ...Option) *Options {
@@ -37,6 +39,7 @@
 	for _, opt := range opts {
 		opt(o)
 	}
+
 	return o
 }
 
@@ -100,3 +103,9 @@ func WithPongMaxTick(pongMaxTick int) Option {
 		o.PongMaxTick = pongMaxTick
 	}
 }
+
+func WithSeed(seed string) Option {
+	return func(o *Options) {
+		o.Seed = seed
+	}
+}
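A minimal sketch of wiring the new option (other options elided; the seed value is hypothetical and reads "join via node 1001 at 10.0.0.1:11110"):

opts := clusterconfig.NewOptions(
	clusterconfig.WithSeed("1001@10.0.0.1:11110"),
)
_ = opts.Seed // "1001@10.0.0.1:11110"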
diff --git a/pkg/cluster2/node/clusterconfig/server.go b/pkg/cluster2/node/clusterconfig/server.go
index db45e47..5ccccd3 100644
--- a/pkg/cluster2/node/clusterconfig/server.go
+++ b/pkg/cluster2/node/clusterconfig/server.go
@@ -15,18 +15,13 @@ import (
 )
 
 type Server struct {
-	opts *Options
-	raft *raft.Raft
-
-	config *Config // distributed config object
-	// config log storage
-	storage *PebbleShardLogStorage
-
-	cfgGenId *snowflake.Node
-
+	opts      *Options
+	raft      *raft.Raft             // raft algorithm
+	config    *Config                // distributed config object
+	storage   *PebbleShardLogStorage // config log storage
+	cfgGenId  *snowflake.Node        // config ID generator
+	listeners []IEvent               // event listeners
 	wklog.Log
-
-	listeners []IEvent
 }
 
 func New(opts *Options) *Server {
@@ -89,8 +84,13 @@ func (s *Server) initRaft() error {
 		raftConfig = configToRaftConfig(s.config)
 	} else {
 		replicas := make([]uint64, 0, len(s.opts.InitNodes))
-		for nodeId := range s.opts.InitNodes {
-			replicas = append(replicas, nodeId)
+
+		if len(s.opts.InitNodes) > 0 {
+			for nodeId := range s.opts.InitNodes {
+				replicas = append(replicas, nodeId)
+			}
+		} else {
+			replicas = append(replicas, s.opts.NodeId)
 		}
 		raftConfig = rafttypes.Config{
 			Replicas: replicas,
@@ -165,6 +165,17 @@ func (s *Server) StepRaftEvent(e rafttypes.Event) {
 	s.raft.Step(e)
 }
 
+func (s *Server) switchConfig(cfg *Config) {
+	if s.raft == nil {
+		return
+	}
+
+	s.raft.Step(rafttypes.Event{
+		Type:   rafttypes.ConfChange,
+		Config: configToRaftConfig(cfg),
+	})
+}
+
 func (s *Server) IsLeader() bool {
 	if s.raft == nil {
 		return false
@@ -210,6 +221,14 @@ func (s *Server) Slot(id uint32) *pb.Slot {
 	return s.config.slot(id)
 }
 
+func (s *Server) SlotLeaderId(id uint32) uint64 {
+	st := s.config.slot(id)
+	if st == nil {
+		return 0
+	}
+	return st.Leader
+}
+
 // AllowVoteNodes returns the nodes that are allowed to vote
 func (s *Server) AllowVoteNodes() []*pb.Node {
 	return s.config.allowVoteNodes()
@@ -241,6 +260,11 @@ func (s *Server) NodeIsOnline(nodeId uint64) bool {
 	return s.config.nodeOnline(nodeId)
 }
 
+// SlotReplicaCount returns the configured replica count per slot
+func (s *Server) SlotReplicaCount() uint32 {
+	return s.config.slotReplicaCount()
+}
+
 // NodeConfigVersionFromLeader returns a node's config version (only the leader has this information)
 func (s *Server) NodeConfigVersionFromLeader(nodeId uint64) uint64 {
 	return s.raft.GetReplicaLastLogIndex(nodeId)
diff --git a/pkg/cluster2/node/clusterconfig/server_apply.go b/pkg/cluster2/node/clusterconfig/server_apply.go
index d6b52a3..6a3db63 100644
--- a/pkg/cluster2/node/clusterconfig/server_apply.go
+++ b/pkg/cluster2/node/clusterconfig/server_apply.go
@@ -1,8 +1,11 @@
 package clusterconfig
 
 import (
+	"encoding/binary"
+
 	pb "github.com/WuKongIM/WuKongIM/pkg/cluster2/node/types"
 	"github.com/WuKongIM/WuKongIM/pkg/raft/types"
+	"github.com/WuKongIM/WuKongIM/pkg/wkutil"
 	"go.uber.org/zap"
 )
 
@@ -52,6 +55,12 @@ func (s *Server) handleCmd(cmd *CMD) error {
 		return s.handleNodeOnlineStatusChange(cmd)
 	case CMDTypeSlotUpdate: // slot update
 		return s.handleSlotUpdate(cmd)
+	case CMDTypeNodeJoin: // node join
+		return s.handleNodeJoin(cmd)
+	case CMDTypeNodeJoining: // node joining
+		return s.handleNodeJoining(cmd)
+	case CMDTypeNodeJoined: // node join completed
+		return s.handleNodeJoined(cmd)
 	}
 	return nil
 }
@@ -100,3 +109,42 @@ func (s *Server) handleSlotUpdate(cmd *CMD) error {
 
 	return nil
 }
+
+func (s *Server) handleNodeJoin(cmd *CMD) error {
+	newNode := &pb.Node{}
+	err := newNode.Unmarshal(cmd.Data)
+	if err != nil {
+		s.Error("unmarshal node err", zap.Error(err))
+		return err
+	}
+	s.config.addOrUpdateNode(newNode)
+
+	// Add the new node to the learner list
+	if !wkutil.ArrayContainsUint64(s.config.cfg.Learners, newNode.Id) {
+		s.config.cfg.Learners = append(s.config.cfg.Learners, newNode.Id)
+		// For a freshly joined node, the migration is from itself to itself
+		s.config.cfg.MigrateFrom = newNode.Id
+		s.config.cfg.MigrateTo = newNode.Id
+	}
+	s.switchConfig(s.config)
+	return nil
+}
+
+func (s *Server) handleNodeJoining(cmd *CMD) error {
+	nodeId := binary.BigEndian.Uint64(cmd.Data)
+	s.config.updateNodeJoining(nodeId)
+	s.switchConfig(s.config)
+	return nil
+}
+
+func (s *Server) handleNodeJoined(cmd *CMD) error {
+	nodeId, slots, err := DecodeNodeJoined(cmd.Data)
+	if err != nil {
+		s.Error("decode node joined err", zap.Error(err))
+		return err
+	}
+	s.config.updateNodeJoined(nodeId, slots)
+	s.switchConfig(s.config)
+	return nil
+}
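The Joining command's payload is just the node id as eight big-endian bytes, mirroring ProposeJoining below; a tiny round-trip sketch with a hypothetical id:

buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, 1003)  // encode, as ProposeJoining does
nodeId := binary.BigEndian.Uint64(buf) // decode, as handleNodeJoining does; nodeId == 1003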
diff --git a/pkg/cluster2/node/clusterconfig/server_propose.go b/pkg/cluster2/node/clusterconfig/server_propose.go
index 9687f1b..1a6bf27 100644
--- a/pkg/cluster2/node/clusterconfig/server_propose.go
+++ b/pkg/cluster2/node/clusterconfig/server_propose.go
@@ -1,6 +1,8 @@
 package clusterconfig
 
 import (
+	"encoding/binary"
+
 	pb "github.com/WuKongIM/WuKongIM/pkg/cluster2/node/types"
 	"go.uber.org/zap"
 )
@@ -86,3 +88,71 @@ func (s *Server) ProposeSlots(slots []*pb.Slot) error {
 	}
 	return nil
 }
+
+// ProposeJoined proposes that a node has finished joining, together with its migrated slots
+func (s *Server) ProposeJoined(nodeId uint64, slots []*pb.Slot) error {
+	data, err := EncodeNodeJoined(nodeId, slots)
+	if err != nil {
+		return err
+	}
+
+	cmd := NewCMD(CMDTypeNodeJoined, data)
+	cmdBytes, err := cmd.Marshal()
+	if err != nil {
+		return err
+	}
+
+	logId := s.genConfigId()
+	_, err = s.ProposeUntilApplied(logId, cmdBytes)
+	if err != nil {
+		s.Error("ProposeJoined failed", zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+// ProposeJoining proposes that a node has started joining
+func (s *Server) ProposeJoining(nodeId uint64) error {
+	nodeIdBytes := make([]byte, 8)
+	binary.BigEndian.PutUint64(nodeIdBytes, nodeId)
+
+	cmd := NewCMD(CMDTypeNodeJoining, nodeIdBytes)
+	cmdBytes, err := cmd.Marshal()
+	if err != nil {
+		s.Error("ProposeJoining cmd marshal failed", zap.Error(err))
+		return err
+	}
+
+	logId := s.genConfigId()
+	_, err = s.ProposeUntilApplied(logId, cmdBytes)
+	if err != nil {
+		s.Error("ProposeJoining failed", zap.Error(err))
+		return err
+	}
+	return nil
+}
+
+// ProposeJoin proposes adding a new node to the cluster
+func (s *Server) ProposeJoin(node *pb.Node) error {
+	data, err := node.Marshal()
+	if err != nil {
+		return err
+	}
+
+	cmd := NewCMD(CMDTypeNodeJoin, data)
+	cmdBytes, err := cmd.Marshal()
+	if err != nil {
+		return err
+	}
+
+	logId := s.genConfigId()
+	_, err = s.ProposeUntilApplied(logId, cmdBytes)
+	if err != nil {
+		s.Error("ProposeJoin failed", zap.Error(err))
+		return err
+	}
+	return nil
+}
diff --git a/pkg/cluster2/node/clusterconfig/storage.go b/pkg/cluster2/node/clusterconfig/storage.go
index bf5d5dd..ee5c724 100644
--- a/pkg/cluster2/node/clusterconfig/storage.go
+++ b/pkg/cluster2/node/clusterconfig/storage.go
@@ -9,6 +9,7 @@ import (
 	"github.com/WuKongIM/WuKongIM/pkg/cluster2/node/clusterconfig/key"
 	"github.com/WuKongIM/WuKongIM/pkg/raft/types"
 	"github.com/WuKongIM/WuKongIM/pkg/wklog"
+	"github.com/WuKongIM/WuKongIM/pkg/wkutil"
 	"github.com/cockroachdb/pebble"
 	"go.uber.org/zap"
 )
@@ -105,6 +106,26 @@ func (p *PebbleShardLogStorage) Apply(logs []types.Log) error {
 	return p.setAppliedIndex(logs[len(logs)-1].Index)
 }
 
+func (p *PebbleShardLogStorage) SaveConfig(cfg types.Config) error {
+	if !p.s.config.hasWillJoinNode() {
+		p.Info("no will-join node", zap.String("cfg", cfg.String()))
+		return nil
+	}
+
+	willNodes := p.s.config.willJoinNodes()
+
+	for _, willNode := range willNodes {
+		if wkutil.ArrayContainsUint64(cfg.Replicas, willNode.Id) {
+			err := p.s.ProposeJoining(willNode.Id)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
 // TruncateLogTo truncates the log
 func (p *PebbleShardLogStorage) TruncateLogTo(index uint64) error {
 	if index == 0 {
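Taken together with server_apply.go above, the join flow reads as a three-step state machine driven by proposals; a hedged sketch of the leader-side order (node id hypothetical, error handling elided):

// 1. handleClusterJoin proposes the new node as WillJoin; applying
//    CMDTypeNodeJoin also registers it as a raft learner.
_ = s.ProposeJoin(&pb.Node{Id: 1003, Status: pb.NodeStatus_NodeStatusWillJoin})

// 2. Once the learner shows up in cfg.Replicas, SaveConfig (storage.go above)
//    proposes CMDTypeNodeJoining, which clears the migrate markers.
_ = s.ProposeJoining(1003)

// 3. After the event loop has planned slot migrations (handle_node_joining.go
//    below), CMDTypeNodeJoined records the node as a member with its slots.
_ = s.ProposeJoined(1003, migrateSlots)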
diff --git a/pkg/cluster2/node/event/handle_node_joining.go b/pkg/cluster2/node/event/handle_node_joining.go
new file mode 100644
index 0000000..e81bfc7
--- /dev/null
+++ b/pkg/cluster2/node/event/handle_node_joining.go
@@ -0,0 +1,149 @@
+package event
+
+import (
+	"github.com/WuKongIM/WuKongIM/pkg/cluster2/node/types"
+	"github.com/WuKongIM/WuKongIM/pkg/wkutil"
+)
+
+// handleNodeJoining plans slot migrations for a node in the Joining state and proposes the result
+func (s *Server) handleNodeJoining() error {
+	slots := s.cfgServer.Slots()
+	if len(slots) == 0 {
+		return nil
+	}
+	var joiningNode *types.Node
+	for _, node := range s.cfgServer.Nodes() {
+		if node.Status == types.NodeStatus_NodeStatusJoining {
+			joiningNode = node
+			break
+		}
+	}
+
+	if joiningNode == nil {
+		return nil
+	}
+
+	firstSlot := slots[0]
+
+	var migrateSlots []*types.Slot // slots to migrate
+
+	voteNodes := s.cfgServer.AllowVoteNodes()
+
+	// If the current slot replica count is below the configured count, the new node can simply be added as a learner on every slot
+	if uint32(len(firstSlot.Replicas)) < s.cfgServer.SlotReplicaCount() {
+		for _, slot := range slots {
+			newSlot := slot.Clone()
+			newSlot.MigrateFrom = joiningNode.Id
+			newSlot.MigrateTo = joiningNode.Id
+			newSlot.Learners = append(newSlot.Learners, joiningNode.Id)
+			migrateSlots = append(migrateSlots, newSlot)
+		}
+	} else {
+		voteNodeCount := uint32(len(voteNodes))                                          // number of voting nodes
+		avgSlotCount := uint32(len(slots)) * s.cfgServer.SlotReplicaCount() / voteNodeCount   // average slot replicas per node
+		remainSlotCount := uint32(len(slots)) * s.cfgServer.SlotReplicaCount() % voteNodeCount // remainder
+		if remainSlotCount > 0 {
+			avgSlotCount += 1
+		}
+
+		avgSlotLeaderCount := uint32(len(slots)) / voteNodeCount    // average slot leaders per node
+		remainSlotLeaderCount := uint32(len(slots)) % voteNodeCount // remainder
+		if remainSlotLeaderCount > 0 {
+			avgSlotLeaderCount += 1
+		}
+
+		nodeSlotLeaderCountMap := make(map[uint64]uint32) // current slot-leader count per node
+		nodeSlotCountMap := make(map[uint64]uint32)       // current slot count per node
+		nodeSlotCountMap[joiningNode.Id] = 0              // the new node necessarily has zero slots
+		for _, slot := range slots {
+			nodeSlotLeaderCountMap[slot.Leader]++
+			for _, replicaId := range slot.Replicas {
+				nodeSlotCountMap[replicaId]++
+			}
+		}
+
+		// Compute how many slots each node should migrate in or out
+		migrateToCountMap := make(map[uint64]uint32)             // slots each node should take in
+		migrateFromCountMap := make(map[uint64]uint32)           // slots each node should give up
+		migrateSlotLeaderFromCountMap := make(map[uint64]uint32) // slot leaderships each node should give up
+		for nodeId, slotCount := range nodeSlotCountMap {
+			if slotCount < avgSlotCount {
+				migrateToCountMap[nodeId] = avgSlotCount - slotCount
+			} else if slotCount > avgSlotCount {
+				migrateFromCountMap[nodeId] = slotCount - avgSlotCount
+			}
+		}
+
+		for nodeId, slotLeaderCount := range nodeSlotLeaderCountMap {
+			if slotLeaderCount > avgSlotLeaderCount {
+				migrateSlotLeaderFromCountMap[nodeId] = slotLeaderCount - avgSlotLeaderCount
+			}
+		}
+
+		for _, node := range voteNodes {
+			fromSlotCount := migrateFromCountMap[node.Id]
+			fromSlotLeaderCount := migrateSlotLeaderFromCountMap[node.Id]
+
+			if fromSlotCount == 0 { // nothing left to migrate from this node
+				continue
+			}
+
+			for _, slot := range slots {
+				exist := false // whether this slot is already scheduled for migration
+
+				for _, migrateSlot := range migrateSlots {
+					if slot.Id == migrateSlot.Id {
+						exist = true
+						break
+					}
+				}
+
+				if exist { // already migrating, skip
+					continue
+				}
+
+				// ------------------- assign slot leaders -------------------
+				allocSlotLeader := false // whether a slot leadership was assigned for this slot
+				if fromSlotCount > 0 && fromSlotLeaderCount > 0 && slot.Leader == node.Id {
+					allocSlotLeader = true
+					newSlot := slot.Clone()
+					newSlot.MigrateFrom = slot.Leader
+					newSlot.MigrateTo = joiningNode.Id
+					newSlot.Learners = append(newSlot.Learners, joiningNode.Id)
+					migrateSlots = append(migrateSlots, newSlot)
+					fromSlotLeaderCount--
+					migrateSlotLeaderFromCountMap[node.Id] = fromSlotLeaderCount
+
+					fromSlotCount--
+					migrateFromCountMap[node.Id] = fromSlotCount
+				}
+
+				// ------------------- assign slot replicas -------------------
+				if fromSlotCount > 0 && !allocSlotLeader {
+					if wkutil.ArrayContainsUint64(slot.Replicas, node.Id) {
+						newSlot := slot.Clone()
+						newSlot.MigrateFrom = node.Id
+						newSlot.MigrateTo = joiningNode.Id
+						newSlot.Learners = append(newSlot.Learners, joiningNode.Id)
+						migrateSlots = append(migrateSlots, newSlot)
+						fromSlotCount--
+						migrateFromCountMap[node.Id] = fromSlotCount
+					}
+				}
+			}
+		}
+	}
+
+	if len(migrateSlots) > 0 {
+		err := s.cfgServer.ProposeJoined(joiningNode.Id, migrateSlots)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
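A worked example of the balancing targets above, assuming 64 slots, a slot replica count of 2, and 3 voting nodes (all numbers hypothetical):

slots, replicaCount, voteNodeCount := uint32(64), uint32(2), uint32(3)

avgSlotCount := slots * replicaCount / voteNodeCount // 128 / 3 = 42
if slots*replicaCount%voteNodeCount > 0 {
	avgSlotCount++ // remainder 2, so round up to 43
}

avgSlotLeaderCount := slots / voteNodeCount // 64 / 3 = 21
if slots%voteNodeCount > 0 {
	avgSlotLeaderCount++ // remainder 1, so round up to 22
}
// Nodes above these ceilings give up slots/leaderships to the joining node.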
diff --git a/pkg/cluster2/node/event/handler_clusterinit.go b/pkg/cluster2/node/event/handler_clusterinit.go
index 589e0b6..f067959 100644
--- a/pkg/cluster2/node/event/handler_clusterinit.go
+++ b/pkg/cluster2/node/event/handler_clusterinit.go
@@ -20,23 +20,38 @@ func (h *handler) handleClusterInit() {
 	opts := h.cfgOptions
 	var replicas []uint64
-	for nodeId, addr := range opts.InitNodes {
-		apiAddr := ""
-		if nodeId == opts.NodeId {
-			apiAddr = opts.ApiServerAddr
+
+	if len(opts.InitNodes) > 0 {
+		for nodeId, addr := range opts.InitNodes {
+			apiAddr := ""
+			if nodeId == opts.NodeId {
+				apiAddr = opts.ApiServerAddr
+			}
+			nodes = append(nodes, &types.Node{
+				Id:            nodeId,
+				ClusterAddr:   addr,
+				ApiServerAddr: apiAddr,
+				Online:        true,
+				AllowVote:     true,
+				Role:          types.NodeRole_NodeRoleReplica,
+				Status:        types.NodeStatus_NodeStatusJoined,
+				CreatedAt:     time.Now().Unix(),
+			})
+			replicas = append(replicas, nodeId)
 		}
+	} else { // no InitNodes, so assume single-node mode
 		nodes = append(nodes, &types.Node{
-			Id:          nodeId,
-			ClusterAddr: addr,
-			ApiServerAddr: apiAddr,
+			Id:            opts.NodeId,
+			ApiServerAddr: opts.ApiServerAddr,
 			Online:        true,
 			AllowVote:     true,
 			Role:          types.NodeRole_NodeRoleReplica,
 			Status:        types.NodeStatus_NodeStatusJoined,
 			CreatedAt:     time.Now().Unix(),
 		})
-		replicas = append(replicas, nodeId)
+		replicas = append(replicas, opts.NodeId)
 	}
+
 	cfg.Nodes = nodes
 
 	if len(replicas) > 0 {
diff --git a/pkg/cluster2/node/event/server.go b/pkg/cluster2/node/event/server.go
index 19e3231..2e648b5 100644
--- a/pkg/cluster2/node/event/server.go
+++ b/pkg/cluster2/node/event/server.go
@@ -95,8 +95,16 @@ func (s *Server) handleEvents() {
 	s.handler.handleCompare()
 
 	if s.cfgServer.IsLeader() {
+
+		// Handle node joining
+		err := s.handleNodeJoining()
+		if err != nil {
+			s.Error("handleNodeJoining failed", zap.Error(err))
+			return
+		}
+
 		// Check and auto-balance slot leaders
-		err := s.handleSlotLeaderAutoBalance()
+		err = s.handleSlotLeaderAutoBalance()
 		if err != nil {
 			s.Error("handleSlotLeaderAutoBalance failed", zap.Error(err))
 			return
diff --git a/pkg/cluster2/slot/event.go b/pkg/cluster2/slot/event.go
index 5c2c5fc..7720e16 100644
--- a/pkg/cluster2/slot/event.go
+++ b/pkg/cluster2/slot/event.go
@@ -29,6 +29,15 @@ func (s *Server) AddOrUpdateSlotRaft(slot *types.Slot) {
 
 	shardNo := SlotIdToKey(slot.Id)
 	rft := s.raftGroup.GetRaft(shardNo)
+
+	// If this node is neither a replica nor a learner of the slot, skip it (and drop any local raft instance)
+	if !wkutil.ArrayContainsUint64(slot.Replicas, s.opts.NodeId) && !wkutil.ArrayContainsUint64(slot.Learners, s.opts.NodeId) {
+		if rft != nil {
+			s.raftGroup.RemoveRaft(rft)
+		}
+		return
+	}
+
 	if rft == nil { // add the slot's raft
 		slotNode := newSlot(slot, s)
 		s.raftGroup.AddRaft(slotNode)
diff --git a/pkg/fasthash/fasthash.go b/pkg/fasthash/fasthash.go
new file mode 100644
index 0000000..4dd93c8
--- /dev/null
+++ b/pkg/fasthash/fasthash.go
@@ -0,0 +1,14 @@
+package fasthash
+
+// Hash returns the 32-bit FNV-1a hash of key
+func Hash(key string) uint32 {
+	const (
+		offset32 = uint32(2166136261)
+		prime32  = uint32(16777619)
+	)
+	hash := offset32
+	for i := 0; i < len(key); i++ {
+		hash ^= uint32(key[i])
+		hash *= prime32
+	}
+	return hash
+}
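A small usage sketch of the extracted helper, mirroring what channel/server.go's getRaftGroup does above (the group count here is hypothetical):

const groupCount = 16
idx := fasthash.Hash("channel-123@2") % groupCount
// idx is stable across processes and restarts, so the same channel key
// always routes to the same raft group.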
"github.com/WuKongIM/WuKongIM/pkg/wkutil" "github.com/lni/goutils/syncutil" "github.com/panjf2000/ants/v2" "go.uber.org/atomic" @@ -214,18 +216,24 @@ func (r *Raft) readyEvents() { events := r.node.Ready() for _, e := range events { switch e.Type { - case types.StoreReq: + case types.StoreReq: // 处理存储请求 r.handleStoreReq(e) continue - case types.GetLogsReq: + case types.GetLogsReq: // 处理获取日志请求 r.handleGetLogsReq(e) continue case types.TruncateReq: // 截断请求 r.handleTruncateReq(e) continue - case types.ApplyReq: + case types.ApplyReq: // 处理应用请求 r.handleApplyReq(e) continue + // 角色转换 + case types.LearnerToFollowerReq, + types.LearnerToLeaderReq, + types.FollowerToLeaderReq: + r.handleRoleChangeReq(e) + continue } if e.To == None { @@ -469,3 +477,127 @@ func (r *Raft) advance() { default: } } + +func (r *Raft) handleRoleChangeReq(e types.Event) { + + err := r.pool.Submit(func() { + + var ( + newCfg types.Config + err error + respEventType types.EventType + ) + switch e.Type { + case types.LearnerToLeaderReq, + types.LearnerToFollowerReq: + newCfg, err = r.learnTo(e.From) + if err != nil { + r.Error("learn switch failed", zap.Error(err), zap.String("type", e.Type.String()), zap.String("cfg", r.node.Config().String())) + } + if e.Type == types.LearnerToFollowerReq { + respEventType = types.LearnerToFollowerResp + } else if e.Type == types.LearnerToLeaderReq { + respEventType = types.LearnerToLeaderResp + } + case types.FollowerToLeaderReq: + newCfg, err = r.followerToLeader(e.From) + if err != nil { + r.Error("follower switch to leader failed", zap.Error(err), zap.String("cfg", r.node.Config().String())) + } + respEventType = types.FollowerToLeaderResp + default: + err = errors.New("unknown role switch") + } + + if err != nil { + + r.stepC <- stepReq{event: types.Event{ + Type: respEventType, + From: e.From, + Reason: types.ReasonError, + }} + return + } + + err = r.opts.Storage.SaveConfig(newCfg) + if err != nil { + r.Error("change role failed", zap.Error(err)) + r.stepC <- stepReq{event: types.Event{ + Type: respEventType, + From: e.From, + Reason: types.ReasonError, + }} + return + } + r.stepC <- stepReq{event: types.Event{ + Type: respEventType, + From: e.From, + Reason: types.ReasonOk, + Config: newCfg, + }} + }) + if err != nil { + r.Error("submit role change req failed", zap.Error(err)) + + var respEventType types.EventType + + switch e.Type { + case types.LearnerToLeaderReq: + respEventType = types.LearnerToLeaderResp + case types.LearnerToFollowerReq: + respEventType = types.LearnerToFollowerResp + case types.FollowerToLeaderReq: + respEventType = types.FollowerToLeaderResp + } + r.stepC <- stepReq{event: types.Event{ + Type: respEventType, + From: e.From, + Reason: types.ReasonError, + }} + } +} + +// 学习者转换 +func (r *Raft) learnTo(learnerId uint64) (types.Config, error) { + cfg := r.node.Config().Clone() + + if learnerId != cfg.MigrateTo { + r.Warn("learnerId not equal migrateTo", zap.Uint64("learnerId", learnerId), zap.Uint64("migrateTo", cfg.MigrateTo)) + return types.Config{}, errors.New("learnerId not equal migrateTo") + } + + cfg.Learners = wkutil.RemoveUint64(cfg.Learners, learnerId) + + if !wkutil.ArrayContainsUint64(cfg.Replicas, cfg.MigrateTo) { + cfg.Replicas = append(cfg.Replicas, cfg.MigrateTo) + } + + if cfg.MigrateFrom != cfg.MigrateTo { + cfg.Replicas = wkutil.RemoveUint64(cfg.Replicas, cfg.MigrateFrom) + } + + if cfg.MigrateFrom == r.LeaderId() { // 学习者转领导 + cfg.Leader = cfg.MigrateTo + } + cfg.MigrateFrom = 0 + cfg.MigrateTo = 0 + + return cfg, nil +} + +// 
diff --git a/pkg/raft/raft/raft_test.go b/pkg/raft/raft/raft_test.go
index ee73e58..0bb5c9b 100644
--- a/pkg/raft/raft/raft_test.go
+++ b/pkg/raft/raft/raft_test.go
@@ -415,6 +415,10 @@ func (s *testStorage) Apply(logs []types.Log) error {
 	return nil
 }
 
+func (s *testStorage) SaveConfig(cfg types.Config) error {
+	return nil
+}
+
 // wait until one of the nodes becomes leader
 func waitBecomeLeader(rr ...*raft.Raft) {
 	for {
diff --git a/pkg/raft/raft/storage.go b/pkg/raft/raft/storage.go
index 368b38a..568c8a3 100644
--- a/pkg/raft/raft/storage.go
+++ b/pkg/raft/raft/storage.go
@@ -17,4 +17,6 @@ type Storage interface {
 	DeleteLeaderTermStartIndexGreaterThanTerm(term uint32) error
 	// Apply applies logs
 	Apply(logs []types.Log) error
+	// SaveConfig saves the config
+	SaveConfig(cfg types.Config) error
 }
diff --git a/pkg/raft/types/types.go b/pkg/raft/types/types.go
index 92914f9..1dfbe32 100644
--- a/pkg/raft/types/types.go
+++ b/pkg/raft/types/types.go
@@ -625,7 +625,7 @@ func (c Config) Clone() Config {
 
 func (c Config) String() string {
-	return ""
+	return fmt.Sprintf("MigrateFrom: %d, MigrateTo: %d, Replicas: %v, Learners: %v, Role: %s, Term: %d, Version: %d, Leader: %d", c.MigrateFrom, c.MigrateTo, c.Replicas, c.Learners, c.Role, c.Term, c.Version, c.Leader)
 }
 
 func (c Config) IsEmpty() bool {
diff --git a/pkg/ringlock/ringlock.go b/pkg/ringlock/ringlock.go
new file mode 100644
index 0000000..844d387
--- /dev/null
+++ b/pkg/ringlock/ringlock.go
@@ -0,0 +1,39 @@
+package ringlock
+
+import (
+	"sync"
+
+	"github.com/WuKongIM/WuKongIM/pkg/fasthash"
+)
+
+type RingLock struct {
+	locker []sync.Mutex
+	size   uint32 // size of the ring
+}
+
+func NewRingLock(size int) *RingLock {
+	locker := make([]sync.Mutex, size)
+	return &RingLock{
+		locker: locker,
+		size:   uint32(size),
+	}
+}
+
+// lockPosition hashes the key, modulo the ring size, to pick a mutex in the ring
+func (r *RingLock) lockPosition(key string) int {
+	hash := fasthash.Hash(key)
+	position := hash % r.size
+	return int(position)
+}
+
+// Lock acquires the ring mutex that key hashes to
+func (r *RingLock) Lock(key string) {
+	position := r.lockPosition(key)
+	r.locker[position].Lock()
+}
+
+// Unlock releases the ring mutex that key hashes to
+func (r *RingLock) Unlock(key string) {
+	position := r.lockPosition(key)
+	r.locker[position].Unlock()
+}
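A closing usage sketch: keys that hash to the same ring position share a mutex, so collisions cost only extra serialization, never lost mutual exclusion (ring size and key hypothetical):

rl := ringlock.NewRingLock(1024)
rl.Lock("channel-a")
// critical section for channel-a; a colliding key would simply wait here
rl.Unlock("channel-a")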