mirror of
https://gitee.com/WuKongDev/WuKongIM.git
synced 2025-12-06 06:49:06 +08:00
fix: proxy proto格式错误会导致程序崩溃 #458
This commit is contained in:
@@ -15,18 +15,21 @@ import (
|
||||
"github.com/WuKongIM/WuKongIM/pkg/wkdb"
|
||||
"github.com/WuKongIM/WuKongIM/pkg/wklog"
|
||||
"github.com/WuKongIM/WuKongIM/pkg/wkutil"
|
||||
wkproto "github.com/WuKongIM/WuKongIMGoProto"
|
||||
"github.com/sendgrid/rest"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type request struct {
|
||||
wklog.Log
|
||||
s *Server
|
||||
}
|
||||
|
||||
func newRequset() *request {
|
||||
func newRequset(s *Server) *request {
|
||||
|
||||
return &request{
|
||||
Log: wklog.NewWKLog("request"),
|
||||
s: s,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -289,6 +292,39 @@ func (s *request) processSingleChannel(uid string, msgCount int, channel *channe
|
||||
}
|
||||
}
|
||||
|
||||
streamClientMsgNos := make([]string, 0, len(messageResps)) // 已存储的流消息ID
|
||||
|
||||
for _, message := range messageResps {
|
||||
setting := wkproto.Setting(message.Setting)
|
||||
if !setting.IsSet(wkproto.SettingStream) {
|
||||
continue
|
||||
}
|
||||
streamClientMsgNos = append(streamClientMsgNos, message.ClientMsgNo)
|
||||
}
|
||||
|
||||
if len(streamClientMsgNos) > 0 {
|
||||
streamV2s, err := s.loadStreamV2Messages(fakeChannelID, channel.ChannelType, streamClientMsgNos)
|
||||
if err != nil {
|
||||
s.Error("syncMessages: loadStreamV2Messages failed", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
for _, message := range messageResps {
|
||||
setting := wkproto.Setting(message.Setting)
|
||||
if !setting.IsSet(wkproto.SettingStream) {
|
||||
continue
|
||||
}
|
||||
for _, streamV2 := range streamV2s {
|
||||
if message.MessageId == streamV2.MessageId {
|
||||
|
||||
message.End = streamV2.End
|
||||
message.EndReason = streamV2.EndReason
|
||||
message.StreamData = streamV2.Payload
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &channelRecentMessage{
|
||||
ChannelId: channel.ChannelId,
|
||||
ChannelType: channel.ChannelType,
|
||||
@@ -311,3 +347,16 @@ func handlerIMError(resp *rest.Response) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func (s *request) loadStreamV2Messages(channelId string, channelType uint8, clientMsgNos []string) ([]*wkdb.StreamV2, error) {
|
||||
slotLeaderId, err := service.Cluster.SlotLeaderIdOfChannel(channelId, channelType)
|
||||
if err != nil {
|
||||
s.Error("loadStreamMessages: get leader id failed", zap.Error(err), zap.String("channelId", channelId), zap.Uint8("channelType", channelType))
|
||||
return nil, err
|
||||
}
|
||||
// 如果是本节点,则直接加载
|
||||
if options.G.IsLocalNode(slotLeaderId) {
|
||||
return service.CommonService.GetStreamsForLocal(clientMsgNos)
|
||||
}
|
||||
// 请求远程节点
|
||||
return s.s.client.RequestStreamsV2(slotLeaderId, clientMsgNos)
|
||||
}
|
||||
|
||||
@@ -24,11 +24,11 @@ type Server struct {
|
||||
func New() *Server {
|
||||
s := &Server{
|
||||
Log: wklog.NewWKLog("ApiServer"),
|
||||
requset: newRequset(),
|
||||
timingWheel: timingwheel.NewTimingWheel(options.G.TimingWheelTick, options.G.TimingWheelSize),
|
||||
uptime: time.Now(),
|
||||
client: ingress.NewClient(),
|
||||
}
|
||||
s.requset = newRequset(s)
|
||||
s.apiServer = newApiServer(s)
|
||||
s.migrateTask = NewMigrateTask(s) // 迁移任务
|
||||
s.managerServer = newManagerServer(s)
|
||||
|
||||
@@ -128,7 +128,9 @@ func parseProxyProto(buff []byte) (remoteAddr net.Addr, size int, err error) {
|
||||
if bytes.Equal(signature[:5], SIGV1) {
|
||||
return parseProxyProtoV1(buff)
|
||||
}
|
||||
|
||||
if dataLen <= 12 {
|
||||
return nil, 0, ErrNoProxyProtocol
|
||||
}
|
||||
signature = buff[:12]
|
||||
if bytes.Equal(signature[:12], SIGV2) {
|
||||
fmt.Println("proxyproto: proxy protocol v2")
|
||||
|
||||
Reference in New Issue
Block a user