perf: Improve Raft performance

This commit is contained in:
tt
2024-12-31 00:13:58 +08:00
committed by tt
parent e5185a7cd1
commit 31920f5ce7
15 changed files with 563 additions and 94 deletions

View File

@@ -1,94 +0,0 @@
package bytequeue
import (
"bytes"
"testing"
)
func TestByteQueue(t *testing.T) {
// 创建一个新的 ByteQueue
bq := New()
defer bq.Reset() // 确保测试结束后重置
// 测试 Write 方法
t.Run("Write", func(t *testing.T) {
data := []byte("hello")
n, err := bq.Write(data)
if err != nil {
t.Fatalf("Write() failed: %v", err)
}
if n != len(data) {
t.Errorf("Write() = %d, want %d", n, len(data))
}
if !bytes.Equal(bq.buffer.B, data) {
t.Errorf("Write() buffer = %v, want %v", bq.buffer.B, data)
}
})
// 测试 Peek 方法
t.Run("Peek", func(t *testing.T) {
data := []byte("hello")
// 在偏移量 0 处读取 5 个字节
result := bq.Peek(0, 5)
if !bytes.Equal(result, data) {
t.Errorf("Peek() = %v, want %v", result, data)
}
// 读取部分数据,从位置 2 开始读取 3 个字节
result = bq.Peek(2, 3)
expected := []byte("llo")
if !bytes.Equal(result, expected) {
t.Errorf("Peek() = %v, want %v", result, expected)
}
// 测试越界startPosition 大于数据总大小
result = bq.Peek(100, 3)
if result != nil {
t.Errorf("Peek() = %v, want nil", result)
}
// 测试读取超过缓冲区剩余数据
result = bq.Peek(0, 10) // 请求更多字节
if !bytes.Equal(result, data) {
t.Errorf("Peek() = %v, want %v", result, data)
}
})
// 测试 Discard 方法
t.Run("Discard", func(t *testing.T) {
// 写入一些数据
_, _ = bq.Write([]byte("world"))
// 丢弃前 5 个字节
bq.Discard(5)
if len(bq.buffer.B) != 5 {
t.Errorf("Discard() buffer size = %d, want 5", len(bq.buffer.B))
}
// 丢弃所有数据
bq.Discard(10)
if len(bq.buffer.B) != 0 {
t.Errorf("Discard() buffer size = %d, want 0", len(bq.buffer.B))
}
})
// 测试 Reset 方法
t.Run("Reset", func(t *testing.T) {
// 写入数据
_, _ = bq.Write([]byte("reset"))
// 重置
bq.Reset()
// 确保缓冲区已清空
if len(bq.buffer.B) != 0 {
t.Errorf("Reset() buffer size = %d, want 0", len(bq.buffer.B))
}
if bq.offsetSize != 0 {
t.Errorf("Reset() offsetSize = %d, want 0", bq.offsetSize)
}
if bq.totalSize != 0 {
t.Errorf("Reset() totalSize = %d, want 0", bq.totalSize)
}
})
}

13
pkg/raft/README.md Normal file
View File

@@ -0,0 +1,13 @@
```go
node := raft.NewNode(opt...)
// ready
ready := node.Ready()
// step
node.Step(m)
// switch config
node.SwitchConfig(cfg)
```

22
pkg/raft/common.go Normal file
View File

@@ -0,0 +1,22 @@
package raft
import (
"crypto/rand"
"math/big"
"sync"
)
var globalRand = &lockedRand{}
type lockedRand struct {
mu sync.Mutex
}
func (r *lockedRand) Intn(n int) int {
r.mu.Lock()
v, _ := rand.Int(rand.Reader, big.NewInt(int64(n)))
r.mu.Unlock()
return int(v.Int64())
}
const None uint64 = 0

82
pkg/raft/event.go Normal file
View File

@@ -0,0 +1,82 @@
package raft
type EventType uint16
const (
// Unknown 未知
Unknown EventType = iota
// Ping 心跳, leader -> follower, follower节点收到心跳后需要发起Sync来响应
Ping
// ConfChange 配置变更, all node
ConfChange
// Propose 提案, leader
Propose
// SyncReq 同步 follower -> leader
SyncReq
// SyncResp 同步响应, leader -> follower
SyncResp
// VoteReq 投票, candidate
VoteReq
// VoteResp 投票响应, all node -> candidate
VoteResp
// StorageReq 存储日志, local event
StorageReq
// StorageResp 存储日志响应, local event
StorageResp
// ApplyReq 应用日志, local event
ApplyReq
// ApplyResp 应用日志响应, local event
ApplyResp
)
func (e EventType) String() string {
switch e {
case Unknown:
return "Unknown"
case Ping:
return "Ping"
case ConfChange:
return "ConfChange"
case Propose:
return "Propose"
case SyncReq:
return "SyncReq"
case SyncResp:
return "SyncResp"
case VoteReq:
return "VoteReq"
case VoteResp:
return "VoteResp"
case StorageReq:
return "StorageReq"
case StorageResp:
return "StorageResp"
case ApplyReq:
return "ApplyReq"
case ApplyResp:
return "ApplyResp"
default:
return "Unknown"
}
}
type Event struct {
// Type 事件类型
Type EventType
// From 事件发起者
From uint64
// To 事件接收者
To uint64
// Term 任期
Term uint32
// Index 日志下标
Index uint64
// CommittedIndex 已提交日志下标
CommittedIndex uint64
// Logs 日志
Logs []*Log
// Reject 是否拒绝
Reject bool
Config Config
}

