fix: cmd tag error

This commit is contained in:
tt
2025-01-03 13:28:18 +08:00
parent 9dfbad259c
commit 0efdbdcb17
11 changed files with 292 additions and 69 deletions

View File

@@ -16,7 +16,7 @@ deploy-arm:
docker tag wukongimarm64 wukongim/wukongim:latest-arm64
docker push wukongim/wukongim:latest-arm64
deploy-v2-dev:
docker build -t wukongim .
docker build -t wukongim . --platform linux/amd64
docker tag wukongim registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.1.1-20241230-dev
docker push registry.cn-shanghai.aliyuncs.com/wukongim/wukongim:v2.1.1-20241230-dev
deploy-v2:

View File

@@ -228,6 +228,10 @@ func (ch *channel) addSubscriberWithReq(req subscriberAddReq) error {
ch.Error("移除所有订阅者失败!", zap.Error(err), zap.String("channelId", req.ChannelId), zap.Uint8("channelType", req.ChannelType))
return err
}
tagKey := service.TagManager.GetChannelTag(req.ChannelId, req.ChannelType)
if tagKey != "" {
service.TagManager.RemoveTag(tagKey)
}
} else {
members, err := service.Store.GetSubscribers(req.ChannelId, req.ChannelType)
if err != nil {

View File

@@ -1,6 +1,8 @@
package handler
import (
"errors"
"github.com/WuKongIM/WuKongIM/internal/eventbus"
"github.com/WuKongIM/WuKongIM/internal/ingress"
"github.com/WuKongIM/WuKongIM/internal/options"
@@ -22,7 +24,7 @@ func (h *Handler) distribute(ctx *eventbus.ChannelContext) {
// 消息分发
if options.G.IsOnlineCmdChannel(ctx.ChannelId) {
// 分发cmd消息
// 分发在线cmd消息
h.distributeOnlineCmd(ctx)
} else {
// 分发普通消息
@@ -182,7 +184,7 @@ func (h *Handler) getCommonTag(ctx *eventbus.ChannelContext) (*types.Tag, error)
// 如果当前节点是频道的领导者节点则可以make tag
if options.G.IsLocalNode(ctx.LeaderId) {
return h.getOrMakeTag(ctx.ChannelId, ctx.ChannelType)
return h.getOrMakeTagForLeader(ctx.ChannelId, ctx.ChannelType)
}
tagKey := ctx.Events[0].TagKey
tag, err := h.commonService.GetOrRequestAndMakeTagWithLocal(ctx.ChannelId, ctx.ChannelType, tagKey)
@@ -217,33 +219,22 @@ func (h *Handler) requestTag(leaderId uint64, tagKey string) (*types.Tag, error)
return tag, nil
}
func (h *Handler) getOrMakeTag(fakeChannelId string, channelType uint8) (*types.Tag, error) {
func (h *Handler) getOrMakeTagForLeader(fakeChannelId string, channelType uint8) (*types.Tag, error) {
var (
tag *types.Tag
err error
orgFakeChannelId string = fakeChannelId
tag *types.Tag
err error
)
if options.G.IsCmdChannel(fakeChannelId) {
orgFakeChannelId = options.G.CmdChannelConvertOrginalChannel(fakeChannelId)
}
tagKey := service.TagManager.GetChannelTag(orgFakeChannelId, channelType)
tagKey := service.TagManager.GetChannelTag(fakeChannelId, channelType)
if tagKey != "" {
tag = service.TagManager.Get(tagKey)
}
if tag == nil {
// 如果是命令频道不能创建tag只能去原频道获取
if options.G.IsCmdChannel(fakeChannelId) {
tag, err = h.commonService.GetOrRequestAndMakeTag(orgFakeChannelId, channelType, tagKey, 0)
if err != nil {
h.Error("processMakeTag: getOrRequestAndMakeTag failed", zap.Error(err), zap.String("tagKey", tagKey), zap.String("orgFakeChannelId", orgFakeChannelId))
return nil, err
}
} else {
tag, err = h.makeChannelTag(orgFakeChannelId, channelType)
if err != nil {
h.Error("processMakeTag: makeTag failed", zap.Error(err), zap.String("tagKey", tagKey))
return nil, err
}
// 如果没有则制作tag
tag, err = h.makeChannelTag(fakeChannelId, channelType)
if err != nil {
h.Error("processMakeTag: makeTag failed", zap.Error(err), zap.String("tagKey", tagKey))
return nil, err
}
}
@@ -253,18 +244,66 @@ func (h *Handler) getOrMakeTag(fakeChannelId string, channelType uint8) (*types.
func (h *Handler) makeChannelTag(fakeChannelId string, channelType uint8) (*types.Tag, error) {
var (
subscribers []string
orgFakeChannelId string = fakeChannelId
subscribers []string
)
if options.G.IsCmdChannel(fakeChannelId) {
// 处理命令频道
orgFakeChannelId = options.G.CmdChannelConvertOrginalChannel(fakeChannelId)
}
if channelType == wkproto.ChannelTypePerson { // 个人频道
var orgFakeChannelId = fakeChannelId
if options.G.IsCmdChannel(fakeChannelId) {
// 处理命令频道
orgFakeChannelId = options.G.CmdChannelConvertOrginalChannel(fakeChannelId)
}
u1, u2 := options.GetFromUIDAndToUIDWith(orgFakeChannelId)
subscribers = append(subscribers, u1, u2)
} else {
// 如果是cmd频道需要去对应的源频道获取订阅者来制作tag
if options.G.IsCmdChannel(fakeChannelId) {
var err error
subscribers, err = h.getCmdSubscribers(fakeChannelId, channelType)
if err != nil {
h.Error("processMakeTag: getCmdSubscribers failed", zap.Error(err), zap.String("fakeChannelId", fakeChannelId), zap.Uint8("channelType", channelType))
return nil, err
}
} else {
members, err := service.Store.GetSubscribers(fakeChannelId, channelType)
if err != nil {
h.Error("processMakeTag: getSubscribers failed", zap.Error(err), zap.String("fakeChannelId", fakeChannelId), zap.Uint8("channelType", channelType))
return nil, err
}
for _, member := range members {
subscribers = append(subscribers, member.Uid)
}
}
}
tag, err := service.TagManager.MakeTag(subscribers)
if err != nil {
h.Error("processMakeTag: makeTag failed", zap.Error(err), zap.String("fakeChannelId", fakeChannelId), zap.Uint8("channelType", channelType))
return nil, err
}
service.TagManager.SetChannelTag(fakeChannelId, channelType, tag.Key)
return tag, nil
}
// 获取cmd频道的订阅者
func (h *Handler) getCmdSubscribers(channelId string, channelType uint8) ([]string, error) {
// 原频道id
orgFakeChannelId := options.G.CmdChannelConvertOrginalChannel(channelId)
// 获取原频道的领导节点id
leaderNode, err := service.Cluster.LeaderOfChannelForRead(orgFakeChannelId, channelType)
if err != nil {
h.Error("processMakeTag: get leaderNode failed", zap.Error(err), zap.String("fakeChannelId", channelId), zap.Uint8("channelType", channelType))
return nil, err
}
if leaderNode == nil {
h.Error("processMakeTag: leaderNode is nil", zap.String("fakeChannelId", channelId), zap.Uint8("channelType", channelType))
return nil, errors.New("leaderNode is nil")
}
leaderId := leaderNode.Id
// 如果是本地节点,则直接获取订阅者
var subscribers []string
if options.G.IsLocalNode(leaderId) {
members, err := service.Store.GetSubscribers(orgFakeChannelId, channelType)
if err != nil {
h.Error("processMakeTag: getSubscribers failed", zap.Error(err), zap.String("orgFakeChannelId", orgFakeChannelId), zap.Uint8("channelType", channelType))
@@ -273,14 +312,15 @@ func (h *Handler) makeChannelTag(fakeChannelId string, channelType uint8) (*type
for _, member := range members {
subscribers = append(subscribers, member.Uid)
}
} else {
// 如果不是本地节点,则去请求领导节点获取订阅者
subscribers, err = h.client.RequestSubscribers(leaderId, orgFakeChannelId, channelType)
if err != nil {
h.Error("processMakeTag: requestSubscribers failed", zap.Error(err), zap.String("orgFakeChannelId", orgFakeChannelId), zap.Uint8("channelType", channelType))
return nil, err
}
}
tag, err := service.TagManager.MakeTag(subscribers)
if err != nil {
h.Error("processMakeTag: makeTag failed", zap.Error(err), zap.String("orgFakeChannelId", orgFakeChannelId), zap.Uint8("channelType", channelType))
return nil, err
}
service.TagManager.SetChannelTag(orgFakeChannelId, channelType, tag.Key)
return tag, nil
return subscribers, nil
}
func (h *Handler) isOnline(uid string) bool {

View File

@@ -1,7 +1,6 @@
package common
import (
"context"
"time"
"github.com/RussellLuo/timingwheel"
@@ -65,14 +64,14 @@ func (s *everyScheduler) Next(prev time.Time) time.Time {
// targetNodeId 目标节点的uids如果为0表示获取所有节点的uids
func (s *Service) GetOrRequestAndMakeTag(fakeChannelId string, channelType uint8, tagKey string, targetNodeId uint64) (*types.Tag, error) {
realFakeChannelId := fakeChannelId
if options.G.IsCmdChannel(fakeChannelId) {
realFakeChannelId = options.G.CmdChannelConvertOrginalChannel(fakeChannelId)
}
// realFakeChannelId := fakeChannelId
// if options.G.IsCmdChannel(fakeChannelId) {
// realFakeChannelId = options.G.CmdChannelConvertOrginalChannel(fakeChannelId)
// }
var tag *types.Tag
if tagKey == "" {
tagKey = service.TagManager.GetChannelTag(realFakeChannelId, channelType)
tagKey = service.TagManager.GetChannelTag(fakeChannelId, channelType)
}
if tagKey != "" {
tag = service.TagManager.Get(tagKey)
@@ -81,33 +80,35 @@ func (s *Service) GetOrRequestAndMakeTag(fakeChannelId string, channelType uint8
return tag, nil
}
if channelType == wkproto.ChannelTypePerson {
return s.getOrMakePersonTag(realFakeChannelId)
return s.getOrMakePersonTag(fakeChannelId)
}
timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
leader, err := service.Cluster.LeaderOfChannel(timeoutCtx, realFakeChannelId, channelType)
cancel()
leader, err := service.Cluster.LeaderOfChannelForRead(fakeChannelId, channelType)
if err != nil {
wklog.Error("GetOrRequestTag: getLeaderOfChannel failed", zap.Error(err), zap.String("channelId", realFakeChannelId), zap.Uint8("channelType", channelType))
wklog.Error("GetOrRequestTag: getLeaderOfChannel failed", zap.Error(err), zap.String("channelId", fakeChannelId), zap.Uint8("channelType", channelType))
return nil, err
}
if leader == nil {
wklog.Warn("GetOrRequestTag: leader is nil", zap.String("channelId", fakeChannelId), zap.Uint8("channelType", channelType))
return nil, errors.ChannelNotExist(fakeChannelId)
}
// tagKey在频道的领导节点是一定存在的
// 如果不存在可能就是失效了,这里直接忽略,只能等下条消息触发重构tag
if leader.Id == options.G.Cluster.NodeId {
wklog.Warn("tag not exist in leader node", zap.String("tagKey", tagKey), zap.String("fakeChannelId", realFakeChannelId), zap.Uint8("channelType", channelType))
if options.G.IsLocalNode(leader.Id) {
wklog.Warn("tag not exist in leader node", zap.String("tagKey", tagKey), zap.String("fakeChannelId", fakeChannelId), zap.Uint8("channelType", channelType))
return nil, errors.TagNotExist(tagKey)
}
// 去领导节点请求
tagResp, err := s.client.RequestTag(leader.Id, &ingress.TagReq{
TagKey: tagKey,
ChannelId: realFakeChannelId,
ChannelId: fakeChannelId,
ChannelType: channelType,
NodeId: targetNodeId,
})
if err != nil {
s.Error("GetOrRequestTag: get tag failed", zap.Error(err), zap.Uint64("leaderId", leader.Id), zap.String("channelId", realFakeChannelId), zap.Uint8("channelType", channelType))
s.Error("GetOrRequestTag: get tag failed", zap.Error(err), zap.Uint64("leaderId", leader.Id), zap.String("channelId", fakeChannelId), zap.Uint8("channelType", channelType))
return nil, err
}
@@ -116,7 +117,7 @@ func (s *Service) GetOrRequestAndMakeTag(fakeChannelId string, channelType uint8
s.Error("GetOrRequestTag: make tag failed", zap.Error(err))
return nil, err
}
service.TagManager.SetChannelTag(realFakeChannelId, channelType, tagKey)
service.TagManager.SetChannelTag(fakeChannelId, channelType, tagKey)
return tag, nil
}
@@ -126,8 +127,13 @@ func (s *Service) GetOrRequestAndMakeTagWithLocal(fakeChannelId string, channelT
// 获取个人频道的投递tag
func (s *Service) getOrMakePersonTag(fakeChannelId string) (*types.Tag, error) {
realFakeChannelId := fakeChannelId
if options.G.IsCmdChannel(fakeChannelId) {
realFakeChannelId = options.G.CmdChannelConvertOrginalChannel(fakeChannelId)
}
// 处理普通假个人频道
u1, u2 := options.GetFromUIDAndToUIDWith(fakeChannelId)
u1, u2 := options.GetFromUIDAndToUIDWith(realFakeChannelId)
subscribers := []string{u1, u2}
tag, err := service.TagManager.MakeTag(subscribers)
if err != nil {

View File

@@ -8,4 +8,7 @@ var (
}
TagSlotLeaderIsZero = errors.New("tag slot leader is 0")
TagKeyEmpty = errors.New("tagKey is empty")
ChannelNotExist = func(channelId string) error {
return errors.New("channel not exist: " + channelId)
}
)

View File

@@ -76,6 +76,33 @@ func (c *Client) RequestAllowSendForPerson(toNodeId uint64, from, to string) (*p
return c.request(toNodeId, "/wk/ingress/allowSend", data)
}
func (c *Client) RequestSubscribers(toNodeId uint64, channelId string, channelType uint8) ([]string, error) {
req := &ChannelReq{
ChannelId: channelId,
ChannelType: channelType,
}
data, err := req.Encode()
if err != nil {
return nil, err
}
resp, err := c.request(toNodeId, "/wk/ingress/getSubscribers", data)
if err != nil {
return nil, err
}
err = c.handleRespError(resp)
if err != nil {
return nil, err
}
subResp := &SubscribersResp{}
err = subResp.Decode(resp.Body)
if err != nil {
return nil, err
}
return subResp.Subscribers, nil
}
func (c *Client) request(toNodeId uint64, path string, body []byte) (*proto.Response, error) {
timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()

View File

@@ -161,3 +161,58 @@ func (t *TagUpdateReq) Decode(data []byte) error {
t.ChannelTag = wkutil.Uint8ToBool(channelTag)
return nil
}
type ChannelReq struct {
ChannelId string
ChannelType uint8
}
func (c *ChannelReq) Encode() ([]byte, error) {
enc := wkproto.NewEncoder()
enc.WriteString(c.ChannelId)
enc.WriteUint8(c.ChannelType)
return enc.Bytes(), nil
}
func (c *ChannelReq) Decode(data []byte) error {
dec := wkproto.NewDecoder(data)
var err error
if c.ChannelId, err = dec.String(); err != nil {
return err
}
if c.ChannelType, err = dec.Uint8(); err != nil {
return err
}
return nil
}
type SubscribersResp struct {
Subscribers []string
}
func (s *SubscribersResp) Encode() ([]byte, error) {
enc := wkproto.NewEncoder()
defer enc.End()
enc.WriteUint32(uint32(len(s.Subscribers)))
for _, uid := range s.Subscribers {
enc.WriteString(uid)
}
return enc.Bytes(), nil
}
func (s *SubscribersResp) Decode(data []byte) error {
dec := wkproto.NewDecoder(data)
count, err := dec.Uint32()
if err != nil {
return err
}
s.Subscribers = make([]string, 0, count)
for i := 0; i < int(count); i++ {
uid, err := dec.String()
if err != nil {
return err
}
s.Subscribers = append(s.Subscribers, uid)
}
return nil
}

View File

@@ -3,7 +3,6 @@ package ingress
import (
"errors"
"github.com/WuKongIM/WuKongIM/internal/options"
"github.com/WuKongIM/WuKongIM/internal/service"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
"github.com/WuKongIM/WuKongIM/pkg/wkserver"
@@ -31,6 +30,8 @@ func (i *Ingress) SetRoutes() {
service.Cluster.Route("/wk/ingress/allowSend", i.handleAllowSend)
// 更新tag
service.Cluster.Route("/wk/ingress/updateTag", i.handleUpdateTag)
// 获取订阅者
service.Cluster.Route("/wk/ingress/getSubscribers", i.handleGetSubscribers)
}
@@ -125,15 +126,15 @@ func (i *Ingress) handleUpdateTag(c *wkserver.Context) {
return
}
realFakeChannelId := req.ChannelId
if options.G.IsCmdChannel(req.ChannelId) {
realFakeChannelId = options.G.CmdChannelConvertOrginalChannel(req.ChannelId)
}
// realFakeChannelId := req.ChannelId
// if options.G.IsCmdChannel(req.ChannelId) {
// realFakeChannelId = options.G.CmdChannelConvertOrginalChannel(req.ChannelId)
// }
tagKey := req.TagKey
if tagKey == "" {
if req.ChannelId != "" {
tagKey = service.TagManager.GetChannelTag(realFakeChannelId, req.ChannelType)
tagKey = service.TagManager.GetChannelTag(req.ChannelId, req.ChannelType)
}
}
if tagKey != "" {
@@ -155,7 +156,7 @@ func (i *Ingress) handleUpdateTag(c *wkserver.Context) {
if err != nil {
i.Warn("handleUpdateTag: rename tag failed", zap.Error(err))
}
service.TagManager.SetChannelTag(realFakeChannelId, req.ChannelType, newTagKey)
service.TagManager.SetChannelTag(req.ChannelId, req.ChannelType, newTagKey)
}
} else {
@@ -170,3 +171,42 @@ func (i *Ingress) handleUpdateTag(c *wkserver.Context) {
c.WriteOk()
}
func (i *Ingress) handleGetSubscribers(c *wkserver.Context) {
req := &ChannelReq{}
err := req.Decode(c.Body())
if err != nil {
i.Error("handleGetSubscribers: decode failed", zap.Error(err))
c.WriteErr(err)
return
}
if req.ChannelId == "" {
i.Error("handleGetSubscribers: channelId is nil", zap.Any("req", req))
c.WriteErr(errors.New("channelId is nil"))
return
}
members, err := service.Store.GetSubscribers(req.ChannelId, req.ChannelType)
if err != nil {
i.Error("handleGetSubscribers: get subscribers failed", zap.Error(err))
c.WriteErr(err)
return
}
subscribers := make([]string, 0, len(members))
for _, member := range members {
subscribers = append(subscribers, member.Uid)
}
resp := &SubscribersResp{
Subscribers: subscribers,
}
data, err := resp.Encode()
if err != nil {
i.Error("handleGetSubscribers: encode failed", zap.Error(err))
c.WriteErr(err)
return
}
c.Write(data)
}

View File

@@ -25,15 +25,18 @@ type tagBlucket struct {
}
stopper *syncutil.Stopper
wklog.Log
existTagFnc func(tagKey string) bool
}
func newTagBlucket(index int, expire time.Duration) *tagBlucket {
func newTagBlucket(index int, expire time.Duration, existTagFnc func(tagKey string) bool) *tagBlucket {
b := &tagBlucket{
index: index,
expire: expire,
stopper: syncutil.NewStopper(),
Log: wklog.NewWKLog("tagBlucket"),
index: index,
expire: expire,
stopper: syncutil.NewStopper(),
Log: wklog.NewWKLog("tagBlucket"),
existTagFnc: existTagFnc,
}
b.channel.m = make(map[string]string)
b.tag.m = make(map[string]*types.Tag)
@@ -70,8 +73,7 @@ func (b *tagBlucket) loop() {
func (b *tagBlucket) checkExpireTags() {
b.tag.Lock()
defer b.tag.Unlock()
// tag过期检查
var removeTags []string // 需要移除的tagKey
for _, tag := range b.tag.m {
if time.Since(tag.LastGetTime) > b.expire {
@@ -89,6 +91,27 @@ func (b *tagBlucket) checkExpireTags() {
b.Info("checkExpireTags: remove tags", zap.Int("count", len(removeTags)), zap.String("removeTag", removeTags[0]))
}
b.tag.Unlock()
// channel tag过期检查
b.channel.Lock()
var removeChannels []string
for channelKey, tagKey := range b.channel.m {
if !b.existTagFnc(tagKey) {
if removeChannels == nil {
removeChannels = make([]string, 0, 20)
}
removeChannels = append(removeChannels, channelKey)
}
}
if len(removeChannels) > 0 {
for _, removeChannelKey := range removeChannels {
delete(b.channel.m, removeChannelKey)
}
b.Info("checkExpireTags: remove channels", zap.Int("count", len(removeChannels)), zap.String("removeChannel", removeChannels[0]))
}
b.channel.Unlock()
}
func (b *tagBlucket) setTag(tag *types.Tag) {
@@ -109,12 +132,25 @@ func (b *tagBlucket) removeTag(tagKey string) {
delete(b.tag.m, tagKey)
}
func (b *tagBlucket) existTag(tagKey string) bool {
b.tag.RLock()
defer b.tag.RUnlock()
_, ok := b.tag.m[tagKey]
return ok
}
func (b *tagBlucket) setChannelTag(channelId string, channelType uint8, tagKey string) {
b.channel.Lock()
b.channel.m[wkutil.ChannelToKey(channelId, channelType)] = tagKey
b.channel.Unlock()
}
func (b *tagBlucket) removeChannelTag(channelId string, channelType uint8) {
b.channel.Lock()
delete(b.channel.m, wkutil.ChannelToKey(channelId, channelType))
b.channel.Unlock()
}
func (b *tagBlucket) getChannelTag(channelId string, channelType uint8) string {
b.channel.RLock()
defer b.channel.RUnlock()

View File

@@ -27,7 +27,7 @@ func NewTagManager(blucketCount int, nodeVersion func() uint64) *TagManager {
}
tg.bluckets = make([]*tagBlucket, blucketCount)
for i := 0; i < blucketCount; i++ {
tg.bluckets[i] = newTagBlucket(i, time.Minute*20)
tg.bluckets[i] = newTagBlucket(i, time.Minute*20, tg.existTag)
}
return tg
}
@@ -196,6 +196,11 @@ func (t *TagManager) GetChannelTag(fakeChannelId string, channelType uint8) stri
return blucket.getChannelTag(fakeChannelId, channelType)
}
func (t *TagManager) RemoveChannelTag(fakeChannelId string, channelType uint8) {
blucket := t.getBlucketByChannel(fakeChannelId, channelType)
blucket.removeChannelTag(fakeChannelId, channelType)
}
func (t *TagManager) getBlucketByTagKey(tagKey string) *tagBlucket {
h := fnv.New32a()
h.Write([]byte(tagKey))
@@ -307,3 +312,8 @@ func (t *TagManager) removeTag(tagKey string) {
blucket := t.getBlucketByTagKey(tagKey)
blucket.removeTag(tagKey)
}
func (t *TagManager) existTag(tagKey string) bool {
blucket := t.getBlucketByTagKey(tagKey)
return blucket.existTag(tagKey)
}

View File

@@ -28,4 +28,6 @@ type ITagManager interface {
SetChannelTag(fakeChannelId string, channelType uint8, tagKey string)
// GetChannelTag 获取频道对应的tag
GetChannelTag(fakeChannelId string, channelType uint8) string
// RemoveChannelTag 移除频道对应的tag
RemoveChannelTag(fakeChannelId string, channelType uint8)
}