feat: implement auto-suspend and destruction features in raft and cluster2 packages

This commit is contained in:
tt
2025-01-13 18:15:12 +08:00
parent 1406615eda
commit 060fb56df1
15 changed files with 230 additions and 37 deletions

View File

@@ -43,7 +43,7 @@ func createChannel(cfg wkdb.ChannelClusterConfig, s *Server, rg *raftgroup.RaftG
return nil, err
}
ch.Node = raft.NewNode(lastLogStartIndex, state, raft.NewOptions(raft.WithKey(channelKey), raft.WithNodeId(s.opts.NodeId)))
ch.Node = raft.NewNode(lastLogStartIndex, state, raft.NewOptions(raft.WithKey(channelKey), raft.WithAutoSuspend(true), raft.WithAutoDestory(true), raft.WithNodeId(s.opts.NodeId)))
return ch, nil
}

View File

@@ -211,6 +211,16 @@ func (s *Server) ExistChannel(channelId string, channelType uint8) bool {
return rg.GetRaft(channelKey) != nil
}
func (s *Server) Channel(channelId string, channelType uint8) *Channel {
channelKey := wkutil.ChannelToKey(channelId, channelType)
rg := s.getRaftGroup(channelKey)
raft := rg.GetRaft(channelKey)
if raft != nil {
return raft.(*Channel)
}
return nil
}
func (s *Server) RemoveChannel(channelId string, channelType uint8) {
channelKey := wkutil.ChannelToKey(channelId, channelType)
rg := s.getRaftGroup(channelKey)

View File

@@ -187,9 +187,13 @@ func (s *Server) nodeChannelsGet(c *wkhttp.Context) {
resp := NewChannelClusterConfigRespFromClusterConfig(slot.Leader, slot.Id, cfg)
if cfg.LeaderId == s.opts.ConfigOptions.NodeId {
if s.channelServer.ExistChannel(cfg.ChannelId, cfg.ChannelType) {
channel := s.channelServer.Channel(cfg.ChannelId, cfg.ChannelType)
if channel != nil {
resp.Active = 1
resp.ActiveFormat = "运行中"
if channel.Suspend() {
resp.ActiveFormat = "挂起"
}
} else {
resp.Active = 0
resp.ActiveFormat = "未运行"

View File

@@ -31,4 +31,6 @@ type SyncInfo struct {
SyncTick int // 同步计时器
GetingLogs bool // 领导是否正在查询此副本的日志中
roleSwitching bool // 角色切换中
emptySyncTick int // 空同步计时器(连续多少次tick没有同步到数据)
suspend bool // 是否挂起
}

View File

@@ -43,6 +43,10 @@ type Node struct {
sync.Mutex
truncating bool // 截断中
stopPropose bool // 停止提案
suspend bool // 挂起
idleTick int // 服务空闲计数
}
func NewNode(lastTermStartLogIndex uint64, raftState types.RaftState, opts *Options) *Node {
@@ -120,6 +124,11 @@ func (n *Node) HasReady() bool {
return len(n.events) > 0
}
// Suspend 是否挂起
func (n *Node) Suspend() bool {
return n.suspend
}
// Ready 获取待处理的事件
func (n *Node) Ready() []types.Event {
@@ -148,6 +157,13 @@ func (n *Node) Ready() []types.Event {
}
}
if n.opts.AutoDestory {
if n.idleTick > n.opts.DestoryAfterIdleTick {
n.Info("auto destory")
n.sendDestory()
}
}
events := n.events
n.events = n.events[:0]
return events

View File

@@ -59,6 +59,7 @@ func (n *Node) reset() {
n.stopPropose = false
n.syncState.replicaSync = make(map[uint64]*SyncInfo)
n.onlySync = false
n.suspend = false
// 重置选举超时时间
n.resetRandomizedElectionTimeout()
}

View File

@@ -58,7 +58,11 @@ func (n *Node) roleChangeIfNeed(oldCfg, newCfg types.Config) {
}
if oldCfg.Role != newCfg.Role || oldCfg.Leader != newCfg.Leader || oldCfg.Term != newCfg.Term {
n.Info("role change", zap.String("old", oldCfg.Role.String()), zap.String("new", newCfg.Role.String()))
if oldCfg.Role != types.RoleUnknown {
n.Info("role change", zap.String("old", oldCfg.Role.String()), zap.String("new", newCfg.Role.String()))
} else {
n.Info("role change", zap.String("new", newCfg.Role.String()))
}
if newCfg.Leader == n.opts.NodeId {
n.BecomeLeader(newCfg.Term)

View File

@@ -10,6 +10,12 @@ func (n *Node) sendNotifySync(to uint64) {
if replicaId == n.opts.NodeId {
continue
}
// 解除挂起
syncInfo := n.replicaSync[replicaId]
if syncInfo != nil {
syncInfo.emptySyncTick = 0
syncInfo.suspend = false
}
n.events = append(n.events, types.Event{
Type: types.NotifySync,
From: n.opts.NodeId,
@@ -18,6 +24,12 @@ func (n *Node) sendNotifySync(to uint64) {
})
}
} else {
// 解除挂起
syncInfo := n.replicaSync[to]
if syncInfo != nil {
syncInfo.emptySyncTick = 0
syncInfo.suspend = false
}
n.events = append(n.events, types.Event{
Type: types.NotifySync,
From: n.opts.NodeId,
@@ -158,7 +170,7 @@ func (n *Node) sendGetLogsReq(syncEvent types.Event) {
})
}
func (n *Node) sendSyncResp(to uint64, syncIndex uint64, logs []types.Log, reson types.Reason) {
func (n *Node) sendSyncResp(to uint64, syncIndex uint64, logs []types.Log, reson types.Reason, speed types.Speed) {
n.events = append(n.events, types.Event{
Type: types.SyncResp,
From: n.opts.NodeId,
@@ -168,6 +180,7 @@ func (n *Node) sendSyncResp(to uint64, syncIndex uint64, logs []types.Log, reson
Logs: logs,
CommittedIndex: n.queue.committedIndex,
Reason: reson,
Speed: speed,
})
}
@@ -220,3 +233,10 @@ func (n *Node) sendConfigResp(to uint64, cfg types.Config) {
Config: cfg,
})
}
func (n *Node) sendDestory() {
n.events = append(n.events, types.Event{
Type: types.Destory,
To: types.LocalNode,
})
}

View File

@@ -86,6 +86,7 @@ func (n *Node) stepLeader(e types.Event) error {
n.Foucs("stop propose", zap.String("key", n.Key()))
return ErrProposalDropped
}
n.idleTick = 0
err := n.queue.append(e.Logs...)
if err != nil {
return err
@@ -95,6 +96,7 @@ func (n *Node) stepLeader(e types.Event) error {
// }
n.advance()
case types.SyncReq: // 同步
n.idleTick = 0
isLearner := n.isLearner(e.From) // 当前同步节点是否是学习者
n.updateSyncInfo(e) // 更新副本同步信息
if !isLearner {
@@ -110,9 +112,21 @@ func (n *Node) stepLeader(e types.Event) error {
// 无数据可同步
if (e.Reason == types.ReasonOnlySync && e.Index > n.queue.storedIndex) || n.queue.storedIndex == 0 {
n.sendSyncResp(e.From, e.Index, nil, types.ReasonOk)
var speed types.Speed
if n.opts.AutoSuspend {
syncInfo.emptySyncTick++
// 超过空同步阈值,挂起
if syncInfo.emptySyncTick > n.opts.SuspendAfterEmptySyncTick {
speed = types.SpeedSuspend
}
}
n.sendSyncResp(e.From, e.Index, nil, types.ReasonOk, speed)
return nil
}
syncInfo.emptySyncTick = 0
if !syncInfo.GetingLogs {
syncInfo.GetingLogs = true
n.sendGetLogsReq(e)
@@ -136,7 +150,6 @@ func (n *Node) stepLeader(e types.Event) error {
// 通知副本过来同步日志
n.sendNotifySync(All)
n.advance()
}
case types.GetLogsResp: // 获取日志返回
syncInfo := n.replicaSync[e.To]
@@ -149,7 +162,7 @@ func (n *Node) stepLeader(e types.Event) error {
e.Logs[i].Record.Add(track.PositionSyncResp)
}
n.sendSyncResp(e.To, e.Index, e.Logs, e.Reason)
n.sendSyncResp(e.To, e.Index, e.Logs, e.Reason, types.SpeedFast)
n.advance()
// 角色转换
@@ -181,7 +194,7 @@ func (n *Node) stepFollower(e types.Event) error {
n.BecomeFollower(e.Term, e.From)
}
n.updateFollowCommittedIndex(e.CommittedIndex) // 更新提交索引
n.idleTick = 0
// 如果领导的配置版本大于本地配置版本,那么请求配置
if e.ConfigVersion > n.cfg.Version {
n.sendConfigReq()
@@ -190,10 +203,13 @@ func (n *Node) stepFollower(e types.Event) error {
// if n.Key() == "2&ch1" {
// n.Info("NotifySync...", zap.Uint64("from", e.From), zap.Uint64("index", e.Index))
// }
n.idleTick = 0
n.suspend = false // 解除挂起
n.sendSyncReq()
n.advance()
case types.SyncResp: // 同步返回
n.electionElapsed = 0
n.idleTick = 0
if !n.onlySync {
n.onlySync = true
}
@@ -208,6 +224,10 @@ func (n *Node) stepFollower(e types.Event) error {
}
// 推进去存储
n.advance()
} else {
if e.Speed == types.SpeedSuspend {
n.suspend = true
}
}
n.updateFollowCommittedIndex(e.CommittedIndex) // 更新提交索引
@@ -273,19 +293,22 @@ func (n *Node) stepLearner(e types.Event) error {
if n.cfg.Leader == None {
n.BecomeLearner(e.Term, e.From)
}
n.idleTick = 0
// 如果领导的配置版本大于本地配置版本,那么请求配置
if e.ConfigVersion > n.cfg.Version {
n.sendConfigReq()
}
case types.NotifySync:
n.idleTick = 0
n.sendSyncReq()
n.advance()
case types.SyncResp: // 同步返回
n.electionElapsed = 0
n.idleTick = 0
if !n.onlySync {
n.onlySync = true
}
if e.Reason == types.ReasonOk {
// if n.Key() == "clusterconfig" {
// n.Info("SyncResp...", zap.Uint64("from", e.From), zap.Uint64("index", e.Index), zap.Int("len", len(e.Logs)))
@@ -295,6 +318,10 @@ func (n *Node) stepLearner(e types.Event) error {
if err != nil {
return err
}
} else {
if e.Speed == types.SpeedSuspend {
n.suspend = true
}
}
} else if e.Reason == types.ReasonTruncate {

View File

@@ -6,7 +6,7 @@ import (
)
func (n *Node) Tick() {
n.idleTick++
if n.tickFnc != nil {
n.tickFnc()
}
@@ -17,12 +17,15 @@ func (n *Node) tickLeader() {
}
func (n *Node) tickFollower() {
n.syncElapsed++
if n.syncElapsed >= n.opts.SyncInterval && n.cfg.Leader != 0 {
if !n.hasSyncReq() {
n.sendSyncReq()
if !n.suspend { // 非挂起状态
n.syncElapsed++
if n.syncElapsed >= n.opts.SyncInterval && n.cfg.Leader != 0 {
if !n.hasSyncReq() {
n.sendSyncReq()
}
n.syncElapsed = 0
}
n.syncElapsed = 0
}
if n.opts.ElectionOn {
@@ -30,25 +33,16 @@ func (n *Node) tickFollower() {
}
}
func (n *Node) hasSyncReq() bool {
if len(n.events) == 0 {
return false
}
for _, e := range n.events {
if e.Type == types.SyncReq {
return true
}
}
return false
}
func (n *Node) tickLearner() {
n.syncElapsed++
if n.syncElapsed >= n.opts.SyncInterval && n.cfg.Leader != 0 {
if !n.hasSyncReq() {
n.sendSyncReq()
if !n.suspend { // 非挂起状态
n.syncElapsed++
if n.syncElapsed >= n.opts.SyncInterval && n.cfg.Leader != 0 {
if !n.hasSyncReq() {
n.sendSyncReq()
}
n.syncElapsed = 0
}
n.syncElapsed = 0
}
}
@@ -84,14 +78,29 @@ func (n *Node) tickHeartbeat() {
continue
}
syncInfo := n.syncState.replicaSync[replicaId]
if syncInfo != nil {
syncInfo.SyncTick++
if syncInfo == nil {
syncInfo = &SyncInfo{}
n.syncState.replicaSync[replicaId] = syncInfo
}
if syncInfo == nil || syncInfo.SyncTick > n.opts.SyncInterval {
syncInfo.SyncTick++
if syncInfo.SyncTick > n.opts.SyncInterval && !syncInfo.suspend {
n.sendPing(replicaId)
}
}
}
func (n *Node) hasSyncReq() bool {
if len(n.events) == 0 {
return false
}
for _, e := range n.events {
if e.Type == types.SyncReq {
return true
}
}
return false
}
// 是否超过选举超时时间

View File

@@ -46,6 +46,18 @@ type Options struct {
// FollowerToLeaderMinLogGap 跟随者转换为领导者的最小日志差距
FollowerToLeaderMinLogGap uint64
// AutoSuspend 是否允许自动挂起
AutoSuspend bool
// AutoDestory 是否自动销毁
AutoDestory bool
// 在空同步指定次数后,进入挂起状态
SuspendAfterEmptySyncTick int
// 空闲多久后销毁
DestoryAfterIdleTick int
}
func NewOptions(opt ...Option) *Options {
@@ -61,6 +73,10 @@ func NewOptions(opt ...Option) *Options {
LearnerToFollowerMinLogGap: 100,
LearnerToLeaderMinLogGap: 100,
FollowerToLeaderMinLogGap: 100,
SuspendAfterEmptySyncTick: 10,
AutoSuspend: false,
AutoDestory: false,
DestoryAfterIdleTick: 10 * 60 * 60, // 如果TickInterval是100ms, 那么10 * 60 * 60这个值是1小时具体时间根据TickInterval来定
}
for _, o := range opt {
@@ -154,3 +170,46 @@ func WithProposeTimeout(proposeTimeout time.Duration) Option {
opts.ProposeTimeout = proposeTimeout
}
}
func WithLearnerToLeaderMinLogGap(gap uint64) Option {
return func(opts *Options) {
opts.LearnerToLeaderMinLogGap = gap
}
}
func WithLearnerToFollowerMinLogGap(gap uint64) Option {
return func(opts *Options) {
opts.LearnerToFollowerMinLogGap = gap
}
}
func WithFollowerToLeaderMinLogGap(gap uint64) Option {
return func(opts *Options) {
opts.FollowerToLeaderMinLogGap = gap
}
}
func WithSuspendAfterEmptySyncTick(suspendAfterEmptySyncTick int) Option {
return func(opts *Options) {
opts.SuspendAfterEmptySyncTick = suspendAfterEmptySyncTick
}
}
func WithDestoryAfterIdleTick(destoryAfterIdleTick int) Option {
return func(opts *Options) {
opts.DestoryAfterIdleTick = destoryAfterIdleTick
}
}
func WithAutoSuspend(autoSuspend bool) Option {
return func(opts *Options) {
opts.AutoSuspend = autoSuspend
}
}
func WithAutoDestory(autoDestory bool) Option {
return func(opts *Options) {
opts.AutoDestory = autoDestory
}
}

View File

@@ -229,7 +229,7 @@ func (r *Raft) readyEvents() {
}
if e.To == None {
fmt.Println("none node event--->", e)
r.Foucs("none node event", zap.Any("event", e))
continue
}

View File

@@ -366,3 +366,7 @@ func (rg *RaftGroup) followerToLeader(r IRaft, followerId uint64) (types.Config,
return cfg, nil
}
func (rg *RaftGroup) handleDestory(r IRaft) {
rg.RemoveRaft(r)
}

View File

@@ -256,6 +256,9 @@ func (rg *RaftGroup) handleReady(r IRaft) bool {
case types.ApplyReq: // 处理应用请求
rg.handleApplyReq(r, e)
continue
case types.Destory: // 处理销毁请求
rg.handleDestory(r)
continue
// 角色转换
case types.LearnerToFollowerReq,
@@ -265,7 +268,7 @@ func (rg *RaftGroup) handleReady(r IRaft) bool {
continue
}
if e.To == 0 {
rg.Error("none node event", zap.Any("event", e))
rg.Foucs("none node event", zap.Any("event", e))
continue
}
if e.To == types.LocalNode {

View File

@@ -104,6 +104,8 @@ const (
ConfigReq
// ConfigResp 配置响应
ConfigResp
// Destory 销毁节点
Destory
)
func (e EventType) String() string {
@@ -164,11 +166,22 @@ func (e EventType) String() string {
return "ConfigReq"
case ConfigResp:
return "ConfigResp"
case Destory:
return "Destory"
default:
return "Unknown"
}
}
type Speed uint8
const (
// SpeedFast 快速
SpeedFast Speed = iota
// SpeedSuspend 暂停
SpeedSuspend
)
type Event struct {
// Type 事件类型
Type EventType
@@ -195,6 +208,9 @@ type Event struct {
// Reason 原因
Reason Reason
// Speed 同步速度
Speed Speed
// 不参与编码
TermStartIndexInfo *TermStartIndexInfo
StartIndex uint64
@@ -214,6 +230,7 @@ const (
configFlag
logsFlag
reasonFlag
speedFlag
)
func (e Event) Marshal() ([]byte, error) {
@@ -282,6 +299,9 @@ func (e Event) Marshal() ([]byte, error) {
if flag&reasonFlag != 0 {
enc.WriteUint8(uint8(e.Reason))
}
if flag&speedFlag != 0 {
enc.WriteUint8(uint8(e.Speed))
}
return enc.Bytes(), nil
}
@@ -434,6 +454,14 @@ func (e *Event) Unmarshal(data []byte) error {
}
e.Reason = Reason(reason)
}
if flag&speedFlag != 0 {
speed, err := dec.Uint8()
if err != nil {
return err
}
e.Speed = Speed(speed)
}
return nil
}
@@ -463,6 +491,9 @@ func (e Event) getFlag() uint32 {
if e.LastLogTerm != 0 {
flag |= lastLogTermFlag
}
if e.ConfigVersion != 0 {
flag |= configVersionFlag
}
if !e.Config.IsEmpty() {
flag |= configFlag
}
@@ -472,6 +503,9 @@ func (e Event) getFlag() uint32 {
if e.Reason != ReasonUnknown {
flag |= reasonFlag
}
if e.Speed != SpeedFast {
flag |= speedFlag
}
return flag
}