13
pkg/raft/log.go Normal file
View File

@@ -0,0 +1,13 @@
package raft
import "time"
type Log struct {
Id uint64
Index uint64 // 日志下标
Term uint32 // 领导任期
Data []byte // 日志数据
// 不参与编码
Time time.Time // 日志时间
}

14
pkg/raft/model.go Normal file
View File

@@ -0,0 +1,14 @@
package raft
type Config struct {
MigrateFrom uint64 // 迁移源节点
MigrateTo uint64 // 迁移目标节点
Replicas []uint64 // 副本集合(不包含节点自己)
Learners []uint64 // 学习节点集合
Role Role // 节点角色
Term uint32 // 领导任期
Version uint64 // 配置版本
// 不参与编码
Leader uint64 // 领导ID
}

67
pkg/raft/node.go Normal file
View File

@@ -0,0 +1,67 @@
package raft
import (
"github.com/WuKongIM/WuKongIM/pkg/wklog"
)
type Role uint8
const (
RoleUnknown Role = iota
// RoleFollower 跟随者
RoleFollower
// RoleCandidate 候选人
RoleCandidate
// RoleLeader 领导者
RoleLeader
// RoleLearner 学习者
RoleLearner
)
type electionState struct {
electionElapsed int // 选举计时器
randomizedElectionTimeout int // 随机选举超时时间
voteFor uint64 // 投票给了谁
}
type Node struct {
events []Event
opts *Options
syncElapsed int // 同步计数
leaderId uint64 // 领导者ID
tickFnc func()
stepFunc func(event Event) error
queue *queue
wklog.Log
heartbeatElapsed int // 心跳计时器
cfg Config // 分布式配置
electionState // 选举状态
}
func NewNode(opts *Options) *Node {
n := &Node{
opts: opts,
Log: wklog.NewWKLog("raft.node"),
}
for _, id := range opts.Replicas {
if id == opts.NodeId {
continue
}
n.cfg.Replicas = append(n.cfg.Replicas, id)
}
n.cfg.Term = opts.Log.LastTerm
n.queue = newQueue(opts.Log.AppliedIndex, opts.Log.LastIndex)
return n
}
func (n *Node) Ready() []Event {
return nil
}
func (n *Node) switchConfig(cfg Config) {
}

33
pkg/raft/node_become.go Normal file
View File

@@ -0,0 +1,33 @@
package raft
import "go.uber.org/zap"
func (n *Node) becomeCandidate() {
if n.cfg.Role == RoleLeader {
n.Panic("invalid transition [leader -> candidate]")
}
n.cfg.Term++
n.stepFunc = n.stepCandidate
n.reset()
n.tickFnc = n.tickCandidate
n.voteFor = n.opts.NodeId
n.leaderId = 0
n.cfg.Role = RoleCandidate
n.Info("become candidate", zap.Uint32("term", n.cfg.Term))
}
func (n *Node) becomeFollower(term uint32, leaderId uint64) {
}
func (n *Node) becomeLeader() {
}
func (n *Node) becomeLearner(term uint32, leaderId uint64) {
}
func (n *Node) reset() {
}

29
pkg/raft/node_send.go Normal file
View File

@@ -0,0 +1,29 @@
package raft
func (n *Node) sendSyncReq(to uint64) {
n.events = append(n.events, Event{
Type: SyncReq,
From: n.opts.NodeId,
To: to,
Term: n.cfg.Term,
})
}
func (n *Node) sendVoteReq(to uint64) {
n.events = append(n.events, Event{
Type: VoteReq,
From: n.opts.NodeId,
To: to,
Term: n.cfg.Term,
})
}
func (n *Node) sendVoteResp(to uint64, reject bool) {
n.events = append(n.events, Event{
Type: VoteResp,
From: n.opts.NodeId,
To: to,
Term: n.cfg.Term,
Reject: reject,
})
}

97
pkg/raft/node_step.go Normal file
View File

