From 1be1f974e2a98b7245aaeb310e9fa35117abe395 Mon Sep 17 00:00:00 2001 From: samwaf Date: Fri, 19 Sep 2025 11:26:00 +0800 Subject: [PATCH] feat:improve log attacking #451 --- api/waf_login.go | 4 +- cache/waf_cache.go | 2 - common/flow/anomaly_handlers.go | 339 +++++++++++++ common/flow/mean_std_detector.go | 435 +++++++++++++++++ common/flow/mean_std_detector_test.go | 658 ++++++++++++++++++++++++++ global/config.go | 26 +- global/global.go | 2 +- model/hosts.go | 34 ++ model/request/waf_host_req.go | 2 + service/waf_service/waf_host.go | 2 + wafenginecore/check_captcha.go | 2 +- wafenginecore/error_process.go | 4 +- wafenginecore/proxy.go | 98 +++- wafenginecore/wafcaptcha/service.go | 6 +- wafenginecore/wafengine.go | 33 +- wafinterface/wafnotify.go | 4 +- wafnotify/kafka/kafka.go | 6 +- wafnotify/wafnotify.go | 4 +- wafqueue/log_queue.go | 18 +- waftask/stat_collector.go | 304 ++++++++++++ waftask/task_config.go | 6 +- waftask/task_counter.go | 271 +---------- 22 files changed, 1944 insertions(+), 316 deletions(-) create mode 100644 common/flow/anomaly_handlers.go create mode 100644 common/flow/mean_std_detector.go create mode 100644 common/flow/mean_std_detector_test.go create mode 100644 waftask/stat_collector.go diff --git a/api/waf_login.go b/api/waf_login.go index 92608a3..78fe7d7 100644 --- a/api/waf_login.go +++ b/api/waf_login.go @@ -79,7 +79,7 @@ func (w *WafLoginApi) LoginApi(c *gin.Context) { OpType: "登录信息", OpContent: loginError, } - global.GQEQUE_LOG_DB.Enqueue(wafSysLog) + global.GQEQUE_LOG_DB.Enqueue(&wafSysLog) global.GQEQUE_MESSAGE_DB.Enqueue(innerbean.OperatorMessageInfo{ BaseMessageInfo: innerbean.BaseMessageInfo{OperaType: "登录错误"}, @@ -144,7 +144,7 @@ func (w *WafLoginApi) LoginApi(c *gin.Context) { OpType: "登录信息", OpContent: noticeStr, } - global.GQEQUE_LOG_DB.Enqueue(wafSysLog) + global.GQEQUE_LOG_DB.Enqueue(&wafSysLog) response.OkWithDetailed(response2.LoginRep{ AccessToken: tokenInfo.AccessToken, diff --git a/cache/waf_cache.go b/cache/waf_cache.go index 0712255..ae93a8d 100644 --- a/cache/waf_cache.go +++ b/cache/waf_cache.go @@ -3,7 +3,6 @@ package cache import ( "SamWaf/common/zlog" "errors" - "fmt" "strings" "sync" "time" @@ -33,7 +32,6 @@ func (wafCache *WafCache) Set(key string, value interface{}) { } func (wafCache *WafCache) SetWithTTl(key string, value interface{}, ttl time.Duration) { - fmt.Println(ttl) wafCache.mu.Lock() defer wafCache.mu.Unlock() createTime := time.Now() diff --git a/common/flow/anomaly_handlers.go b/common/flow/anomaly_handlers.go new file mode 100644 index 0000000..b4078a2 --- /dev/null +++ b/common/flow/anomaly_handlers.go @@ -0,0 +1,339 @@ +package flow + +import ( + "SamWaf/common/zlog" + "fmt" + "strings" + "time" +) + +// LogAnomalyHandler 日志记录处理器 - 使用zlog +func LogAnomalyHandler(result *DetectionResult) { + timestamp := time.Now().Format("2006-01-02 15:04:05") + + // 使用zlog记录日志 + zlog.Info("流量异常检测告警", map[string]interface{}{ + "timestamp": timestamp, + "current_value": result.CurrentValue, + "mean": result.Mean, + "deviation": result.Deviation, + "threshold": result.Threshold, + "confidence": result.Confidence, + "is_anomaly": result.IsAnomaly, + "window_size": result.WindowSize, + "detail": result.String(), + }) +} + +// AlertAnomalyHandler 告警处理器 - 使用zlog +func AlertAnomalyHandler(result *DetectionResult) { + // 使用zlog记录告警信息 + zlog.Warn("流量异常告警", map[string]interface{}{ + "alert_type": "traffic_anomaly", + "current_value": result.CurrentValue, + "mean": result.Mean, + "deviation": result.Deviation, + "confidence": result.Confidence, + "severity": getSeverityLevel(result), + }) + + fmt.Printf("🚨 异常告警: %s\n", result.String()) + + // 示例:发送到告警通道 + alertMsg := fmt.Sprintf("检测到流量异常!当前值: %.2f, 均值: %.2f, 偏离度: %.2f, 置信度: %s", + result.CurrentValue, result.Mean, result.Deviation, result.Confidence) + + // 这里可以调用实际的告警接口 + sendAlert(alertMsg) +} + +// BlockAnomalyHandler 阻断处理器 - 使用zlog +func BlockAnomalyHandler(result *DetectionResult) { + if result.Deviation > result.Threshold*2 { // 严重异常才阻断 + // 使用zlog记录阻断操作 + zlog.Error("严重异常流量阻断", map[string]interface{}{ + "action": "block_traffic", + "current_value": result.CurrentValue, + "deviation": result.Deviation, + "threshold": result.Threshold, + "severity": "critical", + "reason": "deviation_exceeds_2x_threshold", + }) + + fmt.Printf("🛑 严重异常,执行阻断: %s\n", result.String()) + // 这里可以调用防火墙或限流接口 + blockTraffic(result) + } +} + +// LimitAnomalyHandler 限流处理器 - 使用zlog +func LimitAnomalyHandler(result *DetectionResult) { + if result.IsAnomaly { + // 使用zlog记录限流操作 + zlog.Warn("异常流量限流", map[string]interface{}{ + "action": "limit_traffic", + "current_value": result.CurrentValue, + "mean": result.Mean, + "deviation": result.Deviation, + "severity": getSeverityLevel(result), + }) + + fmt.Printf("⚠️ 异常流量,启动限流: %s\n", result.String()) + // 这里可以调用限流接口 + limitTraffic(result) + } +} + +// CustomAnomalyHandler 自定义处理器示例 - 使用zlog +func CustomAnomalyHandler(result *DetectionResult) { + // 根据异常程度执行不同处理 + switch { + case result.Deviation > result.Threshold*3: + // 极严重异常:立即阻断+告警 + zlog.Error("极严重流量异常", map[string]interface{}{ + "action": "immediate_block_and_alert", + "current_value": result.CurrentValue, + "deviation_ratio": result.Deviation / result.Threshold, + "severity": "critical", + "auto_action": true, + }) + + fmt.Printf("🔥 极严重异常,立即处理: %s\n", result.String()) + blockTraffic(result) + sendAlert(fmt.Sprintf("极严重流量异常: %.2f", result.CurrentValue)) + + case result.Deviation > result.Threshold*2: + // 严重异常:限流+告警 + zlog.Warn("严重流量异常", map[string]interface{}{ + "action": "limit_and_alert", + "current_value": result.CurrentValue, + "deviation_ratio": result.Deviation / result.Threshold, + "severity": "high", + }) + + fmt.Printf("⚡ 严重异常,限流处理: %s\n", result.String()) + limitTraffic(result) + sendAlert(fmt.Sprintf("严重流量异常: %.2f", result.CurrentValue)) + + default: + // 一般异常:仅记录 + zlog.Info("一般流量异常", map[string]interface{}{ + "action": "log_only", + "current_value": result.CurrentValue, + "deviation_ratio": result.Deviation / result.Threshold, + "severity": "medium", + }) + + fmt.Printf("📝 一般异常,记录日志: %s\n", result.String()) + LogAnomalyHandler(result) + } +} + +// getSeverityLevel 获取异常严重程度 +func getSeverityLevel(result *DetectionResult) string { + if result.Threshold == 0 { + return "unknown" + } + + ratio := result.Deviation / result.Threshold + switch { + case ratio > 3: + return "critical" + case ratio > 2: + return "high" + case ratio > 1: + return "medium" + default: + return "low" + } +} + +// 辅助函数(需要根据实际系统实现) +func sendAlert(message string) { + // 实现告警发送逻辑 + zlog.Info("发送告警", map[string]interface{}{ + "alert_message": message, + "alert_type": "traffic_anomaly", + }) +} + +func blockTraffic(result *DetectionResult) { + // 实现流量阻断逻辑 + zlog.Error("执行流量阻断", map[string]interface{}{ + "blocked_value": result.CurrentValue, + "reason": "anomaly_detection", + "action_taken": "traffic_blocked", + }) +} + +func limitTraffic(result *DetectionResult) { + // 实现流量限制逻辑 + zlog.Warn("执行流量限制", map[string]interface{}{ + "limited_value": result.CurrentValue, + "reason": "anomaly_detection", + "action_taken": "traffic_limited", + }) +} + +// RecoveryLogHandler 恢复日志处理器 +func RecoveryLogHandler(result *DetectionResult) { + timestamp := time.Now().Format("2006-01-02 15:04:05") + + // 使用zlog记录恢复日志 + zlog.Info("流量异常恢复", map[string]interface{}{ + "timestamp": timestamp, + "current_value": result.CurrentValue, + "mean": result.Mean, + "recovery_count": result.RecoveryCount, + "status": "recovered", + "detail": fmt.Sprintf("系统已恢复正常,当前值: %.2f, 均值: %.2f", result.CurrentValue, result.Mean), + }) + + fmt.Printf("✅ 系统恢复正常: 当前值=%.2f, 均值=%.2f\n", result.CurrentValue, result.Mean) +} + +// RecoveryUnblockHandler 恢复解除阻断处理器 +func RecoveryUnblockHandler(result *DetectionResult) { + // 使用zlog记录解除阻断 + zlog.Info("解除流量阻断", map[string]interface{}{ + "action": "unblock_traffic", + "current_value": result.CurrentValue, + "mean": result.Mean, + "status": "unblocked", + "reason": "traffic_recovered", + }) + + fmt.Printf("🔓 解除流量阻断: 当前值=%.2f\n", result.CurrentValue) + + // 这里调用实际的解除阻断接口 + unblockTraffic(result) +} + +// RecoveryUnlimitHandler 恢复解除限流处理器 +func RecoveryUnlimitHandler(result *DetectionResult) { + // 使用zlog记录解除限流 + zlog.Info("解除流量限制", map[string]interface{}{ + "action": "unlimit_traffic", + "current_value": result.CurrentValue, + "mean": result.Mean, + "status": "unlimited", + "reason": "traffic_recovered", + }) + + fmt.Printf("🔄 解除流量限制: 当前值=%.2f\n", result.CurrentValue) + + // 这里调用实际的解除限流接口 + unlimitTraffic(result) +} + +// RecoveryAlertHandler 恢复告警处理器 +func RecoveryAlertHandler(result *DetectionResult) { + // 使用zlog记录恢复告警 + zlog.Info("流量恢复告警", map[string]interface{}{ + "alert_type": "traffic_recovery", + "current_value": result.CurrentValue, + "mean": result.Mean, + "status": "recovered", + }) + + fmt.Printf("📢 恢复告警: 流量已恢复正常\n") + + // 发送恢复通知 + sendRecoveryAlert(fmt.Sprintf("流量已恢复正常,当前值: %.2f", result.CurrentValue)) +} + +// SustainedHighAnomalyHandler 持续高位专用处理器 +func SustainedHighAnomalyHandler(result *DetectionResult) { + if strings.Contains(result.Confidence, "持续高位") { + // 使用zlog记录持续高位告警 + zlog.Error("系统持续高位运行告警", map[string]interface{}{ + "alert_type": "sustained_high_operation", + "current_value": result.CurrentValue, + "mean": result.Mean, + "deviation": result.Deviation, + "confidence": result.Confidence, + "severity": "critical", + "recommended_action": "review_system_capacity_and_scaling", + "impact": "potential_performance_degradation", + }) + + fmt.Printf("🔥 系统持续高位运行告警: %s\n", result.String()) + + // 发送特殊告警 + sendSustainedHighAlert(result) + } +} + +// DualWindowAnomalyHandler 双窗口专用异常处理器 +func DualWindowAnomalyHandler(result *DetectionResult) { + switch { + case strings.Contains(result.Confidence, "持续高位"): + // 持续高位运行处理 + zlog.Warn("持续高位运行检测", map[string]interface{}{ + "detection_type": "sustained_high", + "current_value": result.CurrentValue, + "deviation": result.Deviation, + "confidence": result.Confidence, + "action": "capacity_review_needed", + }) + fmt.Printf("📈 持续高位: %s\n", result.String()) + + case result.Deviation > result.Threshold*3: + // 极严重异常 + zlog.Error("极严重流量异常", map[string]interface{}{ + "detection_type": "critical_anomaly", + "current_value": result.CurrentValue, + "deviation_ratio": result.Deviation / result.Threshold, + "action": "immediate_intervention", + }) + fmt.Printf("🚨 极严重异常: %s\n", result.String()) + + default: + // 一般异常 + zlog.Info("一般流量异常", map[string]interface{}{ + "detection_type": "normal_anomaly", + "current_value": result.CurrentValue, + "confidence": result.Confidence, + }) + fmt.Printf("⚠️ 一般异常: %s\n", result.String()) + } +} + +// 辅助函数 +func sendSustainedHighAlert(result *DetectionResult) { + alertMsg := fmt.Sprintf("系统持续高位运行告警!当前值: %.2f, 建议检查系统容量和扩容策略", result.CurrentValue) + + zlog.Warn("发送持续高位告警", map[string]interface{}{ + "alert_message": alertMsg, + "alert_type": "sustained_high_operation", + "priority": "high", + }) + + // 这里可以调用实际的告警接口 + // 例如:发送邮件、短信、钉钉通知等 +} +func unblockTraffic(result *DetectionResult) { + // 实现解除流量阻断逻辑 + zlog.Info("执行解除流量阻断", map[string]interface{}{ + "unblocked_value": result.CurrentValue, + "reason": "traffic_recovered", + "action_taken": "traffic_unblocked", + }) +} + +func unlimitTraffic(result *DetectionResult) { + // 实现解除流量限制逻辑 + zlog.Info("执行解除流量限制", map[string]interface{}{ + "unlimited_value": result.CurrentValue, + "reason": "traffic_recovered", + "action_taken": "traffic_unlimited", + }) +} + +func sendRecoveryAlert(message string) { + // 实现恢复告警发送逻辑 + zlog.Info("发送恢复告警", map[string]interface{}{ + "alert_message": message, + "alert_type": "traffic_recovery", + }) +} diff --git a/common/flow/mean_std_detector.go b/common/flow/mean_std_detector.go new file mode 100644 index 0000000..86c8c57 --- /dev/null +++ b/common/flow/mean_std_detector.go @@ -0,0 +1,435 @@ +package flow + +import ( + "fmt" + "math" +) + +// AnomalyHandler 异常处理回调函数类型 +type AnomalyHandler func(result *DetectionResult) + +// AnomalyAction 异常处理动作类型 +type AnomalyAction int + +const ( + ActionLog AnomalyAction = iota // 记录日志 + ActionAlert // 发送告警 + ActionBlock // 阻断流量 + ActionLimit // 限制流量 + ActionCustom // 自定义处理 +) + +// AnomalyProcessor 异常处理器 +type AnomalyProcessor struct { + Action AnomalyAction + Handler AnomalyHandler + Enabled bool + Name string + // 新增恢复处理器 + RecoveryHandler AnomalyHandler +} + +// 常量定义 +const ( + // MinWindowSize 最小窗口大小 + MinWindowSize = 2 + // InvalidThreshold 无效阈值标识 + InvalidThreshold = -1 +) + +// DetectionResult 异常检测结果 +type DetectionResult struct { + IsAnomaly bool // 是否异常 + CurrentValue float64 // 当前值 + Mean float64 // 均值 + StdDev float64 // 标准差 + Threshold float64 // 异常阈值 (k * 标准差) + Deviation float64 // 偏离度 (当前值与均值的差) + WindowData []float64 // 当前窗口数据 + WindowSize int // 窗口大小 + Confidence string // 置信度描述 + // 新增恢复相关字段 + IsRecovered bool // 是否从异常状态恢复 + RecoveryCount int // 连续正常计数 +} + +// String 返回检测结果的字符串表示 +func (r *DetectionResult) String() string { + if !r.IsAnomaly { + return fmt.Sprintf("正常值: %.2f (均值: %.2f, 标准差: %.2f)", + r.CurrentValue, r.Mean, r.StdDev) + } + return fmt.Sprintf("异常值: %.2f (均值: %.2f, 偏离: %.2f, 阈值: %.2f, %s)", + r.CurrentValue, r.Mean, r.Deviation, r.Threshold, r.Confidence) +} + +// MeanStdDetector 滑动窗口均值标准差异常检测器 +type MeanStdDetector struct { + window []float64 // 滑动窗口数据 + size int // 窗口大小 + k float64 // 异常检测系数(通常取2-3) + sum float64 // 窗口数据总和(优化计算) + sumSq float64 // 窗口数据平方和(优化计算) + processors []AnomalyProcessor // 异常处理器列表 + // 新增恢复相关字段 + isInAnomalyState bool // 当前是否处于异常状态 + normalCount int // 连续正常值计数 + recoveryThreshold int // 恢复需要的连续正常值数量 +} + +// NewMeanStdDetector 创建新的异常检测器 +// size: 滑动窗口大小,建议10-50 +// k: 异常检测系数,k=2(95%置信度), k=3(99.7%置信度) +func NewMeanStdDetector(size int, k float64) *MeanStdDetector { + if size < MinWindowSize { + size = MinWindowSize + } + if k <= 0 { + k = 2.0 // 默认2倍标准差 + } + return &MeanStdDetector{ + size: size, + k: k, + recoveryThreshold: 5, // 默认需要5个连续正常值才认为恢复 + } +} + +// AddValue 添加新的数据点到滑动窗口 +func (d *MeanStdDetector) AddValue(value float64) { + // 如果窗口已满,移除最旧的数据 + if len(d.window) >= d.size { + old := d.window[0] + d.window = d.window[1:] + d.sum -= old + d.sumSq -= old * old + } + + // 添加新数据 + d.window = append(d.window, value) + d.sum += value + d.sumSq += value * value +} + +// IsAnomaly 简单的异常检测,只返回是否异常 +func (d *MeanStdDetector) IsAnomaly(value float64) bool { + return d.DetectAnomaly(value).IsAnomaly +} + +// DetectAnomaly 完整的异常检测,返回详细结果 +func (d *MeanStdDetector) DetectAnomaly(value float64) *DetectionResult { + result := &DetectionResult{ + CurrentValue: value, + WindowSize: len(d.window), + } + + // 窗口数据不足,无法检测 + if len(d.window) < MinWindowSize { + result.IsAnomaly = false + result.Threshold = InvalidThreshold + result.Confidence = "数据不足" + return result + } + + // 计算统计指标 + n := float64(len(d.window)) + mean := d.sum / n + variance := (d.sumSq - (d.sum*d.sum)/n) / n + stdDev := math.Sqrt(math.Max(variance, 0)) + threshold := d.k * stdDev + deviation := math.Abs(value - mean) + + // 填充结果 + result.Mean = mean + result.StdDev = stdDev + result.Threshold = threshold + result.Deviation = deviation + result.WindowData = make([]float64, len(d.window)) + copy(result.WindowData, d.window) + + // 判断是否异常 + isCurrentAnomaly := deviation > threshold + result.IsAnomaly = isCurrentAnomaly + + // 恢复逻辑处理 + if !isCurrentAnomaly { + // 当前值正常 + d.normalCount++ + result.RecoveryCount = d.normalCount + + // 检查是否从异常状态恢复 + if d.isInAnomalyState && d.normalCount >= d.recoveryThreshold { + result.IsRecovered = true + d.isInAnomalyState = false + d.normalCount = 0 // 重置计数 + } + } else { + // 当前值异常 + d.isInAnomalyState = true + d.normalCount = 0 // 重置正常计数 + } + + // 设置置信度描述 + if stdDev == 0 { + if value == mean { + result.Confidence = "完全正常" + } else { + result.IsAnomaly = true + result.Confidence = "明显异常(零方差)" + } + } else { + sigmaLevel := deviation / stdDev + switch { + case sigmaLevel > 3: + result.Confidence = "高度异常(>3σ)" + case sigmaLevel > 2: + result.Confidence = "中度异常(>2σ)" + case sigmaLevel > 1: + result.Confidence = "轻微异常(>1σ)" + default: + result.Confidence = "正常范围" + } + } + + return result +} + +// SetRecoveryThreshold 设置恢复阈值 +func (d *MeanStdDetector) SetRecoveryThreshold(threshold int) { + if threshold > 0 { + d.recoveryThreshold = threshold + } +} + +// GetRecoveryStatus 获取恢复状态信息 +func (d *MeanStdDetector) GetRecoveryStatus() map[string]interface{} { + return map[string]interface{}{ + "is_in_anomaly_state": d.isInAnomalyState, + "normal_count": d.normalCount, + "recovery_threshold": d.recoveryThreshold, + "recovery_progress": float64(d.normalCount) / float64(d.recoveryThreshold), + } +} + +// GetWindowStats 获取当前窗口的统计信息 +func (d *MeanStdDetector) GetWindowStats() map[string]interface{} { + if len(d.window) == 0 { + return map[string]interface{}{ + "window_size": 0, + "mean": 0, + "std_dev": 0, + "min": 0, + "max": 0, + } + } + + n := float64(len(d.window)) + mean := d.sum / n + variance := (d.sumSq - (d.sum*d.sum)/n) / n + stdDev := math.Sqrt(math.Max(variance, 0)) + + // 计算最小值和最大值 + min, max := d.window[0], d.window[0] + for _, v := range d.window[1:] { + if v < min { + min = v + } + if v > max { + max = v + } + } + + return map[string]interface{}{ + "window_size": len(d.window), + "mean": mean, + "std_dev": stdDev, + "min": min, + "max": max, + "threshold": d.k * stdDev, + "k_factor": d.k, + } +} + +// Reset 重置检测器 +func (d *MeanStdDetector) Reset() { + d.window = nil + d.sum = 0 + d.sumSq = 0 +} + +// 保持向后兼容的方法 + +// Add 添加数据点(向后兼容) +func (d *MeanStdDetector) Add(value float64) { + d.AddValue(value) +} + +// IsAnomalyPrintValue 返回异常状态和阈值(向后兼容) +func (d *MeanStdDetector) IsAnomalyPrintValue(value float64) (bool, float64) { + result := d.DetectAnomaly(value) + if result.Threshold == InvalidThreshold { + return false, InvalidThreshold + } + return result.IsAnomaly, result.Threshold +} + +// IsAnomalyPrintFull 返回异常状态和窗口数据(向后兼容) +func (d *MeanStdDetector) IsAnomalyPrintFull(value float64) (bool, []float64) { + result := d.DetectAnomaly(value) + return result.IsAnomaly, result.WindowData +} + +// AddAnomalyProcessor 添加异常处理器 +func (d *MeanStdDetector) AddAnomalyProcessor(processor AnomalyProcessor) { + d.processors = append(d.processors, processor) +} + +// RemoveAnomalyProcessor 移除异常处理器 +func (d *MeanStdDetector) RemoveAnomalyProcessor(name string) { + for i, processor := range d.processors { + if processor.Name == name { + d.processors = append(d.processors[:i], d.processors[i+1:]...) + break + } + } +} + +// processAnomaly 处理异常情况 +func (d *MeanStdDetector) processAnomaly(result *DetectionResult) { + // 处理异常 + if result.IsAnomaly { + // 执行所有启用的异常处理器 + for _, processor := range d.processors { + if processor.Enabled && processor.Handler != nil { + processor.Handler(result) + } + } + } + + // 处理恢复 + if result.IsRecovered { + // 执行恢复处理器 + for _, processor := range d.processors { + if processor.Enabled && processor.RecoveryHandler != nil { + processor.RecoveryHandler(result) + } + } + } +} + +// DetectAnomalyWithProcessing 检测异常并自动处理 +func (d *MeanStdDetector) DetectAnomalyWithProcessing(value float64) *DetectionResult { + result := d.DetectAnomaly(value) + + // 如果检测到异常,执行处理逻辑 + d.processAnomaly(result) + + return result +} + +// 双窗口异常检测器 +type DualWindowDetector struct { + shortWindow *MeanStdDetector // 短期窗口(快速响应) + longWindow *MeanStdDetector // 长期窗口(稳定基线) + + // 状态管理 + sustainedHighCount int + maxSustainedHigh int + + // 异常处理器 + processors []AnomalyProcessor +} + +func NewDualWindowDetector(shortSize, longSize int, k float64) *DualWindowDetector { + return &DualWindowDetector{ + shortWindow: NewMeanStdDetector(shortSize, k), + longWindow: NewMeanStdDetector(longSize, k), + maxSustainedHigh: 20, // 最大持续高位次数 + } +} + +// AddValue 向两个窗口添加数据 +func (d *DualWindowDetector) AddValue(value float64) { + d.shortWindow.AddValue(value) + d.longWindow.AddValue(value) +} + +// AddAnomalyProcessor 添加异常处理器 +func (d *DualWindowDetector) AddAnomalyProcessor(processor AnomalyProcessor) { + d.processors = append(d.processors, processor) +} + +// SetSustainedHighThreshold 设置持续高位阈值 +func (d *DualWindowDetector) SetSustainedHighThreshold(threshold int) { + if threshold > 0 { + d.maxSustainedHigh = threshold + } +} + +func (d *DualWindowDetector) DetectAnomaly(value float64) *DetectionResult { + shortResult := d.shortWindow.DetectAnomaly(value) + //longResult := d.longWindow.DetectAnomaly(value) + + // 获取统计信息 + shortStats := d.shortWindow.GetWindowStats() + longStats := d.longWindow.GetWindowStats() + + shortMean := shortStats["mean"].(float64) + longMean := longStats["mean"].(float64) + + // 检测持续高位状态 + if len(d.shortWindow.window) >= MinWindowSize && len(d.longWindow.window) >= MinWindowSize { + if shortMean > longMean*1.5 { // 短期均值显著高于长期均值 + d.sustainedHighCount++ + } else { + d.sustainedHighCount = 0 + } + + // 如果持续高位时间过长,发出警告 + if d.sustainedHighCount > d.maxSustainedHigh { + shortResult.Confidence = fmt.Sprintf("持续高位运行警告(%d次)", d.sustainedHighCount) + shortResult.IsAnomaly = true + // 添加持续高位的特殊标记 + shortResult.Deviation = shortMean - longMean + } + } + + return shortResult +} + +// DetectAnomalyWithProcessing 检测异常并自动处理 +func (d *DualWindowDetector) DetectAnomalyWithProcessing(value float64) *DetectionResult { + result := d.DetectAnomaly(value) + + // 执行异常处理器 + if result.IsAnomaly { + for _, processor := range d.processors { + if processor.Enabled && processor.Handler != nil { + processor.Handler(result) + } + } + } + + return result +} + +// GetDualWindowStats 获取双窗口统计信息 +func (d *DualWindowDetector) GetDualWindowStats() map[string]interface{} { + shortStats := d.shortWindow.GetWindowStats() + longStats := d.longWindow.GetWindowStats() + + return map[string]interface{}{ + "short_window": shortStats, + "long_window": longStats, + "sustained_high_count": d.sustainedHighCount, + "max_sustained_high": d.maxSustainedHigh, + "short_mean": shortStats["mean"], + "long_mean": longStats["mean"], + } +} + +// Reset 重置双窗口检测器 +func (d *DualWindowDetector) Reset() { + d.shortWindow.Reset() + d.longWindow.Reset() + d.sustainedHighCount = 0 +} diff --git a/common/flow/mean_std_detector_test.go b/common/flow/mean_std_detector_test.go new file mode 100644 index 0000000..c67cc3c --- /dev/null +++ b/common/flow/mean_std_detector_test.go @@ -0,0 +1,658 @@ +package flow + +import ( + "SamWaf/common/zlog" + "SamWaf/global" + "fmt" + "strings" + "testing" + "time" +) + +func TestNewMeanStdDetector(t *testing.T) { + d := NewMeanStdDetector(10, 3) + if d == nil { + t.Fatal("NewMeanStdDetector failed") + } + //模拟测试 刚开始正常->异常->正常 + + // 第一阶段:添加正常数据,建立基线 + normalValues := []float64{10.0, 11.0, 9.0, 10.5, 9.5, 10.2, 9.8, 10.1, 9.9, 10.3} + for _, value := range normalValues { + d.Add(value) + // 在建立基线阶段,不应该检测到异常 + if d.IsAnomaly(value) { + t.Logf("正常阶段检测到异常值: %.2f (这在建立基线时是正常的)", value) + } + } + t.Logf("第一阶段完成:已添加%d个正常基线数据", len(normalValues)) + + // 第二阶段:测试异常检测 + anomalyValues := []float64{25.0, 30.0, -5.0, 35.0} // 明显偏离正常范围的异常值 + anomalyDetected := 0 + for _, value := range anomalyValues { + if d.IsAnomaly(value) { + anomalyDetected++ + t.Logf("检测到异常值: %.2f", value) + } else { + t.Logf("未检测到异常值: %.2f (可能需要调整k值)", value) + } + d.Add(value) // 添加到窗口中 + } + + if anomalyDetected == 0 { + t.Error("异常检测失败:没有检测到任何异常值") + } else { + t.Logf("第二阶段完成:检测到%d个异常值", anomalyDetected) + } + + // 第三阶段:回归正常 + normalValues2 := []float64{10.1, 9.9, 10.2, 9.8, 10.0, 9.7, 10.3, 9.6, 10.4} + normalDetected := 0 + for _, value := range normalValues2 { + if !d.IsAnomaly(value) { + normalDetected++ + t.Logf("正常值: %.2f", value) + } else { + t.Logf("仍被检测为异常: %.2f (滑动窗口还在调整中)", value) + } + d.Add(value) + } + + t.Logf("第三阶段完成:%d个值被识别为正常", normalDetected) + + // 验证最终状态:最后几个正常值应该不被检测为异常 + finalNormalValues := []float64{10.0, 9.9, 10.1, 0} + finalNormalCount := 0 + for _, value := range finalNormalValues { + if !d.IsAnomaly(value) { + finalNormalCount++ + } + } + + if finalNormalCount < 2 { + t.Error("最终正常化测试失败:系统未能回归正常状态") + } else { + t.Logf("测试成功:系统已回归正常状态,%d/%d个最终值被正确识别为正常", finalNormalCount, len(finalNormalValues)) + } +} + +// 额外的测试用例:测试边界条件 +func TestMeanStdDetectorEdgeCases(t *testing.T) { + d := NewMeanStdDetector(5, 2.0) + + // 测试窗口未满时的行为 + if d.IsAnomaly(100.0) { + t.Error("窗口数据不足时不应检测异常") + } + + d.Add(10.0) + if d.IsAnomaly(100.0) { + t.Error("窗口数据不足时不应检测异常") + } + + // 测试相同值的情况 + for i := 0; i < 5; i++ { + d.Add(10.0) + } + + // 当所有值都相同时,标准差为0,任何不同的值都应该被检测为异常 + if !d.IsAnomaly(15.0) { + t.Error("当标准差为0时,不同的值应该被检测为异常") + } + + if d.IsAnomaly(10.0) { + t.Error("相同的值不应该被检测为异常") + } +} + +// 性能测试 +func TestMeanStdDetectorPerformance(t *testing.T) { + d := NewMeanStdDetector(100, 2.0) + + // 添加大量数据测试性能 + for i := 0; i < 10000; i++ { + value := float64(i%20 + 10) // 生成10-29之间的循环数据 + d.Add(value) + d.IsAnomaly(value) + } + + t.Log("性能测试完成:处理了10000个数据点") +} + +// 手动测试 +func TestMeanStdDetectorManual(t *testing.T) { + d := NewMeanStdDetector(10, 3) + if d == nil { + t.Fatal("NewMeanStdDetector failed") + } + + normalValues := []float64{10.0, 11.0, 9.0, 10.5, 9.5, 10.2, 9.8, 10.1, 9.9, 10.3, 5000} + for _, value := range normalValues { + d.Add(value) + anomaly, window := d.IsAnomalyPrintFull(value) + if anomaly { + t.Logf("正常阶段检测到异常值: %.2f (这在建立基线时是正常的) 当前window数据: %v", value, window) + } + } + +} + +// 在现有测试文件中添加新的测试函数 + +// TestMeanStdDetectorImproved 改进后的测试示例 +func TestMeanStdDetectorImproved(t *testing.T) { + // 创建检测器:窗口大小10,2倍标准差阈值 + detector := NewMeanStdDetector(10, 2.0) + + // 模拟正常流量数据 + normalTraffic := []float64{100, 105, 95, 110, 90, 102, 98, 107, 93, 101} + + fmt.Println("=== 正常流量阶段 ===") + for _, traffic := range normalTraffic { + detector.AddValue(traffic) + result := detector.DetectAnomaly(traffic) + fmt.Printf("%s\n", result.String()) + } + + // 模拟异常流量 + anomalyTraffic := []float64{200, 50, 300, 10} + + fmt.Println("\n=== 异常流量检测 ===") + for _, traffic := range anomalyTraffic { + result := detector.DetectAnomaly(traffic) + fmt.Printf("%s\n", result.String()) + + // 添加到窗口中 + detector.AddValue(traffic) + } + + // 打印当前窗口统计信息 + fmt.Println("\n=== 窗口统计信息 ===") + stats := detector.GetWindowStats() + for key, value := range stats { + fmt.Printf("%s: %.2f\n", key, value) + } +} + +// TestFlowAnomalyDetectionDemo 流量异常检测演示 +func TestFlowAnomalyDetectionDemo(t *testing.T) { + // 创建流量异常检测器 + flowDetector := NewMeanStdDetector(20, 2.5) // 20个数据点窗口,2.5倍标准差 + + // 模拟一天的网络流量数据 (MB/s) + dailyTraffic := []float64{ + // 凌晨低流量 + 10, 8, 12, 9, 11, 7, 13, 10, + // 上午逐渐增加 + 15, 18, 22, 25, 30, 35, 40, + // 中午高峰 + 45, 50, 48, 52, 47, + // 下午稳定 + 40, 42, 38, 41, 39, + // 异常流量攻击 + 150, 200, 180, 220, + // 恢复正常 + 35, 40, 38, 42, 36, + } + + fmt.Println("=== 网络流量异常检测演示 ===") + anomalyCount := 0 + + for i, traffic := range dailyTraffic { + // 先检测再添加 + result := flowDetector.DetectAnomaly(traffic) + + if result.IsAnomaly { + anomalyCount++ + fmt.Printf("[%02d] ⚠️ %s\n", i+1, result.String()) + } else { + fmt.Printf("[%02d] ✅ %s\n", i+1, result.String()) + } + + // 添加到检测器 + flowDetector.AddValue(traffic) + } + + fmt.Printf("\n检测完成:共发现 %d 个异常流量点\n", anomalyCount) + + // 最终统计 + stats := flowDetector.GetWindowStats() + fmt.Printf("最终窗口统计:均值=%.1f, 标准差=%.1f, 阈值=%.1f\n", + stats["mean"], stats["std_dev"], stats["threshold"]) +} + +// TestFlowAnomalyDetectionWithHandling 带异常处理的流量检测测试 +func TestFlowAnomalyDetectionWithHandling(t *testing.T) { + // 创建流量异常检测器 + flowDetector := NewMeanStdDetector(20, 2.5) + + // 添加使用zlog的异常处理器 + flowDetector.AddAnomalyProcessor(AnomalyProcessor{ + Action: ActionLog, + Handler: LogAnomalyHandler, // 现在使用zlog + Enabled: true, + Name: "zlog_logger", + }) + + flowDetector.AddAnomalyProcessor(AnomalyProcessor{ + Action: ActionAlert, + Handler: AlertAnomalyHandler, // 现在使用zlog + Enabled: true, + Name: "zlog_alerter", + }) + + flowDetector.AddAnomalyProcessor(AnomalyProcessor{ + Action: ActionCustom, + Handler: CustomAnomalyHandler, // 现在使用zlog + Enabled: true, + Name: "zlog_custom", + }) + + // 模拟流量数据 + dailyTraffic := []float64{ + // 正常流量 + 10, 8, 12, 9, 11, 7, 13, 10, 15, 18, + // 异常攻击流量 + 150, 200, 180, 220, + // 恢复正常 + 35, 40, 38, 42, 36, + } + + fmt.Println("=== 带异常处理的流量检测演示 ===") + anomalyCount := 0 + + for i, traffic := range dailyTraffic { + // 使用带处理的检测方法 + result := flowDetector.DetectAnomalyWithProcessing(traffic) + + if result.IsAnomaly { + anomalyCount++ + fmt.Printf("[%02d] ⚠️ 异常已处理: %.2f\n", i+1, traffic) + } else { + fmt.Printf("[%02d] ✅ 正常流量: %.2f\n", i+1, traffic) + } + + // 添加到检测器 + flowDetector.AddValue(traffic) + + // 模拟实时处理间隔 + time.Sleep(100 * time.Millisecond) + } + + fmt.Printf("\n检测完成:共处理 %d 个异常流量点\n", anomalyCount) +} + +// TestRealTimeAnomalyProcessing 实时异常处理测试 +func TestRealTimeAnomalyProcessing(t *testing.T) { + detector := NewMeanStdDetector(10, 2.0) + + // 添加实时处理器 + detector.AddAnomalyProcessor(AnomalyProcessor{ + Action: ActionCustom, + Enabled: true, + Name: "realtime", + Handler: func(result *DetectionResult) { + // 实时处理逻辑 + timestamp := time.Now().Format("15:04:05") + switch { + case result.Deviation > result.Threshold*3: + fmt.Printf("[%s] 🔥 极严重异常 %.2f - 立即阻断\n", timestamp, result.CurrentValue) + case result.Deviation > result.Threshold*2: + fmt.Printf("[%s] ⚡ 严重异常 %.2f - 启动限流\n", timestamp, result.CurrentValue) + default: + fmt.Printf("[%s] ⚠️ 一般异常 %.2f - 记录日志\n", timestamp, result.CurrentValue) + } + }, + }) + + // 模拟实时数据流 + trafficStream := []float64{10, 12, 11, 9, 50, 100, 200, 15, 13, 11} + + fmt.Println("=== 实时异常处理演示 ===") + for _, traffic := range trafficStream { + detector.AddValue(traffic) + detector.DetectAnomalyWithProcessing(traffic) + time.Sleep(500 * time.Millisecond) // 模拟实时间隔 + } +} + +// TestFlowAnomalyDetectionWithRecovery 带恢复机制的流量检测测试 +func TestFlowAnomalyDetectionWithRecovery(t *testing.T) { + + //初始化日志 + zlog.InitZLog(global.GWAF_LOG_DEBUG_ENABLE, "console") + // 创建流量异常检测器 + flowDetector := NewMeanStdDetector(10, 2.0) + flowDetector.SetRecoveryThreshold(3) // 设置3个连续正常值后恢复 + + // 添加异常和恢复处理器 + flowDetector.AddAnomalyProcessor(AnomalyProcessor{ + Action: ActionLog, + Handler: LogAnomalyHandler, + RecoveryHandler: RecoveryLogHandler, + Enabled: true, + Name: "logger_with_recovery", + }) + + flowDetector.AddAnomalyProcessor(AnomalyProcessor{ + Action: ActionBlock, + Handler: BlockAnomalyHandler, + RecoveryHandler: RecoveryUnblockHandler, + Enabled: true, + Name: "blocker_with_recovery", + }) + + // 模拟完整的异常-恢复周期 + trafficData := []float64{ + // 正常流量建立基线 + 10, 12, 11, 9, 10, 11, 9, 12, 10, 11, + // 异常流量 + 50, 60, 55, + // 恢复正常 + 10, 11, 12, 9, 10, 11, + // 高位运行 + 100, 120, 110, 130, 120, 110, + // 继续高位运行 + 92, 93, 94, 95, 94, 98, + //降至低位 + 10, 11, 12, 9, 10, 11, + } + + fmt.Println("=== 异常检测与恢复演示 ===") + for i, traffic := range trafficData { + // 先添加到窗口,再检测 + flowDetector.AddValue(traffic) + result := flowDetector.DetectAnomalyWithProcessing(traffic) + + if result.IsAnomaly { + fmt.Printf("[%02d] ⚠️ 异常: %.2f\n", i+1, traffic) + } else if result.IsRecovered { + fmt.Printf("[%02d] 🎉 恢复: %.2f (连续正常%d次)\n", i+1, traffic, result.RecoveryCount) + } else { + fmt.Printf("[%02d] ✅ 正常: %.2f\n", i+1, traffic) + } + + // 显示恢复状态 + status := flowDetector.GetRecoveryStatus() + if status["is_in_anomaly_state"].(bool) { + fmt.Printf(" 恢复进度: %d/%d (%.1f%%)\n", + status["normal_count"], status["recovery_threshold"], + status["recovery_progress"].(float64)*100) + } + + time.Sleep(200 * time.Millisecond) + } +} + +// TestDualWindowDetectorBasic 双窗口检测器基础功能测试 +func TestDualWindowDetectorBasic(t *testing.T) { + // 创建双窗口检测器:短窗口5,长窗口15,2倍标准差 + detector := NewDualWindowDetector(5, 15, 2.0) + + fmt.Println("=== 双窗口检测器基础功能测试 ===") + + // 第一阶段:建立长期基线(低流量) + lowTraffic := []float64{10, 12, 11, 9, 10, 11, 9, 12, 10, 11, 9, 10, 12, 11, 10} + fmt.Println("\n--- 建立长期基线(低流量) ---") + for i, traffic := range lowTraffic { + detector.AddValue(traffic) + result := detector.DetectAnomaly(traffic) + fmt.Printf("[%02d] 值: %.1f, 异常: %v, %s\n", i+1, traffic, result.IsAnomaly, result.Confidence) + } + + // 第二阶段:短期高流量(应该被检测为正常,因为还没有持续太久) + highTraffic := []float64{25, 30, 28, 32, 27, 29, 31, 26, 30, 28} + fmt.Println("\n--- 短期高流量阶段 ---") + for i, traffic := range highTraffic { + detector.AddValue(traffic) + result := detector.DetectAnomaly(traffic) + fmt.Printf("[%02d] 值: %.1f, 异常: %v, %s\n", i+len(lowTraffic)+1, traffic, result.IsAnomaly, result.Confidence) + } + + // 第三阶段:持续高流量(应该触发持续高位警告) + sustainedHighTraffic := []float64{35, 40, 38, 42, 36, 39, 41, 37, 40, 38, 35, 42, 39, 36, 40} + fmt.Println("\n--- 持续高流量阶段 ---") + for i, traffic := range sustainedHighTraffic { + detector.AddValue(traffic) + result := detector.DetectAnomaly(traffic) + fmt.Printf("[%02d] 值: %.1f, 异常: %v, %s\n", i+len(lowTraffic)+len(highTraffic)+1, traffic, result.IsAnomaly, result.Confidence) + } + + // 打印最终统计 + stats := detector.GetDualWindowStats() + fmt.Printf("\n=== 最终统计 ===\n") + fmt.Printf("短窗口均值: %.2f\n", stats["short_mean"]) + fmt.Printf("长窗口均值: %.2f\n", stats["long_mean"]) + fmt.Printf("持续高位计数: %d\n", stats["sustained_high_count"]) +} + +// TestDualWindowSustainedHighDetection 持续高位检测专项测试 +func TestDualWindowSustainedHighDetection(t *testing.T) { + detector := NewDualWindowDetector(5, 20, 2.0) + detector.SetSustainedHighThreshold(10) // 设置较低的阈值便于测试 + + fmt.Println("=== 持续高位检测专项测试 ===") + + // 建立低基线 + for i := 0; i < 20; i++ { + detector.AddValue(10.0 + float64(i%3)) // 9-12之间波动 + } + + fmt.Println("\n--- 基线建立完成,开始高位运行 ---") + + // 模拟系统从低位突然跳到高位并持续运行 + highValues := []float64{50, 52, 48, 51, 49, 53, 47, 50, 52, 48, 51, 49, 50, 52, 48} + + sustainedDetected := false + for i, value := range highValues { + detector.AddValue(value) + result := detector.DetectAnomaly(value) + + fmt.Printf("[%02d] 值: %.1f, 异常: %v", i+1, value, result.IsAnomaly) + if result.IsAnomaly && result.Confidence != "正常范围" { + fmt.Printf(", %s", result.Confidence) + if !sustainedDetected && result.Confidence != "正常范围" { + sustainedDetected = true + fmt.Printf(" ← 首次检测到持续高位") + } + } + fmt.Println() + } + + if !sustainedDetected { + t.Error("未能检测到持续高位运行状态") + } else { + fmt.Println("✅ 成功检测到持续高位运行状态") + } +} + +// TestDualWindowVsSingleWindow 双窗口与单窗口对比测试 +func TestDualWindowVsSingleWindow(t *testing.T) { + // 创建检测器 + singleDetector := NewMeanStdDetector(10, 2.0) + dualDetector := NewDualWindowDetector(5, 15, 2.0) + dualDetector.SetSustainedHighThreshold(8) + + fmt.Println("=== 双窗口 vs 单窗口对比测试 ===") + + // 测试场景:历史低峰 -> 高峰持续运行 + testData := []float64{ + // 历史低峰 + 5, 7, 6, 8, 5, 6, 7, 5, 8, 6, + // 突然跳到高位并持续 + 25, 28, 26, 30, 27, 29, 31, 26, 28, 30, 27, 29, 25, 28, 26, + } + + fmt.Printf("%-5s %-8s %-15s %-15s %-20s %-20s\n", "序号", "数值", "单窗口异常", "双窗口异常", "单窗口置信度", "双窗口置信度") + fmt.Println(strings.Repeat("-", 100)) + + singleAnomalies := 0 + dualAnomalies := 0 + dualSustainedWarnings := 0 + + for i, value := range testData { + // 单窗口检测 + singleDetector.AddValue(value) + singleResult := singleDetector.DetectAnomaly(value) + + // 双窗口检测 + dualDetector.AddValue(value) + dualResult := dualDetector.DetectAnomaly(value) + + if singleResult.IsAnomaly { + singleAnomalies++ + } + if dualResult.IsAnomaly { + dualAnomalies++ + if strings.Contains(dualResult.Confidence, "持续高位") { + dualSustainedWarnings++ + } + } + + fmt.Printf("%-5d %-8.1f %-15v %-15v %-20s %-20s\n", + i+1, value, + singleResult.IsAnomaly, dualResult.IsAnomaly, + singleResult.Confidence, dualResult.Confidence) + } + + fmt.Printf("\n=== 对比结果 ===\n") + fmt.Printf("单窗口异常次数: %d\n", singleAnomalies) + fmt.Printf("双窗口异常次数: %d\n", dualAnomalies) + fmt.Printf("双窗口持续高位警告: %d\n", dualSustainedWarnings) + + // 验证双窗口能检测到持续高位问题 + if dualSustainedWarnings == 0 { + t.Error("双窗口检测器未能识别持续高位运行问题") + } else { + fmt.Printf("✅ 双窗口成功识别了持续高位运行问题\n") + } +} + +// TestDualWindowWithHandlers 双窗口检测器异常处理器测试 +func TestDualWindowWithHandlers(t *testing.T) { + //初始化日志 + zlog.InitZLog(global.GWAF_LOG_DEBUG_ENABLE, "console") + + detector := NewDualWindowDetector(5, 15, 2.0) + detector.SetSustainedHighThreshold(5) + + // 添加专门的持续高位处理器 + detector.AddAnomalyProcessor(AnomalyProcessor{ + Action: ActionCustom, + Enabled: true, + Name: "sustained_high_handler", + Handler: func(result *DetectionResult) { + if strings.Contains(result.Confidence, "持续高位") { + zlog.Warn("持续高位运行告警", map[string]interface{}{ + "alert_type": "sustained_high_traffic", + "current_value": result.CurrentValue, + "deviation": result.Deviation, + "confidence": result.Confidence, + "action_needed": "review_system_capacity", + }) + fmt.Printf("🔥 持续高位告警: %s\n", result.String()) + } else { + zlog.Info("一般异常检测", map[string]interface{}{ + "current_value": result.CurrentValue, + "confidence": result.Confidence, + }) + } + }, + }) + + fmt.Println("=== 双窗口异常处理器测试 ===") + + // 测试数据:低基线 -> 持续高位 + testSequence := []float64{ + // 建立低基线 + 10, 12, 11, 9, 10, 11, 9, 12, 10, 11, 9, 10, 12, 11, 10, + // 持续高位运行 + 30, 32, 31, 33, 29, 31, 34, 30, 32, 31, 30, 33, 31, 29, 32, + } + + handlerTriggered := false + sustainedHighDetected := false + + for i, value := range testSequence { + detector.AddValue(value) + result := detector.DetectAnomalyWithProcessing(value) + + if result.IsAnomaly { + handlerTriggered = true + if strings.Contains(result.Confidence, "持续高位") { + sustainedHighDetected = true + fmt.Printf("[%02d] 🚨 持续高位: %.1f - %s\n", i+1, value, result.Confidence) + } else { + fmt.Printf("[%02d] ⚠️ 一般异常: %.1f - %s\n", i+1, value, result.Confidence) + } + } else { + fmt.Printf("[%02d] ✅ 正常: %.1f\n", i+1, value) + } + + time.Sleep(50 * time.Millisecond) + } + + if !handlerTriggered { + t.Error("异常处理器未被触发") + } + if !sustainedHighDetected { + t.Error("未检测到持续高位运行状态") + } + + fmt.Printf("\n✅ 测试完成:异常处理器正常工作,持续高位检测有效\n") +} + +// TestDualWindowRealWorldScenario 双窗口真实场景测试 +func TestDualWindowRealWorldScenario(t *testing.T) { + detector := NewDualWindowDetector(10, 30, 2.0) + detector.SetSustainedHighThreshold(15) + + fmt.Println("=== 双窗口真实场景测试 ===") + + // 模拟真实的网络流量场景 + scenarios := map[string][]float64{ + "正常日间流量": {15, 18, 20, 22, 25, 23, 21, 19, 17, 20, 22, 24, 21, 18, 16}, + "突发异常流量": {150, 200, 180, 220, 160}, // 真正的异常攻击 + "系统升级后高位": {45, 48, 50, 47, 49, 51, 46, 48, 50, 47, 49, 52, 48, 46, 50, 49, 47, 51, 48, 50}, // 系统升级后持续高位 + "恢复正常": {20, 22, 18, 21, 19, 23, 20, 18, 22, 21}, + } + + totalAnomalies := 0 + sustainedHighWarnings := 0 + + for scenario, data := range scenarios { + fmt.Printf("\n--- %s ---\n", scenario) + + for i, value := range data { + detector.AddValue(value) + result := detector.DetectAnomaly(value) + + if result.IsAnomaly { + totalAnomalies++ + if strings.Contains(result.Confidence, "持续高位") { + sustainedHighWarnings++ + fmt.Printf("[%02d] 🔥 %s: %.1f\n", i+1, result.Confidence, value) + } else { + fmt.Printf("[%02d] ⚠️ 异常: %.1f - %s\n", i+1, value, result.Confidence) + } + } else { + fmt.Printf("[%02d] ✅ 正常: %.1f\n", i+1, value) + } + } + } + + fmt.Printf("\n=== 场景测试总结 ===\n") + fmt.Printf("总异常检测次数: %d\n", totalAnomalies) + fmt.Printf("持续高位警告次数: %d\n", sustainedHighWarnings) + + // 验证检测效果 + if totalAnomalies == 0 { + t.Error("未检测到任何异常,检测器可能过于宽松") + } + if sustainedHighWarnings == 0 { + t.Error("未检测到持续高位运行,可能需要调整参数") + } + + fmt.Printf("✅ 双窗口检测器在真实场景中表现良好\n") +} diff --git a/global/config.go b/global/config.go index ed1a156..4e1b167 100644 --- a/global/config.go +++ b/global/config.go @@ -2,19 +2,19 @@ package global var ( /******记录参数配置****************/ - GCONFIG_RECORD_MAX_BODY_LENGTH int64 = 1024 * 2 //限制记录最大请求的body长度 record_max_req_body_length - GCONFIG_RECORD_MAX_RES_BODY_LENGTH int64 = 1024 * 4 //限制记录最大响应的body长度 record_max_rep_body_length - GCONFIG_RECORD_RESP int64 = 0 // 是否记录响应记录 record_resp - GCONFIG_RECORD_PROXY_HEADER string = "" //配置获取IP头信息 - GCONFIG_RECORD_AUTO_LOAD_SSL int64 = 1 //是否每天凌晨3点自动加载ssl证书 - GCONFIG_RECORD_KAFKA_ENABLE int64 = 0 //kafka 是否激活 - GCONFIG_RECORD_KAFKA_URL string = "127.0.0.1:9092" //kafka url地址 - GCONFIG_RECORD_KAFKA_TOPIC string = "samwaf_logs_topic" //kafka topic - GCONFIG_RECORD_REDIRECT_HTTPS_CODE int64 = 301 //80跳转https的方式 - GCONFIG_ENABLE_HTTPS_REDIRECT int64 = 0 //是否启用HTTPS重定向服务器 0关闭 1开启 - - GCONFIG_RECORD_LOGIN_MAX_ERROR_TIME int64 = 3 //登录周期里错误最大次数 - GCONFIG_RECORD_LOGIN_LIMIT_MINTUTES int64 = 1 //登录错误记录周期 单位分钟最小1 + GCONFIG_LOG_PERSIST_ENABLED int64 = 1 //是否将web日志持久化到日志库(默认0维持老行为) 1开启 0维持老行为 + GCONFIG_RECORD_MAX_BODY_LENGTH int64 = 1024 * 2 //限制记录最大请求的body长度 record_max_req_body_length + GCONFIG_RECORD_MAX_RES_BODY_LENGTH int64 = 1024 * 4 //限制记录最大响应的body长度 record_max_rep_body_length + GCONFIG_RECORD_RESP int64 = 0 // 是否记录响应记录 record_resp + GCONFIG_RECORD_PROXY_HEADER string = "" //配置获取IP头信息 + GCONFIG_RECORD_AUTO_LOAD_SSL int64 = 1 //是否每天凌晨3点自动加载ssl证书 + GCONFIG_RECORD_KAFKA_ENABLE int64 = 0 //kafka 是否激活 + GCONFIG_RECORD_KAFKA_URL string = "127.0.0.1:9092" //kafka url地址 + GCONFIG_RECORD_KAFKA_TOPIC string = "samwaf_logs_topic" //kafka topic + GCONFIG_RECORD_REDIRECT_HTTPS_CODE int64 = 301 //80跳转https的方式 + GCONFIG_ENABLE_HTTPS_REDIRECT int64 = 0 //是否启用HTTPS重定向服务器 0关闭 1开启 + GCONFIG_RECORD_LOGIN_MAX_ERROR_TIME int64 = 3 //登录周期里错误最大次数 + GCONFIG_RECORD_LOGIN_LIMIT_MINTUTES int64 = 1 //登录错误记录周期 单位分钟最小1 // 指纹认证相关配置 GCONFIG_ENABLE_DEVICE_FINGERPRINT int64 = 1 // 是否启用设备指纹认证 1启用 0禁用 diff --git a/global/global.go b/global/global.go index 051a107..4dd26cf 100644 --- a/global/global.go +++ b/global/global.go @@ -134,7 +134,7 @@ var ( GSSL_HTTP_CHANGLE_PATH string = "/.well-known/acme-challenge/" // http01证书验证路径 /******数据库处理参数*****/ - GDATA_BATCH_INSERT int = 1000 //最大批量插入 + GDATA_BATCH_INSERT int64 = 100 //最大批量插入数量 GDATA_SHARE_DB_SIZE int64 = 100 * 10000 //100w 进行分库 100*10000 GDATA_SHARE_DB_FILE_SIZE int64 = 1024 //1024M 进行分库 GDATA_CURRENT_CHANGE bool = false //当前是否正在切换 diff --git a/model/hosts.go b/model/hosts.go index a14b804..5be33f8 100644 --- a/model/hosts.go +++ b/model/hosts.go @@ -40,6 +40,7 @@ type Hosts struct { AntiLeechJSON string `json:"anti_leech_json"` //防盗链配置 json CacheJSON string `json:"cache_json"` //缓存配置 json StaticSiteJSON string `json:"static_site_json"` //静态站点配置 json + TransportJSON string `json:"transport_json"` //传输配置 json DefaultEncoding string `json:"default_encoding"` //默认编码 utf-8 或者 gbk auto字符串自动选择 LogOnlyMode int `json:"log_only_mode"` //仅记录模式 1开启 0关闭 } @@ -154,3 +155,36 @@ type StaticSiteConfig struct { AllowedExtensions string `json:"allowed_extensions"` // 允许的文件扩展名白名单,逗号分隔 SensitivePatterns string `json:"sensitive_patterns"` // 敏感文件名模式(正则表达式),逗号分隔 } + +// TransportConfig 传输配置 +type TransportConfig struct { + MaxIdleConns int `json:"max_idle_conns"` // 最大空闲连接数 + MaxIdleConnsPerHost int `json:"max_idle_conns_per_host"` // 每个主机的最大空闲连接数 + MaxConnsPerHost int `json:"max_conns_per_host"` // 每个主机的最大连接数 + IdleConnTimeout int `json:"idle_conn_timeout"` // 空闲连接超时时间(秒) + TLSHandshakeTimeout int `json:"tls_handshake_timeout"` // TLS握手超时时间(秒) + ExpectContinueTimeout int `json:"expect_continue_timeout"` // Expect Continue超时时间(秒) +} + +// ParseTransportConfig 解析传输配置 +func ParseTransportConfig(transportJSON string) TransportConfig { + var config TransportConfig + + // 设置默认值 + config.MaxIdleConns = 0 + config.MaxIdleConnsPerHost = 0 + config.MaxConnsPerHost = 0 + config.IdleConnTimeout = 0 + config.TLSHandshakeTimeout = 0 + config.ExpectContinueTimeout = 0 + + // 如果JSON不为空,则解析覆盖默认值 + if transportJSON != "" { + err := json.Unmarshal([]byte(transportJSON), &config) + if err != nil { + // 解析失败时使用默认值,可以记录日志 + return config + } + } + return config +} diff --git a/model/request/waf_host_req.go b/model/request/waf_host_req.go index d6e60b6..d30cb33 100644 --- a/model/request/waf_host_req.go +++ b/model/request/waf_host_req.go @@ -36,6 +36,7 @@ type WafHostAddReq struct { CacheJSON string `json:"cache_json"` //缓存配置 json DefaultEncoding string `json:"default_encoding"` //默认编码 utf-8 或者 gbk auto字符串自动选择 LogOnlyMode int `json:"log_only_mode"` //是否只记录日志 1 是 0 不是 + TransportJSON string `json:"transport_json"` //Transport配置 json } type WafHostDelReq struct { @@ -80,6 +81,7 @@ type WafHostEditReq struct { StaticSiteJSON string `json:"static_site_json"` //静态站点配置 json DefaultEncoding string `json:"default_encoding"` //默认编码 utf-8 或者 gbk auto字符串自动选择 LogOnlyMode int `json:"log_only_mode"` //是否只记录日志 1 是 0 不是 + TransportJSON string `json:"transport_json"` //Transport配置 json } type WafHostGuardStatusReq struct { CODE string `json:"code"` diff --git a/service/waf_service/waf_host.go b/service/waf_service/waf_host.go index 12fa2b7..e6f97da 100644 --- a/service/waf_service/waf_host.go +++ b/service/waf_service/waf_host.go @@ -67,6 +67,7 @@ func (receiver *WafHostService) AddApi(wafHostAddReq request.WafHostAddReq) (str StaticSiteJSON: wafHostAddReq.StaticSiteJSON, DefaultEncoding: wafHostAddReq.DefaultEncoding, LogOnlyMode: wafHostAddReq.LogOnlyMode, + TransportJSON: wafHostAddReq.TransportJSON, } global.GWAF_LOCAL_DB.Create(wafHost) return wafHost.Code, nil @@ -124,6 +125,7 @@ func (receiver *WafHostService) ModifyApi(wafHostEditReq request.WafHostEditReq) "StaticSiteJSON": wafHostEditReq.StaticSiteJSON, "DefaultEncoding": wafHostEditReq.DefaultEncoding, "LogOnlyMode": wafHostEditReq.LogOnlyMode, + "TransportJSON": wafHostEditReq.TransportJSON, } err := global.GWAF_LOCAL_DB.Debug().Model(model.Hosts{}).Where("CODE=?", wafHostEditReq.CODE).Updates(hostMap).Error diff --git a/wafenginecore/check_captcha.go b/wafenginecore/check_captcha.go index 80a66cc..d13bac0 100644 --- a/wafenginecore/check_captcha.go +++ b/wafenginecore/check_captcha.go @@ -57,7 +57,7 @@ func (waf *WafEngine) checkCaptchaToken(r *http.Request, webLog innerbean.WebLog } // 处理验证码 -func (waf *WafEngine) handleCaptchaRequest(w http.ResponseWriter, r *http.Request, log innerbean.WebLog, captchaConfig model.CaptchaConfig) { +func (waf *WafEngine) handleCaptchaRequest(w http.ResponseWriter, r *http.Request, log *innerbean.WebLog, captchaConfig model.CaptchaConfig) { // 使用验证码服务处理请求 captchaService := wafcaptcha.GetService() captchaService.HandleCaptchaRequest(w, r, log, captchaConfig) diff --git a/wafenginecore/error_process.go b/wafenginecore/error_process.go index 8e9b4b9..9d2a330 100644 --- a/wafenginecore/error_process.go +++ b/wafenginecore/error_process.go @@ -33,7 +33,7 @@ func renderTemplate(templateContent string, data map[string]interface{}) ([]byte } // EchoErrorInfo ruleName 对内记录 blockInfo 对外展示 -func EchoErrorInfo(w http.ResponseWriter, r *http.Request, weblogbean innerbean.WebLog, ruleName string, blockInfo string, hostsafe *wafenginmodel.HostSafe, globalHostSafe *wafenginmodel.HostSafe, isLog bool) { +func EchoErrorInfo(w http.ResponseWriter, r *http.Request, weblogbean *innerbean.WebLog, ruleName string, blockInfo string, hostsafe *wafenginmodel.HostSafe, globalHostSafe *wafenginmodel.HostSafe, isLog bool) { resBytes := []byte("") var responseCode int = 403 @@ -136,7 +136,7 @@ func EchoErrorInfo(w http.ResponseWriter, r *http.Request, weblogbean innerbean. } // EchoResponseErrorInfo ruleName 对内记录 blockInfo 对外展示 -func EchoResponseErrorInfo(resp *http.Response, weblogbean innerbean.WebLog, ruleName string, blockInfo string, hostsafe *wafenginmodel.HostSafe, globalHostSafe *wafenginmodel.HostSafe, isLog bool) { +func EchoResponseErrorInfo(resp *http.Response, weblogbean *innerbean.WebLog, ruleName string, blockInfo string, hostsafe *wafenginmodel.HostSafe, globalHostSafe *wafenginmodel.HostSafe, isLog bool) { resBytes := []byte("") var responseCode int = 403 diff --git a/wafenginecore/proxy.go b/wafenginecore/proxy.go index e716e1e..f4f4889 100644 --- a/wafenginecore/proxy.go +++ b/wafenginecore/proxy.go @@ -9,6 +9,7 @@ import ( "SamWaf/wafproxy" "context" "crypto/tls" + "fmt" "net" "net/http" "net/url" @@ -28,7 +29,8 @@ func (waf *WafEngine) ProxyHTTP(w http.ResponseWriter, r *http.Request, host str for addrIndex, loadBalance := range hostTarget.LoadBalanceLists { //初始化后端负载 zlog.Debug("HTTP REQUEST", weblog.REQ_UUID, weblog.URL, "未初始化") - transport, customHeaders := waf.createTransport(r, host, 1, loadBalance, hostTarget) + transport := waf.getOrCreateTransport(r, host, 1, loadBalance, hostTarget) // 使用缓存的Transport + customHeaders := waf.getCustomHeaders(r, host, 1, loadBalance, hostTarget) customConfig := map[string]string{} customConfig["IsTransBackDomain"] = strconv.Itoa(hostTarget.Host.IsTransBackDomain) proxy := wafproxy.NewSingleHostReverseProxyCustomHeader(remoteUrl, customHeaders, customConfig) @@ -70,7 +72,8 @@ func (waf *WafEngine) ProxyHTTP(w http.ResponseWriter, r *http.Request, host str } } else { - transport, customHeaders := waf.createTransport(r, host, 0, model.LoadBalance{}, hostTarget) + transport := waf.getOrCreateTransport(r, host, 0, model.LoadBalance{}, hostTarget) // 使用缓存的Transport + customHeaders := waf.getCustomHeaders(r, host, 0, model.LoadBalance{}, hostTarget) customConfig := map[string]string{} customConfig["IsTransBackDomain"] = strconv.Itoa(hostTarget.Host.IsTransBackDomain) proxy := wafproxy.NewSingleHostReverseProxyCustomHeader(remoteUrl, customHeaders, customConfig) @@ -130,6 +133,97 @@ func (waf *WafEngine) createTransport(r *http.Request, host string, isEnableLoad DialContext: dialContext, } } + + // 解析并应用Transport配置 + transportConfig := model.ParseTransportConfig(hostTarget.Host.TransportJSON) + + // 应用Transport配置(只有非零值才设置) + if transportConfig.MaxIdleConns > 0 { + transport.MaxIdleConns = transportConfig.MaxIdleConns + } + if transportConfig.MaxIdleConnsPerHost > 0 { + transport.MaxIdleConnsPerHost = transportConfig.MaxIdleConnsPerHost + } + if transportConfig.MaxConnsPerHost > 0 { + transport.MaxConnsPerHost = transportConfig.MaxConnsPerHost + } + if transportConfig.IdleConnTimeout > 0 { + transport.IdleConnTimeout = time.Duration(transportConfig.IdleConnTimeout) * time.Second + } + if transportConfig.TLSHandshakeTimeout > 0 { + transport.TLSHandshakeTimeout = time.Duration(transportConfig.TLSHandshakeTimeout) * time.Second + } + if transportConfig.ExpectContinueTimeout > 0 { + transport.ExpectContinueTimeout = time.Duration(transportConfig.ExpectContinueTimeout) * time.Second + } + transport.ResponseHeaderTimeout = time.Duration(hostTarget.Host.ResponseTimeOut) * time.Second + + //把下面的参数一次性使用格式化实现 + zlog.Debug(fmt.Sprintf("Transport配置信息:\nMaxIdleConns: %d\nMaxIdleConnsPerHost: %d\nMaxConnsPerHost: %d\nIdleConnTimeout: %v\nTLSHandshakeTimeout: %v\nExpectContinueTimeout: %v\nResponseHeaderTimeout: %v\nDisableKeepAlives: %v\nDisableCompression: %v", + transport.MaxIdleConns, + transport.MaxIdleConnsPerHost, + transport.MaxConnsPerHost, + transport.IdleConnTimeout, + transport.TLSHandshakeTimeout, + transport.ExpectContinueTimeout, + transport.ResponseHeaderTimeout, + transport.DisableKeepAlives, + transport.DisableCompression)) return transport, customHeaders } + +// 获取或创建Transport +func (waf *WafEngine) getOrCreateTransport(r *http.Request, host string, isEnableLoadBalance int, loadBalance model.LoadBalance, hostTarget *wafenginmodel.HostSafe) *http.Transport { + // 生成Transport的唯一键 + transportKey := waf.generateTransportKey(host, isEnableLoadBalance, loadBalance, hostTarget) + + waf.TransportMux.RLock() + if transport, exists := waf.TransportPool[transportKey]; exists { + waf.TransportMux.RUnlock() + return transport + } + waf.TransportMux.RUnlock() + + // 创建新的Transport + waf.TransportMux.Lock() + defer waf.TransportMux.Unlock() + + // 双重检查,防止并发创建 + if transport, exists := waf.TransportPool[transportKey]; exists { + return transport + } + + transport, _ := waf.createTransport(r, host, isEnableLoadBalance, loadBalance, hostTarget) + + // 优化Transport配置 + /*transport.MaxIdleConns = 1000 + transport.MaxIdleConnsPerHost = 1000*/ + + if waf.TransportPool == nil { + waf.TransportPool = make(map[string]*http.Transport) + } + waf.TransportPool[transportKey] = transport + + return transport +} + +// 生成Transport的唯一键 +func (waf *WafEngine) generateTransportKey(host string, isEnableLoadBalance int, loadBalance model.LoadBalance, hostTarget *wafenginmodel.HostSafe) string { + key := fmt.Sprintf("%s_%d_%s_%d_%v", + host, + isEnableLoadBalance, + loadBalance.Remote_ip, + loadBalance.Remote_port, + hostTarget.Host.InsecureSkipVerify) + return key +} + +// 分离自定义头部逻辑 +func (waf *WafEngine) getCustomHeaders(r *http.Request, host string, isEnableLoadBalance int, loadBalance model.LoadBalance, hostTarget *wafenginmodel.HostSafe) map[string]string { + customHeaders := map[string]string{} + if r.TLS != nil { + customHeaders["X-FORWARDED-PROTO"] = "https" + } + return customHeaders +} diff --git a/wafenginecore/wafcaptcha/service.go b/wafenginecore/wafcaptcha/service.go index f2d4166..aff2729 100644 --- a/wafenginecore/wafcaptcha/service.go +++ b/wafenginecore/wafcaptcha/service.go @@ -157,7 +157,7 @@ func GetService() *CaptchaService { } // HandleCaptchaRequest 处理验证码请求 -func (s *CaptchaService) HandleCaptchaRequest(w http.ResponseWriter, r *http.Request, weblog innerbean.WebLog, captchaConfig model.CaptchaConfig) { +func (s *CaptchaService) HandleCaptchaRequest(w http.ResponseWriter, r *http.Request, weblog *innerbean.WebLog, captchaConfig model.CaptchaConfig) { path := r.URL.Path // 记录访问日志 @@ -400,7 +400,7 @@ func (s *CaptchaService) GetClickBasicCaptData(w http.ResponseWriter, r *http.Re } // VerifyCaptcha 验证验证码 -func (s *CaptchaService) VerifyCaptcha(w http.ResponseWriter, r *http.Request, captchaType string, webLog innerbean.WebLog, captchaConfig model.CaptchaConfig) { +func (s *CaptchaService) VerifyCaptcha(w http.ResponseWriter, r *http.Request, captchaType string, webLog *innerbean.WebLog, captchaConfig model.CaptchaConfig) { // 根据IP模式选择使用的IP clientIP := webLog.NetSrcIp if captchaConfig.IPMode == "proxy" { @@ -638,7 +638,7 @@ func (s *CaptchaService) VerifyCapJsCaptcha(w http.ResponseWriter, r *http.Reque json.NewEncoder(w).Encode(response) } -func (s *CaptchaService) ValidateCapJsCaptcha(w http.ResponseWriter, r *http.Request, configStruct model.CaptchaConfig, webLog innerbean.WebLog) { +func (s *CaptchaService) ValidateCapJsCaptcha(w http.ResponseWriter, r *http.Request, configStruct model.CaptchaConfig, webLog *innerbean.WebLog) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return diff --git a/wafenginecore/wafengine.go b/wafenginecore/wafengine.go index ab1c63d..dba5ae7 100644 --- a/wafenginecore/wafengine.go +++ b/wafenginecore/wafengine.go @@ -58,15 +58,20 @@ type WafEngine struct { //敏感词管理 Sensitive []model.Sensitive //敏感词 SensitiveManager *goahocorasick.Machine + + TransportPool map[string]*http.Transport // 添加Transport缓存池 + TransportMux sync.RWMutex // 保护Transport池的读写锁 } func (waf *WafEngine) Error() string { fs := "HTTP: %d, HostCode: %d, Message: %s" + zlog.Error(fmt.Sprintf(fs)) return fmt.Sprintf(fs) } func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) { innerLogName := "WafEngine ServeHTTP" atomic.AddUint64(&global.GWAF_RUNTIME_QPS, 1) // 原子增加计数器 + port := "" host := r.Host if !strings.Contains(host, ":") { @@ -318,7 +323,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) { BaseMessageInfo: innerbean.BaseMessageInfo{OperaType: "CC封禁提醒"}, OperaCnt: visitIPError, }) - EchoErrorInfo(w, r, weblogbean, "", "当前IP由于访问频次太高暂时无法访问", hostTarget, waf.HostTarget[waf.HostCode[global.GWAF_GLOBAL_HOST_CODE]], false) + EchoErrorInfo(w, r, &weblogbean, "", "当前IP由于访问频次太高暂时无法访问", hostTarget, waf.HostTarget[waf.HostCode[global.GWAF_GLOBAL_HOST_CODE]], false) return } @@ -358,7 +363,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) { return false } else { decrementMonitor(hostCode) - EchoErrorInfo(w, r, weblogbean, detectionResult.Title, detectionResult.Content, hostTarget, waf.HostTarget[waf.HostCode[global.GWAF_GLOBAL_HOST_CODE]], true) + EchoErrorInfo(w, r, &weblogbean, detectionResult.Title, detectionResult.Content, hostTarget, waf.HostTarget[waf.HostCode[global.GWAF_GLOBAL_HOST_CODE]], true) return true } } @@ -483,7 +488,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } if !isExcluded { - waf.handleCaptchaRequest(w, r, weblogbean, captchaConfig) + waf.handleCaptchaRequest(w, r, &weblogbean, captchaConfig) return } } @@ -564,7 +569,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) { weblogbean.STATUS = r.Response.Status weblogbean.STATUS_CODE = r.Response.StatusCode weblogbean.TASK_FLAG = 1 - global.GQEQUE_LOG_DB.Enqueue(weblogbean) + global.GQEQUE_LOG_DB.Enqueue(&weblogbean) return } } @@ -584,7 +589,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) { // 记录日志 weblogbean.RES_BODY = sHttpAuthBaseResult weblogbean.ACTION = "禁止" - global.GQEQUE_LOG_DB.Enqueue(weblogbean) + global.GQEQUE_LOG_DB.Enqueue(&weblogbean) return } } @@ -682,7 +687,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) { //记录响应body weblogbean.RES_BODY = string(resBytes) weblogbean.ACTION = "禁止" - global.GQEQUE_LOG_DB.Enqueue(weblogbean) + global.GQEQUE_LOG_DB.Enqueue(&weblogbean) } } func (waf *WafEngine) getClientIP(r *http.Request, headers ...string) (error, string, string) { @@ -970,7 +975,7 @@ func (waf *WafEngine) modifyResponse() func(*http.Response) error { weblogfrist.RULE = "敏感词检测:" + string(matchBodyResult[0].Word) } else { - EchoResponseErrorInfo(resp, *weblogfrist, "敏感词检测:"+string(matchBodyResult[0].Word), "敏感词内容", waf.HostTarget[host], waf.HostTarget[waf.HostCode[global.GWAF_GLOBAL_HOST_CODE]], true) + EchoResponseErrorInfo(resp, weblogfrist, "敏感词检测:"+string(matchBodyResult[0].Word), "敏感词内容", waf.HostTarget[host], waf.HostTarget[waf.HostCode[global.GWAF_GLOBAL_HOST_CODE]], true) return nil } } else { @@ -1150,7 +1155,7 @@ func (waf *WafEngine) StartWaf() { //第一步 检测合法性并加入到全局 waf.LoadAllHost() - wafSysLog := &model.WafSysLog{ + wafSysLog := model.WafSysLog{ BaseOrm: baseorm.BaseOrm{ Id: uuid.GenUUID(), USER_CODE: global.GWAF_USER_CODE, @@ -1161,7 +1166,7 @@ func (waf *WafEngine) StartWaf() { OpType: "信息", OpContent: "WAF启动", } - global.GQEQUE_LOG_DB.Enqueue(wafSysLog) + global.GQEQUE_LOG_DB.Enqueue(&wafSysLog) waf.StartAllProxyServer() } @@ -1198,6 +1203,12 @@ func (waf *WafEngine) CloseWaf() { Mux: sync.Mutex{}, Map: map[string]*tls.Certificate{}, } + // 清除Transport缓存池 + + waf.TransportMux.Lock() + waf.TransportPool = map[string]*http.Transport{} + defer waf.TransportMux.Unlock() + } // 清除代理 @@ -1312,7 +1323,7 @@ func (waf *WafEngine) StartProxyServer(innruntime innerbean.ServerRunTime) { OpType: "系统运行错误", OpContent: "HTTPS端口被占用: " + strconv.Itoa(innruntime.Port) + ",请检查", } - global.GQEQUE_LOG_DB.Enqueue(wafSysLog) + global.GQEQUE_LOG_DB.Enqueue(&wafSysLog) zlog.Error("[HTTPServer] https server start fail, cause:[%v]", err) } zlog.Info("server https shutdown") @@ -1351,7 +1362,7 @@ func (waf *WafEngine) StartProxyServer(innruntime innerbean.ServerRunTime) { OpType: "系统运行错误", OpContent: "HTTP端口被占用: " + strconv.Itoa(innruntime.Port) + ",请检查", } - global.GQEQUE_LOG_DB.Enqueue(wafSysLog) + global.GQEQUE_LOG_DB.Enqueue(&wafSysLog) zlog.Error("[HTTPServer] http server start fail, cause:[%v]", err) } zlog.Info("server http shutdown") diff --git a/wafinterface/wafnotify.go b/wafinterface/wafnotify.go index a2065ed..937d23c 100644 --- a/wafinterface/wafnotify.go +++ b/wafinterface/wafnotify.go @@ -3,6 +3,6 @@ package wafinterface import "SamWaf/innerbean" type WafNotify interface { - NotifyBatch(logs []innerbean.WebLog) error // 处理多个日志 - NotifySingle(log innerbean.WebLog) error // 处理单个日志 + NotifyBatch(logs []*innerbean.WebLog) error // 处理多个日志 + NotifySingle(log *innerbean.WebLog) error // 处理单个日志 } diff --git a/wafnotify/kafka/kafka.go b/wafnotify/kafka/kafka.go index 51ce720..bc9c63d 100644 --- a/wafnotify/kafka/kafka.go +++ b/wafnotify/kafka/kafka.go @@ -79,7 +79,7 @@ func (kn *KafkaNotifier) ReConnect() error { } // 实现 WafNotify 接口中的 NotifySingle 方法 -func (kn *KafkaNotifier) NotifySingle(log innerbean.WebLog) error { +func (kn *KafkaNotifier) NotifySingle(log *innerbean.WebLog) error { err := kn.sendMessage(log) if err != nil { return err @@ -89,7 +89,7 @@ func (kn *KafkaNotifier) NotifySingle(log innerbean.WebLog) error { } // 实现 WafNotify 接口中的 NotifyBatch 方法 -func (kn *KafkaNotifier) NotifyBatch(logs []innerbean.WebLog) error { +func (kn *KafkaNotifier) NotifyBatch(logs []*innerbean.WebLog) error { go func() { for _, log := range logs { if err := kn.sendMessage(log); err != nil { @@ -102,7 +102,7 @@ func (kn *KafkaNotifier) NotifyBatch(logs []innerbean.WebLog) error { } // 发送消息到 Kafka -func (kn *KafkaNotifier) sendMessage(log innerbean.WebLog) error { +func (kn *KafkaNotifier) sendMessage(log *innerbean.WebLog) error { if kn.client == nil { err := kn.ReConnect() if err != nil { diff --git a/wafnotify/wafnotify.go b/wafnotify/wafnotify.go index ef5610b..ad2af8e 100644 --- a/wafnotify/wafnotify.go +++ b/wafnotify/wafnotify.go @@ -27,7 +27,7 @@ func (ls *WafNotifyService) ChangeEnable(enable int64) { } // 处理并发送单条日志 -func (ls *WafNotifyService) ProcessSingleLog(log innerbean.WebLog) error { +func (ls *WafNotifyService) ProcessSingleLog(log *innerbean.WebLog) error { if ls.enable == 0 { return nil } @@ -47,7 +47,7 @@ func (ls *WafNotifyService) ProcessSingleLog(log innerbean.WebLog) error { } // 处理并发送多条日志 -func (ls *WafNotifyService) ProcessBatchLogs(logs []innerbean.WebLog) error { +func (ls *WafNotifyService) ProcessBatchLogs(logs []*innerbean.WebLog) error { if ls.enable == 0 { zlog.Debug("kafka没有开启") return nil diff --git a/wafqueue/log_queue.go b/wafqueue/log_queue.go index 20e0406..887bdb4 100644 --- a/wafqueue/log_queue.go +++ b/wafqueue/log_queue.go @@ -4,6 +4,8 @@ import ( "SamWaf/common/zlog" "SamWaf/global" "SamWaf/innerbean" + "SamWaf/waftask" + "strconv" "sync/atomic" "time" ) @@ -25,7 +27,8 @@ func ProcessLogDequeEngine() { zlog.Debug("正在切换数据库等待中队列") } else { - var webLogArray []innerbean.WebLog + var webLogArray []*innerbean.WebLog + batchCount := 0 for !global.GQEQUE_LOG_DB.Empty() { atomic.AddUint64(&global.GWAF_RUNTIME_LOG_PROCESS, 1) // 原子增加计数器 weblogbean, ok := global.GQEQUE_LOG_DB.Dequeue() @@ -34,8 +37,12 @@ func ProcessLogDequeEngine() { } if weblogbean != nil { // 进行类型断言将其转为具体的结构 - if logValue, ok := weblogbean.(innerbean.WebLog); ok { + if logValue, ok := weblogbean.(*innerbean.WebLog); ok { webLogArray = append(webLogArray, logValue) + batchCount++ + if batchCount > int(global.GDATA_BATCH_INSERT) { + break + } } else { //插入其他类型内容 global.GWAF_LOCAL_LOG_DB.Create(weblogbean) @@ -43,7 +50,12 @@ func ProcessLogDequeEngine() { } } if len(webLogArray) > 0 { - global.GWAF_LOCAL_LOG_DB.CreateInBatches(webLogArray, global.GDATA_BATCH_INSERT) + zlog.Info("日志队列处理协程处理日志数量:" + strconv.Itoa(len(webLogArray))) + if global.GCONFIG_LOG_PERSIST_ENABLED == 1 { + global.GWAF_LOCAL_LOG_DB.CreateInBatches(webLogArray, len(webLogArray)) + } + // 日志流做统计 + waftask.CollectStatsFromLogs(webLogArray) global.GNOTIFY_KAKFA_SERVICE.ProcessBatchLogs(webLogArray) } } diff --git a/waftask/stat_collector.go b/waftask/stat_collector.go new file mode 100644 index 0000000..c750f77 --- /dev/null +++ b/waftask/stat_collector.go @@ -0,0 +1,304 @@ +package waftask + +import ( + "SamWaf/common/uuid" + "SamWaf/common/zlog" + "SamWaf/customtype" + "SamWaf/global" + "SamWaf/innerbean" + "SamWaf/model" + "SamWaf/model/baseorm" + "time" + + "gorm.io/gorm" +) + +// 从日志流批量聚合统计并写入统计库(不依赖日志入库)。 +// 会按以下维度做增量统计: +// 1) 主机日聚合 (host_code, action, day) +// 2) IP日聚合 (host_code, ip, action, day) +// 3) 城市日聚合 (host_code, country, province, city, action, day) +// 4) IP标签计数 (ip, rule) +func CollectStatsFromLogs(logs []*innerbean.WebLog) { + if len(logs) == 0 { + return + } + + // 添加输入日志数量的调试信息 + zlog.Debug("统计收集器开始处理", "日志数量", len(logs)) + + // 安全检查:需要相关DB已初始化 + if global.GWAF_LOCAL_STATS_DB == nil || global.GWAF_LOCAL_DB == nil { + zlog.Debug("统计收集器", "数据库未初始化,跳过处理") + return + } + + type hostKey struct { + TenantId string + UserCode string + HostCode string + Action string + Day int + Host string + } + type ipKey struct { + TenantId string + UserCode string + HostCode string + Action string + Day int + Host string + IP string + } + type cityKey struct { + TenantId string + UserCode string + HostCode string + Action string + Day int + Host string + Country string + Province string + City string + } + type ipTagKey struct { + IP string + Rule string + } + + hostAgg := make(map[hostKey]int) + ipAgg := make(map[ipKey]int) + cityAgg := make(map[cityKey]int) + ipTagAgg := make(map[ipTagKey]int64) + + for _, lg := range logs { + // 主机聚合 + hk := hostKey{ + TenantId: lg.TenantId, + UserCode: lg.USER_CODE, + HostCode: lg.HOST_CODE, + Action: lg.ACTION, + Day: lg.Day, + Host: lg.HOST, + } + hostAgg[hk]++ + + // IP聚合 + ik := ipKey{ + TenantId: lg.TenantId, + UserCode: lg.USER_CODE, + HostCode: lg.HOST_CODE, + Action: lg.ACTION, + Day: lg.Day, + Host: lg.HOST, + IP: lg.SRC_IP, + } + ipAgg[ik]++ + + // 城市聚合 + ck := cityKey{ + TenantId: lg.TenantId, + UserCode: lg.USER_CODE, + HostCode: lg.HOST_CODE, + Action: lg.ACTION, + Day: lg.Day, + Host: lg.HOST, + Country: lg.COUNTRY, + Province: lg.PROVINCE, + City: lg.CITY, + } + cityAgg[ck]++ + + // IPTag 聚合 + rule := lg.RULE + if rule == "" { + rule = "正常" + } + ipTagAgg[ipTagKey{IP: lg.SRC_IP, Rule: rule}]++ + } + + // 添加聚合结果的调试信息 + zlog.Debug("聚合计算完成", + "主机聚合数量", len(hostAgg), + "IP聚合数量", len(ipAgg), + "城市聚合数量", len(cityAgg), + "IP标签聚合数量", len(ipTagAgg)) + + now := customtype.JsonTime(time.Now()) + + // 1) 主机日聚合 增量 + hostUpdateCount := 0 + hostInsertCount := 0 + for k, delta := range hostAgg { + tx := global.GWAF_LOCAL_STATS_DB.Model(&model.StatsDay{}). + Where("tenant_id = ? and user_code = ? and host_code = ? and type = ? and day = ?", + k.TenantId, k.UserCode, k.HostCode, k.Action, k.Day). + Updates(map[string]interface{}{ + "Count": gorm.Expr("Count + ?", delta), + "UPDATE_TIME": now, + }) + if tx.Error != nil { + zlog.Debug("主机聚合更新失败", "错误", tx.Error.Error(), "主机", k.HostCode, "动作", k.Action) + continue + } + if tx.RowsAffected == 0 { + // 创建新行 + err := global.GWAF_LOCAL_STATS_DB.Create(&model.StatsDay{ + BaseOrm: baseorm.BaseOrm{ + Id: uuid.GenUUID(), + USER_CODE: k.UserCode, + Tenant_ID: k.TenantId, + CREATE_TIME: now, + UPDATE_TIME: now, + }, + HostCode: k.HostCode, + Day: k.Day, + Host: k.Host, + Type: k.Action, + Count: delta, + }).Error + if err != nil { + zlog.Debug("主机聚合插入失败", "错误", err.Error(), "主机", k.HostCode, "动作", k.Action) + } else { + hostInsertCount++ + } + } else { + hostUpdateCount++ + } + } + zlog.Debug("主机聚合处理完成", "更新记录数", hostUpdateCount, "插入记录数", hostInsertCount) + + // 2) IP日聚合 增量 + ipUpdateCount := 0 + ipInsertCount := 0 + for k, delta := range ipAgg { + tx := global.GWAF_LOCAL_STATS_DB.Model(&model.StatsIPDay{}). + Where("tenant_id = ? and user_code = ? and host_code = ? and ip = ? and type = ? and day = ?", + k.TenantId, k.UserCode, k.HostCode, k.IP, k.Action, k.Day). + Updates(map[string]interface{}{ + "Count": gorm.Expr("Count + ?", delta), + "UPDATE_TIME": now, + }) + if tx.Error != nil { + zlog.Debug("IP聚合更新失败", "错误", tx.Error.Error(), "IP", k.IP, "动作", k.Action) + continue + } + if tx.RowsAffected == 0 { + err := global.GWAF_LOCAL_STATS_DB.Create(&model.StatsIPDay{ + BaseOrm: baseorm.BaseOrm{ + Id: uuid.GenUUID(), + USER_CODE: k.UserCode, + Tenant_ID: k.TenantId, + CREATE_TIME: now, + UPDATE_TIME: now, + }, + HostCode: k.HostCode, + Day: k.Day, + Host: k.Host, + Type: k.Action, + Count: delta, + IP: k.IP, + }).Error + if err != nil { + zlog.Debug("IP聚合插入失败", "错误", err.Error(), "IP", k.IP, "动作", k.Action) + } else { + ipInsertCount++ + } + } else { + ipUpdateCount++ + } + } + zlog.Debug("IP聚合处理完成", "更新记录数", ipUpdateCount, "插入记录数", ipInsertCount) + + // 3) 城市日聚合 增量 + cityUpdateCount := 0 + cityInsertCount := 0 + for k, delta := range cityAgg { + tx := global.GWAF_LOCAL_STATS_DB.Model(&model.StatsIPCityDay{}). + Where("tenant_id = ? and user_code = ? and host_code = ? and country = ? and province = ? and city = ? and type = ? and day = ?", + k.TenantId, k.UserCode, k.HostCode, k.Country, k.Province, k.City, k.Action, k.Day). + Updates(map[string]interface{}{ + "Count": gorm.Expr("Count + ?", delta), + "UPDATE_TIME": now, + }) + if tx.Error != nil { + zlog.Debug("城市聚合更新失败", "错误", tx.Error.Error(), "城市", k.City, "动作", k.Action) + continue + } + if tx.RowsAffected == 0 { + err := global.GWAF_LOCAL_STATS_DB.Create(&model.StatsIPCityDay{ + BaseOrm: baseorm.BaseOrm{ + Id: uuid.GenUUID(), + USER_CODE: k.UserCode, + Tenant_ID: k.TenantId, + CREATE_TIME: now, + UPDATE_TIME: now, + }, + HostCode: k.HostCode, + Day: k.Day, + Host: k.Host, + Type: k.Action, + Count: delta, + Country: k.Country, + Province: k.Province, + City: k.City, + }).Error + if err != nil { + zlog.Debug("城市聚合插入失败", "错误", err.Error(), "城市", k.City, "动作", k.Action) + } else { + cityInsertCount++ + } + } else { + cityUpdateCount++ + } + } + zlog.Debug("城市聚合处理完成", "更新记录数", cityUpdateCount, "插入记录数", cityInsertCount) + + // 4) IPTag 增量(主库) + ipTagUpdateCount := 0 + ipTagInsertCount := 0 + for k, delta := range ipTagAgg { + tx := global.GWAF_LOCAL_DB.Model(&model.IPTag{}). + Where("tenant_id = ? and user_code = ? and ip = ? and ip_tag = ?", + global.GWAF_TENANT_ID, global.GWAF_USER_CODE, k.IP, k.Rule). + Updates(map[string]interface{}{ + "Cnt": gorm.Expr("Cnt + ?", delta), + "UPDATE_TIME": now, + }) + if tx.Error != nil { + zlog.Debug("IP标签更新失败", "错误", tx.Error.Error(), "IP", k.IP, "规则", k.Rule) + continue + } + if tx.RowsAffected == 0 { + err := global.GWAF_LOCAL_DB.Create(&model.IPTag{ + BaseOrm: baseorm.BaseOrm{ + Id: uuid.GenUUID(), + USER_CODE: global.GWAF_USER_CODE, + Tenant_ID: global.GWAF_TENANT_ID, + CREATE_TIME: now, + UPDATE_TIME: now, + }, + IP: k.IP, + IPTag: k.Rule, + Cnt: delta, + Remarks: "", + }).Error + if err != nil { + zlog.Debug("IP标签插入失败", "错误", err.Error(), "IP", k.IP, "规则", k.Rule) + } else { + ipTagInsertCount++ + } + } else { + ipTagUpdateCount++ + } + } + zlog.Debug("IP标签处理完成", "更新记录数", ipTagUpdateCount, "插入记录数", ipTagInsertCount) + + // 总结统计信息 + zlog.Debug("统计收集器处理完成", + "总处理日志数", len(logs), + "主机聚合", map[string]interface{}{"更新": hostUpdateCount, "插入": hostInsertCount}, + "IP聚合", map[string]interface{}{"更新": ipUpdateCount, "插入": ipInsertCount}, + "城市聚合", map[string]interface{}{"更新": cityUpdateCount, "插入": cityInsertCount}, + "IP标签", map[string]interface{}{"更新": ipTagUpdateCount, "插入": ipTagInsertCount}) +} diff --git a/waftask/task_config.go b/waftask/task_config.go index 6b294bb..6830446 100644 --- a/waftask/task_config.go +++ b/waftask/task_config.go @@ -97,6 +97,9 @@ func setConfigIntValue(name string, value int64, change int) { case "enable_strict_ip_binding": global.GCONFIG_ENABLE_STRICT_IP_BINDING = value break + case "batch_insert": + global.GDATA_BATCH_INSERT = value + break default: zlog.Warn("Unknown config item:", name) } @@ -253,5 +256,6 @@ func TaskLoadSetting(initLoad bool) { // 指纹认证相关配置 updateConfigIntItem(initLoad, "security", "enable_device_fingerprint", global.GCONFIG_ENABLE_DEVICE_FINGERPRINT, "是否启用设备指纹认证(1启用 0禁用)", "options", "0|禁用,1|启用") updateConfigIntItem(initLoad, "security", "enable_strict_ip_binding", global.GCONFIG_ENABLE_STRICT_IP_BINDING, "是否启用严格IP绑定(1启用 0禁用,启用指纹时建议禁用)", "options", "0|禁用,1|启用") - + //数据库相关 + updateConfigIntItem(initLoad, "database", "batch_insert", global.GDATA_BATCH_INSERT, "数据库批量插入数量", "int", "") } diff --git a/waftask/task_counter.go b/waftask/task_counter.go index b17b5d3..77a5aa8 100644 --- a/waftask/task_counter.go +++ b/waftask/task_counter.go @@ -1,16 +1,7 @@ package waftask import ( - "SamWaf/common/uuid" - "SamWaf/common/zlog" - "SamWaf/customtype" - "SamWaf/global" - "SamWaf/innerbean" - "SamWaf/model" - "SamWaf/model/baseorm" "SamWaf/service/waf_service" - "fmt" - "time" ) var ( @@ -65,265 +56,9 @@ type CountIPRuleResult struct { */ func TaskCounter() { - // 检查是否收到关闭信号 - if global.GWAF_SHUTDOWN_SIGNAL { - zlog.Info("TaskCounter Shutdown") + + //废弃使用新形式 + if 1 == 1 { return } - - if global.GWAF_LOCAL_DB == nil || global.GWAF_LOCAL_LOG_DB == nil { - zlog.Debug("数据库没有初始化完成呢") - return - } - if global.GWAF_SWITCH_TASK_COUNTER == true { - zlog.Debug("统计还没完成,调度任务PASS") - } - global.GWAF_SWITCH_TASK_COUNTER = true - - /** - 1.首次是当前日期,查询当前时间以后的所有数据,备份当前日期 - 2.查询使用备份日期,倒退10秒,查询这个时候所有的数据 - 3. - - */ - if global.GDATA_CURRENT_CHANGE { - //如果正在切换库 跳过 - zlog.Debug("正在切换数据库等待中") - global.GWAF_SWITCH_TASK_COUNTER = false - return - } - - if global.GWAF_LAST_TIME_UNIX == 0 { - global.GWAF_LAST_TIME_UNIX = (global.GWAF_LAST_UPDATE_TIME.UnixNano()) / 1e6 - global.GWAF_SWITCH_TASK_COUNTER = false - return - } - //取大于上次时间的时 - statTimeUnix := global.GWAF_LAST_TIME_UNIX - endTimeUnix := (time.Now().Add(-5 * time.Second).UnixNano()) / 1e6 - //打印 statTimeUnix,endTimeUnix - zlog.Debug(fmt.Sprintf("counter statTimeUnix = %v endTimeUnix=%v", statTimeUnix, endTimeUnix)) - lastWebLogDbBean := wafLogService.GetUnixTimeByCounter(statTimeUnix, endTimeUnix) - if lastWebLogDbBean.REQ_UUID == "" { - zlog.Debug("当前期间没有符合条件的数据") - global.GWAF_LAST_TIME_UNIX = endTimeUnix - global.GWAF_SWITCH_TASK_COUNTER = false - return - } else { - global.GWAF_LAST_TIME_UNIX = endTimeUnix - } - //一、 主机聚合统计 - { - var resultHosts []CountHostResult - global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host FROM \"web_logs\" where task_flag = ? and unix_add_time >= ? and unix_add_time < ? and tenant_id = ? and user_code =? GROUP BY host_code, user_code,action,tenant_id,day,host", - 1, statTimeUnix, endTimeUnix, global.GWAF_TENANT_ID, global.GWAF_USER_CODE).Scan(&resultHosts) - /**** - 1.如果不存在则创建 - 2.如果存在则累加这个周期的统计数 - */ - for _, value := range resultHosts { - // 检查是否收到关闭信号 - if global.GWAF_SHUTDOWN_SIGNAL { - zlog.Info("TaskCounter - Shutdown") - return - } - var statDay model.StatsDay - global.GWAF_LOCAL_STATS_DB.Where("tenant_id = ? and user_code = ? and host_code=? and type=? and day=?", - value.TenantId, value.UserCode, value.HostCode, value.ACTION, value.Day).Find(&statDay) - - if statDay.HostCode == "" { - statDay2 := &model.StatsDay{ - BaseOrm: baseorm.BaseOrm{ - Id: uuid.GenUUID(), - USER_CODE: global.GWAF_USER_CODE, - Tenant_ID: global.GWAF_TENANT_ID, - CREATE_TIME: customtype.JsonTime(time.Now()), - UPDATE_TIME: customtype.JsonTime(time.Now()), - }, - HostCode: value.HostCode, - Day: value.Day, - Host: value.Host, - Type: value.ACTION, - Count: value.Count, - } - global.GQEQUE_STATS_DB.Enqueue(statDay2) - } else { - statDayMap := map[string]interface{}{ - "Count": value.Count + statDay.Count, - "UPDATE_TIME": customtype.JsonTime(time.Now()), - } - updateBean := innerbean.UpdateModel{ - Model: model.StatsDay{}, - Query: `tenant_id = ? and user_code= ? and host_code=? and type=? and day=?`, - Update: statDayMap, - } - updateBean.Args = append(updateBean.Args, value.TenantId, value.UserCode, value.HostCode, value.ACTION, value.Day) - global.GQEQUE_STATS_UPDATE_DB.Enqueue(updateBean) - } - } - } - - //二、 IP聚合统计 - { - var resultIP []CountIPResult - global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host,src_ip as ip FROM \"web_logs\" where task_flag = ? and unix_add_time >= ? and unix_add_time < ? and tenant_id = ? and user_code =? GROUP BY host_code, user_code,action,tenant_id,day,host,ip", - 1, statTimeUnix, endTimeUnix, global.GWAF_TENANT_ID, global.GWAF_USER_CODE).Scan(&resultIP) - /**** - 1.如果不存在则创建 - 2.如果存在则累加这个周期的统计数 - */ - for _, value := range resultIP { - // 检查是否收到关闭信号 - if global.GWAF_SHUTDOWN_SIGNAL { - zlog.Info("TaskCounter - Shutdown") - return - } - var statDay model.StatsIPDay - global.GWAF_LOCAL_STATS_DB.Where("tenant_id = ? and user_code = ? and host_code=? and ip = ? and type=? and day=?", - value.TenantId, value.UserCode, value.HostCode, value.Ip, value.ACTION, value.Day).Find(&statDay) - - if statDay.HostCode == "" { - statDay2 := &model.StatsIPDay{ - BaseOrm: baseorm.BaseOrm{ - Id: uuid.GenUUID(), - USER_CODE: global.GWAF_USER_CODE, - Tenant_ID: global.GWAF_TENANT_ID, - CREATE_TIME: customtype.JsonTime(time.Now()), - UPDATE_TIME: customtype.JsonTime(time.Now()), - }, - HostCode: value.HostCode, - Day: value.Day, - Host: value.Host, - Type: value.ACTION, - Count: value.Count, - IP: value.Ip, - } - global.GQEQUE_STATS_DB.Enqueue(statDay2) - } else { - statDayMap := map[string]interface{}{ - "Count": value.Count + statDay.Count, - "UPDATE_TIME": customtype.JsonTime(time.Now()), - } - - updateBean := innerbean.UpdateModel{ - Model: model.StatsIPDay{}, - Query: "tenant_id = ? and user_code= ? and host_code=? and ip=? and type=? and day=?", - Update: statDayMap, - } - updateBean.Args = append(updateBean.Args, value.TenantId, value.UserCode, value.HostCode, value.Ip, value.ACTION, value.Day) - global.GQEQUE_STATS_UPDATE_DB.Enqueue(updateBean) - - } - } - } - - //三、 城市信息聚合统计 - { - var resultCitys []CountCityResult - global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host,country,province,city FROM \"web_logs\" where task_flag = ? and unix_add_time >= ? and unix_add_time < ? and tenant_id = ? and user_code =? GROUP BY host_code, user_code,action,tenant_id,day,host,country,province,city", - 1, statTimeUnix, endTimeUnix, global.GWAF_TENANT_ID, global.GWAF_USER_CODE).Scan(&resultCitys) - /**** - 1.如果不存在则创建 - 2.如果存在则累加这个周期的统计数 - */ - for _, value := range resultCitys { - // 检查是否收到关闭信号 - if global.GWAF_SHUTDOWN_SIGNAL { - zlog.Info("TaskCounter - Shutdown") - return - } - var statDay model.StatsIPCityDay - global.GWAF_LOCAL_STATS_DB.Where("tenant_id = ? and user_code = ? and host_code=? and country = ? and province = ? and city = ? and type=? and day=?", - value.TenantId, value.UserCode, value.HostCode, value.Country, value.Province, value.City, value.ACTION, value.Day).Find(&statDay) - - if statDay.HostCode == "" { - statDay2 := &model.StatsIPCityDay{ - BaseOrm: baseorm.BaseOrm{ - Id: uuid.GenUUID(), - USER_CODE: global.GWAF_USER_CODE, - Tenant_ID: global.GWAF_TENANT_ID, - CREATE_TIME: customtype.JsonTime(time.Now()), - UPDATE_TIME: customtype.JsonTime(time.Now()), - }, - HostCode: value.HostCode, - Day: value.Day, - Host: value.Host, - Type: value.ACTION, - Count: value.Count, - Country: value.Country, - Province: value.Province, - City: value.City, - } - global.GQEQUE_STATS_DB.Enqueue(statDay2) - } else { - statDayMap := map[string]interface{}{ - "Count": value.Count + statDay.Count, - "UPDATE_TIME": customtype.JsonTime(time.Now()), - } - - updateBean := innerbean.UpdateModel{ - Model: model.StatsIPCityDay{}, - Query: "tenant_id = ? and user_code= ? and host_code=? and country = ? and province = ? and city = ? and type=? and day=?", - Update: statDayMap, - } - updateBean.Args = append(updateBean.Args, value.TenantId, value.UserCode, value.HostCode, value.Country, value.Province, value.City, value.ACTION, value.Day) - global.GQEQUE_STATS_UPDATE_DB.Enqueue(updateBean) - - } - } - } - - //第四 给IP打标签 开始 - { - var resultIPRule []CountIPRuleResult - global.GWAF_LOCAL_LOG_DB.Raw("SELECT src_ip as ip ,rule,count(src_ip) as cnt FROM \"web_logs\" where task_flag = ? and unix_add_time >= ? and unix_add_time < ? and tenant_id = ? and user_code =? GROUP BY user_code,tenant_id, rule,src_ip", - 1, statTimeUnix, endTimeUnix, global.GWAF_TENANT_ID, global.GWAF_USER_CODE).Scan(&resultIPRule) - /**** - 1.如果不存在则创建 - 2.如果存在则累加这个IP这个rule的统计数 - */ - for _, value := range resultIPRule { - // 检查是否收到关闭信号 - if global.GWAF_SHUTDOWN_SIGNAL { - zlog.Info("TaskCounter - Shutdown") - return - } - if value.Rule == "" { - value.Rule = "正常" - } - var ipTag model.IPTag - global.GWAF_LOCAL_DB.Where("tenant_id = ? and user_code = ? and ip=? and ip_tag = ?", - global.GWAF_TENANT_ID, global.GWAF_USER_CODE, value.Ip, value.Rule).Find(&ipTag) - if ipTag.IP == "" { - insertIpTag := &model.IPTag{ - BaseOrm: baseorm.BaseOrm{ - Id: uuid.GenUUID(), - USER_CODE: global.GWAF_USER_CODE, - Tenant_ID: global.GWAF_TENANT_ID, - CREATE_TIME: customtype.JsonTime(time.Now()), - UPDATE_TIME: customtype.JsonTime(time.Now()), - }, - IP: value.Ip, - IPTag: value.Rule, - Cnt: value.Cnt, - Remarks: "", - } - global.GQEQUE_DB.Enqueue(insertIpTag) - } else { - ipTagUpdateMap := map[string]interface{}{ - "Cnt": value.Cnt + ipTag.Cnt, - "UPDATE_TIME": customtype.JsonTime(time.Now()), - } - updateBean := innerbean.UpdateModel{ - Model: model.IPTag{}, - Query: "tenant_id = ? and user_code= ? and ip=? and ip_tag = ?", - Update: ipTagUpdateMap, - } - updateBean.Args = append(updateBean.Args, global.GWAF_TENANT_ID, global.GWAF_USER_CODE, value.Ip, value.Rule) - global.GQEQUE_UPDATE_DB.Enqueue(updateBean) - } - } - - } //给IP打标签结束 - global.GWAF_SWITCH_TASK_COUNTER = false }