mirror of
https://gitee.com/WuKongDev/WuKongIM.git
synced 2025-12-06 14:59:08 +08:00
feat: support agent channel
This commit is contained in:
@@ -224,6 +224,8 @@ func sendMessageToChannel(req messageSendReq, channelId string, channelType uint
|
||||
fakeChannelId := channelId
|
||||
if channelType == wkproto.ChannelTypePerson {
|
||||
fakeChannelId = options.GetFakeChannelIDWith(req.FromUID, channelId)
|
||||
} else if channelType == wkproto.ChannelTypeAgent {
|
||||
fakeChannelId = options.GetAgentChannelIDWith(req.FromUID, channelId)
|
||||
}
|
||||
|
||||
if req.Header.SyncOnce == 1 && !options.G.IsOnlineCmdChannel(channelId) && channelType != wkproto.ChannelTypeTemp { // 命令消息,将原频道转换为cmd频道
|
||||
|
||||
@@ -138,7 +138,7 @@ func (h *Handler) distributeByTag(slotLeaderId uint64, tag *types.Tag, channelId
|
||||
continue
|
||||
}
|
||||
isOnline, masterIsOnline := h.deviceOnlineStatus(uid)
|
||||
if !masterIsOnline {
|
||||
if !masterIsOnline && channelType != wkproto.ChannelTypeAgent { // agent不需要触发离线的webhook
|
||||
if offlineUids == nil {
|
||||
offlineUids = make([]string, 0, len(node.Uids))
|
||||
}
|
||||
@@ -300,8 +300,10 @@ func (h *Handler) makeChannelTag(fakeChannelId string, channelType uint8) (*type
|
||||
}
|
||||
u1, u2 := options.GetFromUIDAndToUIDWith(orgFakeChannelId)
|
||||
subscribers = append(subscribers, u1, u2)
|
||||
} else if channelType == wkproto.ChannelTypeAgent { // agent频道
|
||||
u1, agent := options.GetUidAndAgentUIDWith(fakeChannelId)
|
||||
subscribers = append(subscribers, u1, agent)
|
||||
} else {
|
||||
|
||||
// 如果是cmd频道需要去对应的源频道获取订阅者来制作tag
|
||||
if options.G.IsCmdChannel(fakeChannelId) {
|
||||
var err error
|
||||
|
||||
@@ -63,6 +63,11 @@ func (h *Handler) hasPermissionForChannel(channelId string, channelType uint8) (
|
||||
return wkproto.ReasonSuccess, nil
|
||||
}
|
||||
|
||||
// agent频道,直接通过
|
||||
if channelType == wkproto.ChannelTypeAgent {
|
||||
return wkproto.ReasonSuccess, nil
|
||||
}
|
||||
|
||||
// 查询频道基本信息
|
||||
channelInfo, err := service.Store.GetChannel(channelId, channelType)
|
||||
if err != nil {
|
||||
@@ -101,6 +106,13 @@ func (h *Handler) hasPermissionForSender(channelId string, channelType uint8, e
|
||||
return wkproto.ReasonSuccess, nil
|
||||
}
|
||||
|
||||
if channelType == wkproto.ChannelTypeAgent {
|
||||
uid, agentUID := options.GetUidAndAgentUIDWith(channelId)
|
||||
if fromUid == uid || fromUid == agentUID {
|
||||
return wkproto.ReasonSuccess, nil
|
||||
}
|
||||
}
|
||||
|
||||
// 系统发的消息直接通过
|
||||
if options.G.IsSystemDevice(e.Conn.DeviceId) {
|
||||
return wkproto.ReasonSuccess, nil
|
||||
|
||||
@@ -32,6 +32,20 @@ func GetFromUIDAndToUIDWith(channelId string) (string, string) {
|
||||
return "", ""
|
||||
}
|
||||
|
||||
// GetAgentChannelIDWith 获取Agent频道ID
|
||||
func GetAgentChannelIDWith(uid, agentUID string) string {
|
||||
return uid + "@" + agentUID
|
||||
}
|
||||
|
||||
// GetUidAndAgentUIDWith 获取用户ID和AgentID
|
||||
func GetUidAndAgentUIDWith(channelId string) (uid string, agentUID string) {
|
||||
channelIDs := strings.Split(channelId, "@")
|
||||
if len(channelIDs) == 2 {
|
||||
return channelIDs[0], channelIDs[1]
|
||||
}
|
||||
return "", ""
|
||||
}
|
||||
|
||||
// GetCommunityTopicParentChannelID 获取社区话题频道的父频道ID
|
||||
func GetCommunityTopicParentChannelID(channelID string) string {
|
||||
channelIDs := strings.Split(channelID, "@")
|
||||
|
||||
@@ -312,6 +312,12 @@ type Options struct {
|
||||
DisableJSONRPC bool // 是否禁用jsonrpc
|
||||
|
||||
DisableCMDMessageSync bool // 是否禁用命令消息同步,设置为true后,将不会同步离线的cmd消息,离线cmd消息接口都会返回空的成功
|
||||
|
||||
Agent struct {
|
||||
Webhook struct {
|
||||
HTTPAddr string // webhook的http地址 通过此地址通知数据给第三方 格式为 http://xxxxx
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type MigrateStep string
|
||||
@@ -997,6 +1003,9 @@ func (o *Options) ConfigureWithViper(vp *viper.Viper) {
|
||||
o.Plugin.Install = installPlugins
|
||||
}
|
||||
|
||||
// =================== agent ===================
|
||||
o.Agent.Webhook.HTTPAddr = o.getString("agent.webhook.httpAddr", o.Agent.Webhook.HTTPAddr)
|
||||
|
||||
// =================== other ===================
|
||||
deadlock.Opts.Disable = !o.DeadlockCheck
|
||||
// deadlock.Opts.Disable = false
|
||||
@@ -1344,6 +1353,10 @@ func (o *Options) WebhookGRPCOn() bool {
|
||||
return strings.TrimSpace(o.Webhook.GRPCAddr) != ""
|
||||
}
|
||||
|
||||
func (o *Options) AgentWebhookOn() bool {
|
||||
return strings.TrimSpace(o.Agent.Webhook.HTTPAddr) != ""
|
||||
}
|
||||
|
||||
// HasDatasource 是否有配置数据源
|
||||
func (o *Options) HasDatasource() bool {
|
||||
return strings.TrimSpace(o.Datasource.Addr) != ""
|
||||
@@ -1951,6 +1964,12 @@ func WithDbSlotShardNum(slotShardNum int) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func WithAgentWebhookHTTPAddr(httpAddr string) Option {
|
||||
return func(opts *Options) {
|
||||
opts.Agent.Webhook.HTTPAddr = httpAddr
|
||||
}
|
||||
}
|
||||
|
||||
func WithOpts(opt ...Option) Option {
|
||||
return func(opts *Options) {
|
||||
for _, o := range opt {
|
||||
|
||||
@@ -45,6 +45,8 @@ func (h *Handler) handleOnSend(event *eventbus.Event) {
|
||||
fakeChannelId := channelId
|
||||
if channelType == wkproto.ChannelTypePerson {
|
||||
fakeChannelId = options.GetFakeChannelIDWith(channelId, conn.Uid)
|
||||
} else if channelType == wkproto.ChannelTypeAgent {
|
||||
fakeChannelId = options.GetAgentChannelIDWith(conn.Uid, channelId)
|
||||
}
|
||||
|
||||
if options.G.Logger.TraceOn {
|
||||
|
||||
@@ -402,54 +402,150 @@ func (w *Webhook) pushMessages(messages []wkdb.Message, errMessageIDMap map[int6
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// 分组消息
|
||||
normalMessages, agentMessages := w.groupMessagesByType(messages)
|
||||
|
||||
// 处理普通消息
|
||||
normalRetryable := w.processBatchMessages(normalMessages, errMessageIDMap, "normal", w.sendNormalMessages)
|
||||
|
||||
// 处理Agent消息
|
||||
agentRetryable := w.processBatchMessages(agentMessages, errMessageIDMap, "agent", w.sendAgentMessages)
|
||||
|
||||
// 合并需要重试的消息
|
||||
retryableMessages = append(normalRetryable, agentRetryable...)
|
||||
|
||||
return len(retryableMessages) == 0, retryableMessages
|
||||
}
|
||||
|
||||
// groupMessagesByType 将消息按类型分组
|
||||
func (w *Webhook) groupMessagesByType(messages []wkdb.Message) (normalMessages, agentMessages []wkdb.Message) {
|
||||
if len(messages) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// 第一次遍历:统计准确数量
|
||||
var agentCount int
|
||||
agentWebhookOn := options.G.AgentWebhookOn() // 缓存条件判断结果
|
||||
|
||||
for i := range messages {
|
||||
if agentWebhookOn && messages[i].ChannelType == wkproto.ChannelTypeAgent {
|
||||
agentCount++
|
||||
}
|
||||
}
|
||||
|
||||
normalCount := len(messages) - agentCount
|
||||
|
||||
// 根据准确数量预分配数组
|
||||
if normalCount > 0 {
|
||||
normalMessages = make([]wkdb.Message, 0, normalCount)
|
||||
}
|
||||
if agentCount > 0 {
|
||||
agentMessages = make([]wkdb.Message, 0, agentCount)
|
||||
}
|
||||
|
||||
// 第二次遍历:填充数组
|
||||
for i := range messages {
|
||||
if agentWebhookOn && messages[i].ChannelType == wkproto.ChannelTypeAgent {
|
||||
agentMessages = append(agentMessages, messages[i])
|
||||
} else {
|
||||
normalMessages = append(normalMessages, messages[i])
|
||||
}
|
||||
}
|
||||
|
||||
return normalMessages, agentMessages
|
||||
}
|
||||
|
||||
// processBatchMessages 处理一批同类型的消息
|
||||
func (w *Webhook) processBatchMessages(messages []wkdb.Message, errMessageIDMap map[int64]int, messageType string, sendFunc func([]wkdb.Message) error) []wkdb.Message {
|
||||
if len(messages) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
err := sendFunc(messages)
|
||||
|
||||
if err != nil {
|
||||
w.Error("Failed to send webhook for a batch of messages",
|
||||
zap.Error(err),
|
||||
zap.String("type", messageType),
|
||||
zap.Int("message_count", len(messages)))
|
||||
|
||||
return w.handleSendFailure(messages, errMessageIDMap)
|
||||
}
|
||||
|
||||
// 发送成功,清理错误计数
|
||||
w.handleSendSuccess(messages, errMessageIDMap, messageType)
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendNormalMessages 发送普通消息
|
||||
func (w *Webhook) sendNormalMessages(messages []wkdb.Message) error {
|
||||
messageResps := w.convertToMessageResps(messages)
|
||||
messageData, err := w.marshalMessages(messageResps)
|
||||
if err != nil {
|
||||
w.Error("Failed to marshal normal messages for webhook", zap.Error(err), zap.Int("message_count", len(messages)))
|
||||
return err
|
||||
}
|
||||
|
||||
if options.G.WebhookGRPCOn() {
|
||||
return w.sendWebhookForGRPC(types.EventMsgNotify, messageData)
|
||||
}
|
||||
return w.sendWebhookForHttp(types.EventMsgNotify, messageData)
|
||||
}
|
||||
|
||||
// sendAgentMessages 发送Agent消息
|
||||
func (w *Webhook) sendAgentMessages(messages []wkdb.Message) error {
|
||||
messageResps := w.convertToMessageResps(messages)
|
||||
messageData, err := w.marshalMessages(messageResps)
|
||||
if err != nil {
|
||||
w.Error("Failed to marshal agent messages for webhook", zap.Error(err), zap.Int("message_count", len(messages)))
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: Replace with the correct Agent Webhook URL from options.
|
||||
// The correct configuration option could not be determined automatically.
|
||||
return w.sendAgentWebhookForHttp(types.EventMsgNotify, messageData)
|
||||
}
|
||||
|
||||
// convertToMessageResps 将wkdb.Message转换为MessageResp
|
||||
func (w *Webhook) convertToMessageResps(messages []wkdb.Message) []*types.MessageResp {
|
||||
messageResps := make([]*types.MessageResp, 0, len(messages))
|
||||
for _, msg := range messages {
|
||||
resp := &types.MessageResp{}
|
||||
resp.From(msg, options.G.SystemUID)
|
||||
messageResps = append(messageResps, resp)
|
||||
}
|
||||
return messageResps
|
||||
}
|
||||
|
||||
messageData, err := w.marshalMessages(messageResps)
|
||||
if err != nil {
|
||||
w.Error("Failed to marshal messages for webhook", zap.Error(err), zap.Int("message_count", len(messages)))
|
||||
return true, nil // 序列化错误,不重试此批次
|
||||
}
|
||||
// handleSendFailure 处理发送失败的情况
|
||||
func (w *Webhook) handleSendFailure(messages []wkdb.Message, errMessageIDMap map[int64]int) []wkdb.Message {
|
||||
retryableMessages := make([]wkdb.Message, 0)
|
||||
|
||||
var sendErr error
|
||||
if options.G.WebhookGRPCOn() {
|
||||
sendErr = w.sendWebhookForGRPC(types.EventMsgNotify, messageData)
|
||||
} else {
|
||||
sendErr = w.sendWebhookForHttp(types.EventMsgNotify, messageData)
|
||||
}
|
||||
for _, msg := range messages {
|
||||
errCount := errMessageIDMap[msg.MessageID]
|
||||
errCount++
|
||||
errMessageIDMap[msg.MessageID] = errCount
|
||||
|
||||
if sendErr != nil {
|
||||
w.Error("Failed to send webhook for a batch of messages", zap.Error(sendErr), zap.Int("message_count", len(messages)))
|
||||
|
||||
retryableMessages = make([]wkdb.Message, 0)
|
||||
processedAllInBatch := true // 假设一开始批次中的消息都处理完了(可能成功,可能达到最大重试)
|
||||
|
||||
for _, originalMsg := range messages {
|
||||
errCount := errMessageIDMap[originalMsg.MessageID]
|
||||
errCount++
|
||||
errMessageIDMap[originalMsg.MessageID] = errCount
|
||||
|
||||
if errCount >= options.G.Webhook.MsgNotifyEventRetryMaxCount {
|
||||
w.Warn("Message reached max retry count and will be dropped from error tracking map for this webhook cycle.", zap.Int64("messageID", originalMsg.MessageID))
|
||||
delete(errMessageIDMap, originalMsg.MessageID) // 从重试map中删除
|
||||
} else {
|
||||
retryableMessages = append(retryableMessages, originalMsg) // 加入到可重试列表
|
||||
processedAllInBatch = false // 只要有一个消息需要重试,就标记整个批次未完全处理
|
||||
}
|
||||
if errCount >= options.G.Webhook.MsgNotifyEventRetryMaxCount {
|
||||
w.Warn("Message reached max retry count and will be dropped",
|
||||
zap.Int64("messageID", msg.MessageID))
|
||||
delete(errMessageIDMap, msg.MessageID)
|
||||
} else {
|
||||
retryableMessages = append(retryableMessages, msg)
|
||||
}
|
||||
return processedAllInBatch, retryableMessages
|
||||
}
|
||||
|
||||
// 推送成功,清理 errMessageIDMap 中这些消息的错误计数
|
||||
for _, originalMsg := range messages {
|
||||
delete(errMessageIDMap, originalMsg.MessageID)
|
||||
return retryableMessages
|
||||
}
|
||||
|
||||
// handleSendSuccess 处理发送成功的情况
|
||||
func (w *Webhook) handleSendSuccess(messages []wkdb.Message, errMessageIDMap map[int64]int, messageType string) {
|
||||
for _, msg := range messages {
|
||||
delete(errMessageIDMap, msg.MessageID)
|
||||
}
|
||||
w.Info("Successfully pushed messages to webhook", zap.Int("count", len(messages)))
|
||||
return true, nil
|
||||
w.Info("Successfully pushed messages to webhook",
|
||||
zap.String("type", messageType),
|
||||
zap.Int("count", len(messages)))
|
||||
}
|
||||
|
||||
// marshalMessages 序列化消息(可以后续优化为使用对象池)
|
||||
@@ -533,6 +629,28 @@ func (w *Webhook) sendWebhookForHttp(event string, data []byte) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Webhook) sendAgentWebhookForHttp(event string, data []byte) error {
|
||||
// TODO: Replace options.G.Webhook.HTTPAddr with the correct Agent Webhook URL from options.
|
||||
// The correct configuration option could not be determined automatically.
|
||||
agentWebhookAddr := options.G.Agent.Webhook.HTTPAddr
|
||||
eventURL := fmt.Sprintf("%s?event=%s", agentWebhookAddr, event)
|
||||
startTime := time.Now().UnixNano() / 1000 / 1000
|
||||
w.Debug("agent webhook开始请求", zap.String("eventURL", eventURL))
|
||||
resp, err := w.httpClient.Post(eventURL, "application/json", bytes.NewBuffer(data))
|
||||
w.Debug("agent webhook请求结束 耗时", zap.Int64("mill", time.Now().UnixNano()/1000/1000-startTime))
|
||||
if err != nil {
|
||||
w.Warn("调用第三方agent消息通知失败!", zap.String("Webhook", agentWebhookAddr), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
w.Warn("第三方agent消息通知接口返回状态错误!", zap.Int("status", resp.StatusCode), zap.String("Webhook", agentWebhookAddr))
|
||||
return errors.New("第三方agent消息通知接口返回状态错误!")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Webhook) sendWebhookForGRPC(event string, data []byte) error {
|
||||
|
||||
startNow := time.Now()
|
||||
|
||||
Reference in New Issue
Block a user