@@ -0,0 +1,97 @@
package raft
import "go.uber.org/zap"
func (n *Node) Step(e Event) error {
switch {
case n.cfg.Term == 0: // 本地消息
case n.cfg.Term < e.Term: // 低于当前任期
n.Info("received event with lower term", zap.Uint32("term", e.Term), zap.Uint32("currentTerm", n.cfg.Term), zap.Uint64("from", e.From), zap.Uint64("to", e.To), zap.String("type", e.Type.String()))
return nil
case n.cfg.Term > e.Term: // 高于当前任期
if n.cfg.Term > 0 {
n.Info("received event with higher term", zap.Uint32("term", n.cfg.Term), zap.Uint32("currentTerm", e.Term), zap.Uint64("from", e.From), zap.Uint64("to", e.To), zap.String("type", e.Type.String()))
}
if e.Type == Ping || e.Type == SyncResp {
if n.cfg.Role == RoleLearner {
n.becomeLearner(e.Term, e.From)
} else {
n.becomeFollower(e.Term, e.From)
}
} else {
if n.cfg.Role == RoleLearner {
n.Warn("become learner but leader is none", zap.Uint64("from", e.From), zap.String("type", e.Type.String()), zap.Uint32("term", e.Term))
n.becomeLearner(e.Term, None)
} else {
n.Warn("become follower but leader is none", zap.Uint64("from", e.From), zap.String("type", e.Type.String()), zap.Uint32("term", e.Term))
n.becomeFollower(e.Term, None)
}
}
}
switch e.Type {
case ConfChange: // 配置变更
n.switchConfig(e.Config)
case VoteReq: // 投票请求
if n.canVote(e) {
n.sendVoteResp(e.From, false)
n.voteFor = e.From
n.electionElapsed = 0
n.Info("agree vote", zap.Uint64("voteFor", e.From), zap.Uint32("term", e.Term), zap.Uint64("index", e.Index))
} else {
if n.voteFor != None {
n.Info("already vote for other", zap.Uint64("voteFor", n.voteFor))
} else if e.Index < n.queue.lastLogIndex {
n.Info("lower config version, reject vote")
} else if e.Term < n.cfg.Term {
n.Info("lower term, reject vote")
}
n.Info("reject vote", zap.Uint64("from", e.From), zap.Uint32("term", e.Term), zap.Uint64("index", e.Index))
n.sendVoteResp(e.From, true)
}
}
return nil
}
func (n *Node) stepLeader(e Event) error {
switch e.Type {
case Propose: // 提案
case SyncReq: // 同步
}
return nil
}
func (n *Node) stepFollower(e Event) error {
return nil
}
func (n *Node) stepCandidate(e Event) error {
return nil
}
// 是否可以投票
func (n *Node) canVote(e Event) bool {
if n.cfg.Term > e.Term { // 如果当前任期大于候选人任期,拒绝投票
return false
}
if n.voteFor != None && n.voteFor != e.From { // 如果已经投票给其他节点,拒绝投票
return false
}
lastIndex := n.queue.lastLogIndex
lastTerm := n.cfg.Term // 获取当前节点最后一条日志下标和任期
candidateLog := e.Logs[0] // 候选人最后一条日志信息
if candidateLog.Term < lastTerm || candidateLog.Term == lastTerm && candidateLog.Index < lastIndex { // 如果候选人日志小于本地日志,拒绝投票
return false
}
return true
}

55
pkg/raft/node_tick.go Normal file
View File

@@ -0,0 +1,55 @@
package raft
import "go.uber.org/zap"
func (n *Node) Tick() {
if n.tickFnc != nil {
n.tickFnc()
}
}
func (n *Node) tickFollower() {
n.syncElapsed++
if n.syncElapsed >= n.opts.SyncInterval && n.leaderId != 0 {
n.sendSyncReq(n.leaderId)
n.syncElapsed = 0
}
if n.opts.ElectionOn {
n.tickElection()
}
}
func (n *Node) tickCandidate() {
n.tickElection()
}
func (n *Node) tickElection() {
n.electionElapsed++
if n.pastElectionTimeout() {
n.electionElapsed = 0
n.campaign()
}
}
// 是否超过选举超时时间
func (n *Node) pastElectionTimeout() bool {
return n.electionElapsed >= n.randomizedElectionTimeout
}
// 重置随机选举超时时间
func (n *Node) resetRandomizedElectionTimeout() {
n.randomizedElectionTimeout = n.opts.ElectionInterval + globalRand.Intn(n.opts.ElectionInterval)
}
// 开始选举
func (n *Node) campaign() {
n.becomeCandidate()
for _, nodeId := range n.cfg.Replicas {
n.Info("sent vote request", zap.Uint64("from", n.opts.NodeId), zap.Uint64("to", nodeId), zap.Uint32("term", n.cfg.Term))
n.sendVoteReq(nodeId)
}
// 自己给自己投一票
n.sendVoteReq(n.opts.NodeId)
}

