feat: add message flow log

This commit is contained in:
tt
2025-05-03 17:49:04 +08:00
parent 529b0fd1bb
commit 8f0742cc2e
10 changed files with 159 additions and 62 deletions

View File

@@ -152,6 +152,7 @@ func (h *Handler) distributeByTag(leaderId uint64, tag *types.Tag, channelId str
cloneMsg.ChannelType = channelType
cloneMsg.Type = eventbus.EventPushOnline
pubshEvents = append(pubshEvents, cloneMsg)
}
}
}

View File

@@ -69,6 +69,22 @@ func (h *Handler) persist(ctx *eventbus.ChannelContext) {
break
}
}
if options.G.Logger.TraceOn {
msgTip := "消息保存成功..."
if reasonCode != wkproto.ReasonSuccess {
msgTip = "消息保存失败..."
}
h.Trace(msgTip,
"persist",
zap.Int64("messageId", event.MessageId),
zap.Uint64("messageSeq", event.MessageSeq),
zap.String("from", event.Conn.Uid),
zap.String("channelId", ctx.ChannelId),
zap.Uint8("channelType", ctx.ChannelType),
zap.String("resson", reasonCode.String()),
)
}
}
}

View File

@@ -3,33 +3,31 @@ package handler
import (
"github.com/WuKongIM/WuKongIM/internal/eventbus"
"github.com/WuKongIM/WuKongIM/internal/options"
"github.com/WuKongIM/WuKongIM/internal/track"
wkproto "github.com/WuKongIM/WuKongIMGoProto"
"go.uber.org/zap"
)
func (h *Handler) sendack(ctx *eventbus.ChannelContext) {
events := ctx.Events
for _, e := range events {
sendPacket := e.Frame.(*wkproto.SendPacket)
e.Track.Record(track.PositionChannelSendack)
if options.G.Logger.TraceOn {
h.Trace(e.Track.String(),
"sendack",
zap.Int64("messageId", e.MessageId),
zap.Uint64("messageSeq", e.MessageSeq),
zap.Uint64("clientSeq", sendPacket.ClientSeq),
zap.String("clientMsgNo", sendPacket.ClientMsgNo),
zap.String("channelId", ctx.ChannelId),
zap.Uint8("channelType", ctx.ChannelType),
zap.String("reasonCode", e.ReasonCode.String()),
zap.String("conn.uid", e.Conn.Uid),
zap.String("conn.deviceId", e.Conn.DeviceId),
zap.Uint64("conn.fromNode", e.Conn.NodeId),
zap.Int64("conn.connId", e.Conn.ConnId),
)
}
}
// for _, e := range events {
// sendPacket := e.Frame.(*wkproto.SendPacket)
// e.Track.Record(track.PositionChannelSendack)
// if options.G.Logger.TraceOn {
// h.Trace(e.Track.String(),
// "sendack",
// zap.Int64("messageId", e.MessageId),
// zap.Uint64("messageSeq", e.MessageSeq),
// zap.Uint64("clientSeq", sendPacket.ClientSeq),
// zap.String("clientMsgNo", sendPacket.ClientMsgNo),
// zap.String("channelId", ctx.ChannelId),
// zap.Uint8("channelType", ctx.ChannelType),
// zap.String("reasonCode", e.ReasonCode.String()),
// zap.String("conn.uid", e.Conn.Uid),
// zap.String("conn.deviceId", e.Conn.DeviceId),
// zap.Uint64("conn.fromNode", e.Conn.NodeId),
// zap.Int64("conn.connId", e.Conn.ConnId),
// )
// }
// }
var uidMap = make(map[string]struct{}, len(events))
for _, e := range events {

View File

@@ -9,6 +9,7 @@ import (
"github.com/WuKongIM/WuKongIM/internal/options"
"github.com/WuKongIM/WuKongIM/internal/types"
"github.com/WuKongIM/WuKongIM/pkg/wklog"
wkproto "github.com/WuKongIM/WuKongIMGoProto"
"go.uber.org/zap"
)
@@ -76,16 +77,33 @@ func (r *RetryManager) retry(msg *types.RetryMessage) {
msg.Retry++
if msg.Retry > options.G.MessageRetry.MaxCount {
r.Warn("exceeded the maximum number of retries", zap.String("uid", msg.Uid), zap.String("channelId", msg.ChannelId), zap.Uint8("channelType", msg.ChannelType), zap.Int64("messageId", msg.MessageId), zap.Int("messageMaxRetryCount", options.G.MessageRetry.MaxCount))
if options.G.Logger.TraceOn && msg.ChannelType == wkproto.ChannelTypePerson {
r.Trace("超过最大重试次数,暂停重试", "retry", zap.String("uid", msg.Uid), zap.Int64("messageId", msg.MessageId), zap.Int64("connId", msg.ConnId), zap.Int("retryCount", msg.Retry), zap.Int("maxRetryCount", options.G.MessageRetry.MaxCount))
}
return
}
conn := eventbus.User.ConnById(msg.Uid, msg.FromNode, msg.ConnId)
if conn == nil {
r.Debug("conn offline", zap.String("uid", msg.Uid), zap.Int64("messageId", msg.MessageId), zap.Int64("connId", msg.ConnId))
if options.G.Logger.TraceOn && msg.ChannelType == wkproto.ChannelTypePerson {
r.Trace("连接已经离线,暂停重试", "retry", zap.String("uid", msg.Uid), zap.Int64("messageId", msg.MessageId), zap.Int64("connId", msg.ConnId))
conns := eventbus.User.ConnsByUid(msg.Uid)
if len(conns) > 0 {
for _, conn := range conns {
r.Trace("用户名下有其他连接(仅提醒)", "retry", zap.String("uid", msg.Uid), zap.Int64("messageId", msg.MessageId), zap.Int64("connId", conn.ConnId), zap.String("deviceId", conn.DeviceId), zap.String("deviceFlag", conn.DeviceFlag.String()))
}
}
}
return
}
// 添加到重试队列
r.AddRetry(msg)
if options.G.Logger.TraceOn && msg.ChannelType == wkproto.ChannelTypePerson {
r.Trace("重试消息", "retry", zap.String("uid", msg.Uid), zap.Int64("messageId", msg.MessageId), zap.Int64("connId", msg.ConnId), zap.String("deviceId", conn.DeviceId), zap.String("deviceFlag", conn.DeviceFlag.String()), zap.Int("retryCount", msg.Retry), zap.Int("maxRetryCount", options.G.MessageRetry.MaxCount))
}
// 发送消息
// 在需要打印日志的地方添加概率控制
if rand.Float64() < 0.1 { // 10%的概率

View File

@@ -56,6 +56,24 @@ func (h *Handler) processChannelPush(events []*eventbus.Event) {
continue
}
if options.G.Logger.TraceOn && e.ChannelType == wkproto.ChannelTypePerson { // 暂时只打印个人频道的推送,因为群的话日志会太多
h.Trace("推送在线消息...",
"pushOnline",
zap.Int64("messageId", e.MessageId),
zap.Uint64("messageSeq", e.MessageSeq),
zap.String("fromUid", e.Conn.Uid),
zap.String("fromDeviceId", e.Conn.DeviceId),
zap.String("fromDeviceFlag", e.Conn.DeviceFlag.String()),
zap.Int64("fromConnId", e.Conn.ConnId),
zap.String("toUid", toConn.Uid),
zap.String("toDeviceId", toConn.DeviceId),
zap.String("toDeviceFlag", toConn.DeviceFlag.String()),
zap.Int64("toConnId", toConn.ConnId),
zap.String("channelId", e.ChannelId),
zap.Uint8("channelType", e.ChannelType),
)
}
recvPacket := &wkproto.RecvPacket{}
recvPacket.Framer = wkproto.Framer{
@@ -153,29 +171,29 @@ func (h *Handler) processChannelPush(events []*eventbus.Event) {
eventbus.User.Advance(e.ToUid)
}
if options.G.Logger.TraceOn {
if len(events) < options.G.Logger.TraceMaxMsgCount { // 消息数小于指定数量才打印,要不然日志太多了
for _, e := range events {
sendPacket := e.Frame.(*wkproto.SendPacket)
// 记录消息轨迹
e.Track.Record(track.PositionPushOnlineEnd)
connCount := eventbus.User.ConnCountByUid(e.ToUid)
h.Trace(e.Track.String(),
"pushOnline",
zap.Int64("messageId", e.MessageId),
zap.Uint64("messageSeq", e.MessageSeq),
zap.Uint64("clientSeq", sendPacket.ClientSeq),
zap.String("clientMsgNo", sendPacket.ClientMsgNo),
zap.String("toUid", e.ToUid),
zap.Int("toConnCount", connCount),
zap.String("conn.uid", e.Conn.Uid),
zap.String("conn.deviceId", e.Conn.DeviceId),
zap.Uint64("conn.nodeId", e.Conn.NodeId),
zap.Int64("conn.connId", e.Conn.ConnId),
)
}
}
}
// if options.G.Logger.TraceOn {
// if len(events) < options.G.Logger.TraceMaxMsgCount { // 消息数小于指定数量才打印,要不然日志太多了
// for _, e := range events {
// sendPacket := e.Frame.(*wkproto.SendPacket)
// // 记录消息轨迹
// e.Track.Record(track.PositionPushOnlineEnd)
// connCount := eventbus.User.ConnCountByUid(e.ToUid)
// h.Trace(e.Track.String(),
// "pushOnline",
// zap.Int64("messageId", e.MessageId),
// zap.Uint64("messageSeq", e.MessageSeq),
// zap.Uint64("clientSeq", sendPacket.ClientSeq),
// zap.String("clientMsgNo", sendPacket.ClientMsgNo),
// zap.String("toUid", e.ToUid),
// zap.Int("toConnCount", connCount),
// zap.String("conn.uid", e.Conn.Uid),
// zap.String("conn.deviceId", e.Conn.DeviceId),
// zap.Uint64("conn.nodeId", e.Conn.NodeId),
// zap.Int64("conn.connId", e.Conn.ConnId),
// )
// }
// }
// }
}

View File

@@ -47,6 +47,20 @@ func (h *Handler) handleOnSend(event *eventbus.Event) {
fakeChannelId = options.GetFakeChannelIDWith(channelId, conn.Uid)
}
if options.G.Logger.TraceOn {
h.Trace("用户发送消息...",
"onSend",
zap.Int64("messageId", event.MessageId),
zap.Uint64("messageSeq", event.MessageSeq),
zap.String("from", event.Conn.Uid),
zap.String("deviceId", event.Conn.DeviceId),
zap.String("deviceFlag", event.Conn.DeviceFlag.String()),
zap.Int64("connId", event.Conn.ConnId),
zap.String("channelId", fakeChannelId),
zap.Uint8("channelType", channelType),
)
}
// 根据配置决定是否解密消息
if !options.G.DisableEncryption && !conn.IsJsonRpc {
newPayload, err := h.decryptPayload(sendPacket, conn)

View File

@@ -2,8 +2,10 @@ package handler
import (
"github.com/WuKongIM/WuKongIM/internal/eventbus"
"github.com/WuKongIM/WuKongIM/internal/options"
"github.com/WuKongIM/WuKongIM/internal/service"
"github.com/WuKongIM/WuKongIM/internal/track"
"github.com/WuKongIM/WuKongIM/internal/types"
"github.com/WuKongIM/WuKongIM/pkg/trace"
"github.com/WuKongIM/WuKongIM/pkg/wkdb"
wkproto "github.com/WuKongIM/WuKongIMGoProto"
@@ -24,9 +26,11 @@ func (h *Handler) recvack(event *eventbus.Event) {
isCmd := recvackPacket.SyncOnce // 是命令消息
isMaster := conn.DeviceLevel == wkproto.DeviceLevelMaster // 是master设备只有master设备才能擦除指令消息
var currMsg *types.RetryMessage
if persist {
currMsg = service.RetryManager.RetryMessage(conn.NodeId, conn.ConnId, recvackPacket.MessageID)
}
if isCmd && persist && isMaster {
currMsg := service.RetryManager.RetryMessage(conn.NodeId, conn.ConnId, recvackPacket.MessageID)
if currMsg != nil {
// 更新最近会话的已读位置
err := service.Store.UpdateConversationIfSeqGreaterAsync(conn.Uid, currMsg.ChannelId, currMsg.ChannelType, uint64(recvackPacket.MessageSeq))
@@ -41,6 +45,36 @@ func (h *Handler) recvack(event *eventbus.Event) {
if err != nil {
h.Warn("removeRetry error", zap.Error(err), zap.String("uid", conn.Uid), zap.String("deviceId", conn.DeviceId), zap.Int64("connId", conn.ConnId), zap.Uint64("nodeId", conn.NodeId), zap.Int64("messageID", recvackPacket.MessageID))
}
if options.G.Logger.TraceOn {
if currMsg != nil {
if currMsg.ChannelType == wkproto.ChannelTypePerson { // 只打印个人的 理论上currMsg应该一直有值。
h.Trace("收到消息回执...",
"recvack",
zap.Int64("messageId", recvackPacket.MessageID),
zap.Uint64("messageSeq", uint64(recvackPacket.MessageSeq)),
zap.String("uid", conn.Uid),
zap.String("deviceId", conn.DeviceId),
zap.String("deviceFlag", conn.DeviceFlag.String()),
zap.Int64("connId", conn.ConnId),
zap.String("channelId", currMsg.ChannelId),
zap.Uint8("channelType", currMsg.ChannelType),
)
}
} else {
h.Trace("收到消息回执",
"recvack",
zap.Int64("messageId", recvackPacket.MessageID),
zap.Uint64("messageSeq", uint64(recvackPacket.MessageSeq)),
zap.String("uid", conn.Uid),
zap.String("deviceId", conn.DeviceId),
zap.String("deviceFlag", conn.DeviceFlag.String()),
zap.Int64("connId", conn.ConnId),
)
}
}
}
}

View File

@@ -126,7 +126,7 @@ func determineMessageType(probe *Probe) (msgType int, version string, err error)
// Determine type PRELIMINARILY
// Note: We refine this based on validation checks later
prelimIsNotification := methodIsPresent && (!idIsPresent || idIsNull)
prelimIsResponse := idIsPresent && !idIsNull && !methodIsPresent && (resultIsPresent || errorIsPresent)
prelimIsResponse := (idIsPresent && !idIsNull && !methodIsPresent && (resultIsPresent || errorIsPresent))
prelimIsRequest := methodIsPresent && idIsPresent && !idIsNull
// Validate field combinations
@@ -156,7 +156,7 @@ func determineMessageType(probe *Probe) (msgType int, version string, err error)
// Valid notification: method, no id or null id
// Check if method is a known notification type (optional, depending on strictness)
switch probe.Method {
case MethodRecv, MethodDisconnect, MethodPong:
case MethodRecv, MethodDisconnect:
msgType = msgTypeNotification
default:
// If method is present but ID is missing/null, AND method is not known,
@@ -359,10 +359,6 @@ func Decode(decoder *json.Decoder) (interface{}, Probe, error) {
return nil, probe, fmt.Errorf("%w: %s params: %w", ErrUnmarshalFieldFailed, MethodDisconnect, err)
}
return notif, probe, nil
case MethodPong:
var notif PongNotification
notif.BaseNotification = baseNotif
return notif, probe, nil
default:
return nil, probe, fmt.Errorf("%w: %s", ErrUnknownMethod, probe.Method)
}
@@ -436,10 +432,10 @@ func FromFrame(reqId string, frame wkproto.Frame) (interface{}, error) {
Params: params,
}, nil
case wkproto.PONG:
return PongNotification{
BaseNotification: BaseNotification{
return PongResponse{
BaseResponse: BaseResponse{
Jsonrpc: jsonRPCVersion,
Method: MethodPong,
ID: reqId,
},
}, nil
}

View File

@@ -219,14 +219,15 @@ func TestEncodeDecode_PingPong(t *testing.T) {
assert.Nil(t, decodedPingReq.Params)
// --- Test Pong Response ---
pongResp := PongNotification{
BaseNotification: BaseNotification{Jsonrpc: "2.0", Method: MethodPong},
pongResp := PongResponse{
BaseResponse: BaseResponse{Jsonrpc: "2.0", ID: "req-pong-1"},
Result: json.RawMessage(`{}`),
}
pongRespBytes := testEncode(t, pongResp)
decodedPongMsg := testDecode(t, pongRespBytes)
pongNotif := assertDecodedAs[PongNotification](t, decodedPongMsg)
decodedPongResp := assertDecodedAs[GenericResponse](t, decodedPongMsg)
assert.Equal(t, "pong", pongNotif.Method)
assert.Equal(t, "req-pong-1", decodedPongResp.ID)
}
func TestDecode_EdgeCases(t *testing.T) { // Renamed for clarity

View File

@@ -254,8 +254,9 @@ type SubscriptionResponse struct {
Error *ErrorObject `json:"error,omitempty"`
}
type PongNotification struct {
BaseNotification
type PongResponse struct {
BaseResponse
Result json.RawMessage `json:"result,omitempty"`
}
type RecvAckResponse struct {