35
pkg/raft/options.go Normal file
View File

@@ -0,0 +1,35 @@
package raft
type Options struct {
// NodeId 节点ID
NodeId uint64
// SyncInterval 同步间隔, 单位: tick, 表示多少个tick发起一次同步
SyncInterval int
// ElectionOn 是否开启选举, 如果开启选举那么节点之间会自己选举出一个领导者默认为false
ElectionOn bool
// HeartbeatInterval 心跳间隔tick次数, 就是tick触发几次算一次心跳一般为1 一次tick算一次心跳
HeartbeatInterval int
// ElectionInterval 选举间隔tick次数超过此tick数则发起选举
ElectionInterval int
// Replicas 副本的节点id不包含节点自己
Replicas []uint64
Log struct {
// AppliedIndex 已应用的日志下标
AppliedIndex uint64
// LastIndex 最新日志下标
LastIndex uint64
// LastTerm 最新任期
LastTerm uint32
}
}
func NewOptions() *Options {
return &Options{
SyncInterval: 1,
ElectionOn: false,
HeartbeatInterval: 1,
ElectionInterval: 10,
}
}

44
pkg/raft/queue.go Normal file
View File

@@ -0,0 +1,44 @@
package raft
import (
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"go.uber.org/zap"
)
type queue struct {
logs []Log
wklog.Log
storageOffset uint64 // 存储的日志偏移,比如日志 1 2 3 4 5 6 7 8 9存储的日志偏移是6 表示1 2 3 4 5 6已经存储
lastLogIndex uint64 // 最后一条日志下标
}
func newQueue(storageOffset, lastLogIndex uint64) *queue {
return &queue{
Log: wklog.NewWKLog("queue"),
storageOffset: storageOffset,
lastLogIndex: lastLogIndex,
}
}
func (r *queue) append(log ...Log) {
appendStartLogIndex := log[0].Index - 1
if appendStartLogIndex == r.storageOffset+uint64(len(r.logs)) {
r.logs = append(r.logs, log...)
r.lastLogIndex = log[len(log)-1].Index
return
} else {
r.Warn("append log index is not continuous", zap.Uint64("appendStartLogIndex", appendStartLogIndex), zap.Uint64("offset", r.storageOffset), zap.Int("logsLen", len(r.logs)))
}
}
// storageTo 截取日志到指定日志下标比如storageTo(6)如果日志是1 2 3 4 5 6 7 8 9截取后是7 8 9
func (r *queue) storageTo(logIndex uint64) {
if logIndex <= r.storageOffset {
return
}
if logIndex > r.lastLogIndex {
r.Panic("storageTo logIndex is out of bound", zap.Uint64("logIndex", logIndex), zap.Uint64("lastLogIndex", r.lastLogIndex))
}
r.logs = r.logs[logIndex-r.storageOffset:]
r.storageOffset = logIndex
}

51
pkg/raft/queue_test.go Normal file
View File

@@ -0,0 +1,51 @@
package raft
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestQueueAppend(t *testing.T) {
q := newQueue(0, 0)
log1 := Log{Id: 1, Index: 1, Term: 1, Data: []byte("log1"), Time: time.Now()}
log2 := Log{Id: 2, Index: 2, Term: 1, Data: []byte("log2"), Time: time.Now()}
q.append(log1, log2)
assert.Equal(t, uint64(2), q.lastLogIndex)
assert.Equal(t, 2, len(q.logs))
assert.Equal(t, log1, q.logs[0])
assert.Equal(t, log2, q.logs[1])
}
func TestQueueStorageTo(t *testing.T) {
q := newQueue(0, 0)
log1 := Log{Id: 1, Index: 1, Term: 1, Data: []byte("log1"), Time: time.Now()}
log2 := Log{Id: 2, Index: 2, Term: 1, Data: []byte("log2"), Time: time.Now()}
log3 := Log{Id: 3, Index: 3, Term: 1, Data: []byte("log3"), Time: time.Now()}
q.append(log1, log2, log3)
q.storageTo(2)
assert.Equal(t, uint64(2), q.storageOffset)
assert.Equal(t, 1, len(q.logs))
assert.Equal(t, log3, q.logs[0])
}
func TestQueueStorageToOutOfBound(t *testing.T) {
q := newQueue(0, 0)
log1 := Log{Id: 1, Index: 1, Term: 1, Data: []byte("log1"), Time: time.Now()}
log2 := Log{Id: 2, Index: 2, Term: 1, Data: []byte("log2"), Time: time.Now()}
q.append(log1, log2)
assert.Panics(t, func() {
q.storageTo(3)
})
}

8
pkg/raft/raft.go Normal file
View File

@@ -0,0 +1,8 @@
package raft
type Raft struct {
}
func New() *Raft {
return &Raft{}
}