feat: improve attack logging

#451
samwaf
2025-09-19 11:26:00 +08:00
parent 951835c6aa
commit 1be1f974e2
22 changed files with 1944 additions and 316 deletions

View File

@@ -79,7 +79,7 @@ func (w *WafLoginApi) LoginApi(c *gin.Context) {
OpType: "登录信息",
OpContent: loginError,
}
global.GQEQUE_LOG_DB.Enqueue(wafSysLog)
global.GQEQUE_LOG_DB.Enqueue(&wafSysLog)
global.GQEQUE_MESSAGE_DB.Enqueue(innerbean.OperatorMessageInfo{
BaseMessageInfo: innerbean.BaseMessageInfo{OperaType: "登录错误"},
@@ -144,7 +144,7 @@ func (w *WafLoginApi) LoginApi(c *gin.Context) {
OpType: "登录信息",
OpContent: noticeStr,
}
global.GQEQUE_LOG_DB.Enqueue(wafSysLog)
global.GQEQUE_LOG_DB.Enqueue(&wafSysLog)
response.OkWithDetailed(response2.LoginRep{
AccessToken: tokenInfo.AccessToken,

cache/waf_cache.go (vendored, 2 changed lines)
View File

@@ -3,7 +3,6 @@ package cache
import (
"SamWaf/common/zlog"
"errors"
"fmt"
"strings"
"sync"
"time"
@@ -33,7 +32,6 @@ func (wafCache *WafCache) Set(key string, value interface{}) {
}
func (wafCache *WafCache) SetWithTTl(key string, value interface{}, ttl time.Duration) {
fmt.Println(ttl)
wafCache.mu.Lock()
defer wafCache.mu.Unlock()
createTime := time.Now()

View File

@@ -0,0 +1,339 @@
package flow
import (
"SamWaf/common/zlog"
"fmt"
"strings"
"time"
)
// LogAnomalyHandler 日志记录处理器 - 使用zlog
func LogAnomalyHandler(result *DetectionResult) {
timestamp := time.Now().Format("2006-01-02 15:04:05")
// 使用zlog记录日志
zlog.Info("流量异常检测告警", map[string]interface{}{
"timestamp": timestamp,
"current_value": result.CurrentValue,
"mean": result.Mean,
"deviation": result.Deviation,
"threshold": result.Threshold,
"confidence": result.Confidence,
"is_anomaly": result.IsAnomaly,
"window_size": result.WindowSize,
"detail": result.String(),
})
}
// AlertAnomalyHandler 告警处理器 - 使用zlog
func AlertAnomalyHandler(result *DetectionResult) {
// 使用zlog记录告警信息
zlog.Warn("流量异常告警", map[string]interface{}{
"alert_type": "traffic_anomaly",
"current_value": result.CurrentValue,
"mean": result.Mean,
"deviation": result.Deviation,
"confidence": result.Confidence,
"severity": getSeverityLevel(result),
})
fmt.Printf("🚨 异常告警: %s\n", result.String())
// 示例:发送到告警通道
alertMsg := fmt.Sprintf("检测到流量异常!当前值: %.2f, 均值: %.2f, 偏离度: %.2f, 置信度: %s",
result.CurrentValue, result.Mean, result.Deviation, result.Confidence)
// 这里可以调用实际的告警接口
sendAlert(alertMsg)
}
// BlockAnomalyHandler 阻断处理器 - 使用zlog
func BlockAnomalyHandler(result *DetectionResult) {
if result.Deviation > result.Threshold*2 { // 严重异常才阻断
// 使用zlog记录阻断操作
zlog.Error("严重异常流量阻断", map[string]interface{}{
"action": "block_traffic",
"current_value": result.CurrentValue,
"deviation": result.Deviation,
"threshold": result.Threshold,
"severity": "critical",
"reason": "deviation_exceeds_2x_threshold",
})
fmt.Printf("🛑 严重异常,执行阻断: %s\n", result.String())
// 这里可以调用防火墙或限流接口
blockTraffic(result)
}
}
// LimitAnomalyHandler 限流处理器 - 使用zlog
func LimitAnomalyHandler(result *DetectionResult) {
if result.IsAnomaly {
// 使用zlog记录限流操作
zlog.Warn("异常流量限流", map[string]interface{}{
"action": "limit_traffic",
"current_value": result.CurrentValue,
"mean": result.Mean,
"deviation": result.Deviation,
"severity": getSeverityLevel(result),
})
fmt.Printf("⚠️ 异常流量,启动限流: %s\n", result.String())
// 这里可以调用限流接口
limitTraffic(result)
}
}
// CustomAnomalyHandler 自定义处理器示例 - 使用zlog
func CustomAnomalyHandler(result *DetectionResult) {
// 根据异常程度执行不同处理
switch {
case result.Deviation > result.Threshold*3:
// 极严重异常:立即阻断+告警
zlog.Error("极严重流量异常", map[string]interface{}{
"action": "immediate_block_and_alert",
"current_value": result.CurrentValue,
"deviation_ratio": result.Deviation / result.Threshold,
"severity": "critical",
"auto_action": true,
})
fmt.Printf("🔥 极严重异常,立即处理: %s\n", result.String())
blockTraffic(result)
sendAlert(fmt.Sprintf("极严重流量异常: %.2f", result.CurrentValue))
case result.Deviation > result.Threshold*2:
// 严重异常:限流+告警
zlog.Warn("严重流量异常", map[string]interface{}{
"action": "limit_and_alert",
"current_value": result.CurrentValue,
"deviation_ratio": result.Deviation / result.Threshold,
"severity": "high",
})
fmt.Printf("⚡ 严重异常,限流处理: %s\n", result.String())
limitTraffic(result)
sendAlert(fmt.Sprintf("严重流量异常: %.2f", result.CurrentValue))
default:
// 一般异常:仅记录
zlog.Info("一般流量异常", map[string]interface{}{
"action": "log_only",
"current_value": result.CurrentValue,
"deviation_ratio": result.Deviation / result.Threshold,
"severity": "medium",
})
fmt.Printf("📝 一般异常,记录日志: %s\n", result.String())
LogAnomalyHandler(result)
}
}
// getSeverityLevel 获取异常严重程度
func getSeverityLevel(result *DetectionResult) string {
if result.Threshold == 0 {
return "unknown"
}
ratio := result.Deviation / result.Threshold
switch {
case ratio > 3:
return "critical"
case ratio > 2:
return "high"
case ratio > 1:
return "medium"
default:
return "low"
}
}
// 辅助函数(需要根据实际系统实现)
func sendAlert(message string) {
// 实现告警发送逻辑
zlog.Info("发送告警", map[string]interface{}{
"alert_message": message,
"alert_type": "traffic_anomaly",
})
}
func blockTraffic(result *DetectionResult) {
// 实现流量阻断逻辑
zlog.Error("执行流量阻断", map[string]interface{}{
"blocked_value": result.CurrentValue,
"reason": "anomaly_detection",
"action_taken": "traffic_blocked",
})
}
func limitTraffic(result *DetectionResult) {
// 实现流量限制逻辑
zlog.Warn("执行流量限制", map[string]interface{}{
"limited_value": result.CurrentValue,
"reason": "anomaly_detection",
"action_taken": "traffic_limited",
})
}
// RecoveryLogHandler 恢复日志处理器
func RecoveryLogHandler(result *DetectionResult) {
timestamp := time.Now().Format("2006-01-02 15:04:05")
// 使用zlog记录恢复日志
zlog.Info("流量异常恢复", map[string]interface{}{
"timestamp": timestamp,
"current_value": result.CurrentValue,
"mean": result.Mean,
"recovery_count": result.RecoveryCount,
"status": "recovered",
"detail": fmt.Sprintf("系统已恢复正常,当前值: %.2f, 均值: %.2f", result.CurrentValue, result.Mean),
})
fmt.Printf("✅ 系统恢复正常: 当前值=%.2f, 均值=%.2f\n", result.CurrentValue, result.Mean)
}
// RecoveryUnblockHandler 恢复解除阻断处理器
func RecoveryUnblockHandler(result *DetectionResult) {
// 使用zlog记录解除阻断
zlog.Info("解除流量阻断", map[string]interface{}{
"action": "unblock_traffic",
"current_value": result.CurrentValue,
"mean": result.Mean,
"status": "unblocked",
"reason": "traffic_recovered",
})
fmt.Printf("🔓 解除流量阻断: 当前值=%.2f\n", result.CurrentValue)
// 这里调用实际的解除阻断接口
unblockTraffic(result)
}
// RecoveryUnlimitHandler 恢复解除限流处理器
func RecoveryUnlimitHandler(result *DetectionResult) {
// 使用zlog记录解除限流
zlog.Info("解除流量限制", map[string]interface{}{
"action": "unlimit_traffic",
"current_value": result.CurrentValue,
"mean": result.Mean,
"status": "unlimited",
"reason": "traffic_recovered",
})
fmt.Printf("🔄 解除流量限制: 当前值=%.2f\n", result.CurrentValue)
// 这里调用实际的解除限流接口
unlimitTraffic(result)
}
// RecoveryAlertHandler 恢复告警处理器
func RecoveryAlertHandler(result *DetectionResult) {
// 使用zlog记录恢复告警
zlog.Info("流量恢复告警", map[string]interface{}{
"alert_type": "traffic_recovery",
"current_value": result.CurrentValue,
"mean": result.Mean,
"status": "recovered",
})
fmt.Printf("📢 恢复告警: 流量已恢复正常\n")
// 发送恢复通知
sendRecoveryAlert(fmt.Sprintf("流量已恢复正常,当前值: %.2f", result.CurrentValue))
}
// SustainedHighAnomalyHandler 持续高位专用处理器
func SustainedHighAnomalyHandler(result *DetectionResult) {
if strings.Contains(result.Confidence, "持续高位") {
// 使用zlog记录持续高位告警
zlog.Error("系统持续高位运行告警", map[string]interface{}{
"alert_type": "sustained_high_operation",
"current_value": result.CurrentValue,
"mean": result.Mean,
"deviation": result.Deviation,
"confidence": result.Confidence,
"severity": "critical",
"recommended_action": "review_system_capacity_and_scaling",
"impact": "potential_performance_degradation",
})
fmt.Printf("🔥 系统持续高位运行告警: %s\n", result.String())
// 发送特殊告警
sendSustainedHighAlert(result)
}
}
// DualWindowAnomalyHandler 双窗口专用异常处理器
func DualWindowAnomalyHandler(result *DetectionResult) {
switch {
case strings.Contains(result.Confidence, "持续高位"):
// 持续高位运行处理
zlog.Warn("持续高位运行检测", map[string]interface{}{
"detection_type": "sustained_high",
"current_value": result.CurrentValue,
"deviation": result.Deviation,
"confidence": result.Confidence,
"action": "capacity_review_needed",
})
fmt.Printf("📈 持续高位: %s\n", result.String())
case result.Deviation > result.Threshold*3:
// 极严重异常
zlog.Error("极严重流量异常", map[string]interface{}{
"detection_type": "critical_anomaly",
"current_value": result.CurrentValue,
"deviation_ratio": result.Deviation / result.Threshold,
"action": "immediate_intervention",
})
fmt.Printf("🚨 极严重异常: %s\n", result.String())
default:
// 一般异常
zlog.Info("一般流量异常", map[string]interface{}{
"detection_type": "normal_anomaly",
"current_value": result.CurrentValue,
"confidence": result.Confidence,
})
fmt.Printf("⚠️ 一般异常: %s\n", result.String())
}
}
// 辅助函数
func sendSustainedHighAlert(result *DetectionResult) {
alertMsg := fmt.Sprintf("系统持续高位运行告警!当前值: %.2f, 建议检查系统容量和扩容策略", result.CurrentValue)
zlog.Warn("发送持续高位告警", map[string]interface{}{
"alert_message": alertMsg,
"alert_type": "sustained_high_operation",
"priority": "high",
})
// 这里可以调用实际的告警接口
// 例如:发送邮件、短信、钉钉通知等
}
func unblockTraffic(result *DetectionResult) {
// 实现解除流量阻断逻辑
zlog.Info("执行解除流量阻断", map[string]interface{}{
"unblocked_value": result.CurrentValue,
"reason": "traffic_recovered",
"action_taken": "traffic_unblocked",
})
}
func unlimitTraffic(result *DetectionResult) {
// 实现解除流量限制逻辑
zlog.Info("执行解除流量限制", map[string]interface{}{
"unlimited_value": result.CurrentValue,
"reason": "traffic_recovered",
"action_taken": "traffic_unlimited",
})
}
func sendRecoveryAlert(message string) {
// 实现恢复告警发送逻辑
zlog.Info("发送恢复告警", map[string]interface{}{
"alert_message": message,
"alert_type": "traffic_recovery",
})
}
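getSeverityLevel grades an anomaly by how far the deviation exceeds the threshold: a deviation/threshold ratio above 3 is critical, above 2 high, above 1 medium, otherwise low. A minimal sketch of the grading in the same package, using made-up numbers and the DetectionResult struct added later in this commit:
func exampleSeverity() {
	// Deviation 7.5 against a threshold of 3.0 gives a ratio of 2.5, which lands in the "high" band.
	r := &DetectionResult{Deviation: 7.5, Threshold: 3.0}
	fmt.Println(getSeverityLevel(r)) // prints "high"
}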

View File

@@ -0,0 +1,435 @@
package flow
import (
"fmt"
"math"
)
// AnomalyHandler 异常处理回调函数类型
type AnomalyHandler func(result *DetectionResult)
// AnomalyAction 异常处理动作类型
type AnomalyAction int
const (
ActionLog AnomalyAction = iota // 记录日志
ActionAlert // 发送告警
ActionBlock // 阻断流量
ActionLimit // 限制流量
ActionCustom // 自定义处理
)
// AnomalyProcessor 异常处理器
type AnomalyProcessor struct {
Action AnomalyAction
Handler AnomalyHandler
Enabled bool
Name string
// 新增恢复处理器
RecoveryHandler AnomalyHandler
}
// 常量定义
const (
// MinWindowSize 最小窗口大小
MinWindowSize = 2
// InvalidThreshold 无效阈值标识
InvalidThreshold = -1
)
// DetectionResult 异常检测结果
type DetectionResult struct {
IsAnomaly bool // 是否异常
CurrentValue float64 // 当前值
Mean float64 // 均值
StdDev float64 // 标准差
Threshold float64 // 异常阈值 (k * 标准差)
Deviation float64 // 偏离度 (当前值与均值的差)
WindowData []float64 // 当前窗口数据
WindowSize int // 窗口大小
Confidence string // 置信度描述
// 新增恢复相关字段
IsRecovered bool // 是否从异常状态恢复
RecoveryCount int // 连续正常计数
}
// String 返回检测结果的字符串表示
func (r *DetectionResult) String() string {
if !r.IsAnomaly {
return fmt.Sprintf("正常值: %.2f (均值: %.2f, 标准差: %.2f)",
r.CurrentValue, r.Mean, r.StdDev)
}
return fmt.Sprintf("异常值: %.2f (均值: %.2f, 偏离: %.2f, 阈值: %.2f, %s)",
r.CurrentValue, r.Mean, r.Deviation, r.Threshold, r.Confidence)
}
// MeanStdDetector 滑动窗口均值标准差异常检测器
type MeanStdDetector struct {
window []float64 // 滑动窗口数据
size int // 窗口大小
k float64 // 异常检测系数(通常取2-3)
sum float64 // 窗口数据总和(优化计算)
sumSq float64 // 窗口数据平方和(优化计算)
processors []AnomalyProcessor // 异常处理器列表
// 新增恢复相关字段
isInAnomalyState bool // 当前是否处于异常状态
normalCount int // 连续正常值计数
recoveryThreshold int // 恢复需要的连续正常值数量
}
// NewMeanStdDetector 创建新的异常检测器
// size: 滑动窗口大小建议10-50
// k: 异常检测系数k=2(95%置信度), k=3(99.7%置信度)
func NewMeanStdDetector(size int, k float64) *MeanStdDetector {
if size < MinWindowSize {
size = MinWindowSize
}
if k <= 0 {
k = 2.0 // 默认2倍标准差
}
return &MeanStdDetector{
size: size,
k: k,
recoveryThreshold: 5, // 默认需要5个连续正常值才认为恢复
}
}
// AddValue 添加新的数据点到滑动窗口
func (d *MeanStdDetector) AddValue(value float64) {
// 如果窗口已满,移除最旧的数据
if len(d.window) >= d.size {
old := d.window[0]
d.window = d.window[1:]
d.sum -= old
d.sumSq -= old * old
}
// 添加新数据
d.window = append(d.window, value)
d.sum += value
d.sumSq += value * value
}
// IsAnomaly 简单的异常检测,只返回是否异常
func (d *MeanStdDetector) IsAnomaly(value float64) bool {
return d.DetectAnomaly(value).IsAnomaly
}
// DetectAnomaly 完整的异常检测,返回详细结果
func (d *MeanStdDetector) DetectAnomaly(value float64) *DetectionResult {
result := &DetectionResult{
CurrentValue: value,
WindowSize: len(d.window),
}
// 窗口数据不足,无法检测
if len(d.window) < MinWindowSize {
result.IsAnomaly = false
result.Threshold = InvalidThreshold
result.Confidence = "数据不足"
return result
}
// 计算统计指标
n := float64(len(d.window))
mean := d.sum / n
variance := (d.sumSq - (d.sum*d.sum)/n) / n
stdDev := math.Sqrt(math.Max(variance, 0))
threshold := d.k * stdDev
deviation := math.Abs(value - mean)
// 填充结果
result.Mean = mean
result.StdDev = stdDev
result.Threshold = threshold
result.Deviation = deviation
result.WindowData = make([]float64, len(d.window))
copy(result.WindowData, d.window)
// 判断是否异常
isCurrentAnomaly := deviation > threshold
result.IsAnomaly = isCurrentAnomaly
// 恢复逻辑处理
if !isCurrentAnomaly {
// 当前值正常
d.normalCount++
result.RecoveryCount = d.normalCount
// 检查是否从异常状态恢复
if d.isInAnomalyState && d.normalCount >= d.recoveryThreshold {
result.IsRecovered = true
d.isInAnomalyState = false
d.normalCount = 0 // 重置计数
}
} else {
// 当前值异常
d.isInAnomalyState = true
d.normalCount = 0 // 重置正常计数
}
// 设置置信度描述
if stdDev == 0 {
if value == mean {
result.Confidence = "完全正常"
} else {
result.IsAnomaly = true
result.Confidence = "明显异常(零方差)"
}
} else {
sigmaLevel := deviation / stdDev
switch {
case sigmaLevel > 3:
result.Confidence = "高度异常(>3σ)"
case sigmaLevel > 2:
result.Confidence = "中度异常(>2σ)"
case sigmaLevel > 1:
result.Confidence = "轻微异常(>1σ)"
default:
result.Confidence = "正常范围"
}
}
return result
}
// SetRecoveryThreshold 设置恢复阈值
func (d *MeanStdDetector) SetRecoveryThreshold(threshold int) {
if threshold > 0 {
d.recoveryThreshold = threshold
}
}
// GetRecoveryStatus 获取恢复状态信息
func (d *MeanStdDetector) GetRecoveryStatus() map[string]interface{} {
return map[string]interface{}{
"is_in_anomaly_state": d.isInAnomalyState,
"normal_count": d.normalCount,
"recovery_threshold": d.recoveryThreshold,
"recovery_progress": float64(d.normalCount) / float64(d.recoveryThreshold),
}
}
// GetWindowStats 获取当前窗口的统计信息
func (d *MeanStdDetector) GetWindowStats() map[string]interface{} {
if len(d.window) == 0 {
return map[string]interface{}{
"window_size": 0,
"mean": 0,
"std_dev": 0,
"min": 0,
"max": 0,
}
}
n := float64(len(d.window))
mean := d.sum / n
variance := (d.sumSq - (d.sum*d.sum)/n) / n
stdDev := math.Sqrt(math.Max(variance, 0))
// 计算最小值和最大值
min, max := d.window[0], d.window[0]
for _, v := range d.window[1:] {
if v < min {
min = v
}
if v > max {
max = v
}
}
return map[string]interface{}{
"window_size": len(d.window),
"mean": mean,
"std_dev": stdDev,
"min": min,
"max": max,
"threshold": d.k * stdDev,
"k_factor": d.k,
}
}
// Reset 重置检测器
func (d *MeanStdDetector) Reset() {
d.window = nil
d.sum = 0
d.sumSq = 0
}
// 保持向后兼容的方法
// Add 添加数据点(向后兼容)
func (d *MeanStdDetector) Add(value float64) {
d.AddValue(value)
}
// IsAnomalyPrintValue 返回异常状态和阈值(向后兼容)
func (d *MeanStdDetector) IsAnomalyPrintValue(value float64) (bool, float64) {
result := d.DetectAnomaly(value)
if result.Threshold == InvalidThreshold {
return false, InvalidThreshold
}
return result.IsAnomaly, result.Threshold
}
// IsAnomalyPrintFull 返回异常状态和窗口数据(向后兼容)
func (d *MeanStdDetector) IsAnomalyPrintFull(value float64) (bool, []float64) {
result := d.DetectAnomaly(value)
return result.IsAnomaly, result.WindowData
}
// AddAnomalyProcessor 添加异常处理器
func (d *MeanStdDetector) AddAnomalyProcessor(processor AnomalyProcessor) {
d.processors = append(d.processors, processor)
}
// RemoveAnomalyProcessor 移除异常处理器
func (d *MeanStdDetector) RemoveAnomalyProcessor(name string) {
for i, processor := range d.processors {
if processor.Name == name {
d.processors = append(d.processors[:i], d.processors[i+1:]...)
break
}
}
}
// processAnomaly 处理异常情况
func (d *MeanStdDetector) processAnomaly(result *DetectionResult) {
// 处理异常
if result.IsAnomaly {
// 执行所有启用的异常处理器
for _, processor := range d.processors {
if processor.Enabled && processor.Handler != nil {
processor.Handler(result)
}
}
}
// 处理恢复
if result.IsRecovered {
// 执行恢复处理器
for _, processor := range d.processors {
if processor.Enabled && processor.RecoveryHandler != nil {
processor.RecoveryHandler(result)
}
}
}
}
// DetectAnomalyWithProcessing 检测异常并自动处理
func (d *MeanStdDetector) DetectAnomalyWithProcessing(value float64) *DetectionResult {
result := d.DetectAnomaly(value)
// 如果检测到异常,执行处理逻辑
d.processAnomaly(result)
return result
}
// 双窗口异常检测器
type DualWindowDetector struct {
shortWindow *MeanStdDetector // 短期窗口(快速响应)
longWindow *MeanStdDetector // 长期窗口(稳定基线)
// 状态管理
sustainedHighCount int
maxSustainedHigh int
// 异常处理器
processors []AnomalyProcessor
}
func NewDualWindowDetector(shortSize, longSize int, k float64) *DualWindowDetector {
return &DualWindowDetector{
shortWindow: NewMeanStdDetector(shortSize, k),
longWindow: NewMeanStdDetector(longSize, k),
maxSustainedHigh: 20, // 最大持续高位次数
}
}
// AddValue 向两个窗口添加数据
func (d *DualWindowDetector) AddValue(value float64) {
d.shortWindow.AddValue(value)
d.longWindow.AddValue(value)
}
// AddAnomalyProcessor 添加异常处理器
func (d *DualWindowDetector) AddAnomalyProcessor(processor AnomalyProcessor) {
d.processors = append(d.processors, processor)
}
// SetSustainedHighThreshold 设置持续高位阈值
func (d *DualWindowDetector) SetSustainedHighThreshold(threshold int) {
if threshold > 0 {
d.maxSustainedHigh = threshold
}
}
func (d *DualWindowDetector) DetectAnomaly(value float64) *DetectionResult {
shortResult := d.shortWindow.DetectAnomaly(value)
//longResult := d.longWindow.DetectAnomaly(value)
// 获取统计信息
shortStats := d.shortWindow.GetWindowStats()
longStats := d.longWindow.GetWindowStats()
shortMean := shortStats["mean"].(float64)
longMean := longStats["mean"].(float64)
// 检测持续高位状态
if len(d.shortWindow.window) >= MinWindowSize && len(d.longWindow.window) >= MinWindowSize {
if shortMean > longMean*1.5 { // 短期均值显著高于长期均值
d.sustainedHighCount++
} else {
d.sustainedHighCount = 0
}
// 如果持续高位时间过长,发出警告
if d.sustainedHighCount > d.maxSustainedHigh {
shortResult.Confidence = fmt.Sprintf("持续高位运行警告(%d次)", d.sustainedHighCount)
shortResult.IsAnomaly = true
// 添加持续高位的特殊标记
shortResult.Deviation = shortMean - longMean
}
}
return shortResult
}
// DetectAnomalyWithProcessing 检测异常并自动处理
func (d *DualWindowDetector) DetectAnomalyWithProcessing(value float64) *DetectionResult {
result := d.DetectAnomaly(value)
// 执行异常处理器
if result.IsAnomaly {
for _, processor := range d.processors {
if processor.Enabled && processor.Handler != nil {
processor.Handler(result)
}
}
}
return result
}
// GetDualWindowStats 获取双窗口统计信息
func (d *DualWindowDetector) GetDualWindowStats() map[string]interface{} {
shortStats := d.shortWindow.GetWindowStats()
longStats := d.longWindow.GetWindowStats()
return map[string]interface{}{
"short_window": shortStats,
"long_window": longStats,
"sustained_high_count": d.sustainedHighCount,
"max_sustained_high": d.maxSustainedHigh,
"short_mean": shortStats["mean"],
"long_mean": longStats["mean"],
}
}
// Reset 重置双窗口检测器
func (d *DualWindowDetector) Reset() {
d.shortWindow.Reset()
d.longWindow.Reset()
d.sustainedHighCount = 0
}
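The detector keeps running sums so the statistics come from a single pass: mean = sum/n, variance = (sumSq - sum²/n)/n, and a value is flagged when |value - mean| > k*stdDev. A minimal usage sketch in the same package, with the arithmetic spelled out in comments (numbers are illustrative; zlog should be initialized first, as the tests in this commit do):
func exampleMeanStdUsage() {
	d := NewMeanStdDetector(3, 2.0)
	d.AddAnomalyProcessor(AnomalyProcessor{
		Action:          ActionLog,
		Handler:         LogAnomalyHandler,
		RecoveryHandler: RecoveryLogHandler,
		Enabled:         true,
		Name:            "logger",
	})
	for _, v := range []float64{10, 12, 14} {
		d.AddValue(v) // after the loop: sum = 36, sumSq = 440
	}
	// mean = 36/3 = 12, variance = (440 - 36*36/3)/3 ≈ 2.67, stdDev ≈ 1.63
	// threshold = 2.0 * 1.63 ≈ 3.27, and |16 - 12| = 4 > 3.27, so 16 is anomalous
	r := d.DetectAnomalyWithProcessing(16)
	fmt.Println(r.IsAnomaly, r.Confidence) // true 中度异常(>2σ)
}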

View File

@@ -0,0 +1,658 @@
package flow
import (
"SamWaf/common/zlog"
"SamWaf/global"
"fmt"
"strings"
"testing"
"time"
)
func TestNewMeanStdDetector(t *testing.T) {
d := NewMeanStdDetector(10, 3)
if d == nil {
t.Fatal("NewMeanStdDetector failed")
}
//模拟测试 刚开始正常->异常->正常
// 第一阶段:添加正常数据,建立基线
normalValues := []float64{10.0, 11.0, 9.0, 10.5, 9.5, 10.2, 9.8, 10.1, 9.9, 10.3}
for _, value := range normalValues {
d.Add(value)
// 在建立基线阶段,不应该检测到异常
if d.IsAnomaly(value) {
t.Logf("正常阶段检测到异常值: %.2f (这在建立基线时是正常的)", value)
}
}
t.Logf("第一阶段完成:已添加%d个正常基线数据", len(normalValues))
// 第二阶段:测试异常检测
anomalyValues := []float64{25.0, 30.0, -5.0, 35.0} // 明显偏离正常范围的异常值
anomalyDetected := 0
for _, value := range anomalyValues {
if d.IsAnomaly(value) {
anomalyDetected++
t.Logf("检测到异常值: %.2f", value)
} else {
t.Logf("未检测到异常值: %.2f (可能需要调整k值)", value)
}
d.Add(value) // 添加到窗口中
}
if anomalyDetected == 0 {
t.Error("异常检测失败:没有检测到任何异常值")
} else {
t.Logf("第二阶段完成:检测到%d个异常值", anomalyDetected)
}
// 第三阶段:回归正常
normalValues2 := []float64{10.1, 9.9, 10.2, 9.8, 10.0, 9.7, 10.3, 9.6, 10.4}
normalDetected := 0
for _, value := range normalValues2 {
if !d.IsAnomaly(value) {
normalDetected++
t.Logf("正常值: %.2f", value)
} else {
t.Logf("仍被检测为异常: %.2f (滑动窗口还在调整中)", value)
}
d.Add(value)
}
t.Logf("第三阶段完成:%d个值被识别为正常", normalDetected)
// 验证最终状态:最后几个正常值应该不被检测为异常
finalNormalValues := []float64{10.0, 9.9, 10.1, 0}
finalNormalCount := 0
for _, value := range finalNormalValues {
if !d.IsAnomaly(value) {
finalNormalCount++
}
}
if finalNormalCount < 2 {
t.Error("最终正常化测试失败:系统未能回归正常状态")
} else {
t.Logf("测试成功:系统已回归正常状态,%d/%d个最终值被正确识别为正常", finalNormalCount, len(finalNormalValues))
}
}
// 额外的测试用例:测试边界条件
func TestMeanStdDetectorEdgeCases(t *testing.T) {
d := NewMeanStdDetector(5, 2.0)
// 测试窗口未满时的行为
if d.IsAnomaly(100.0) {
t.Error("窗口数据不足时不应检测异常")
}
d.Add(10.0)
if d.IsAnomaly(100.0) {
t.Error("窗口数据不足时不应检测异常")
}
// 测试相同值的情况
for i := 0; i < 5; i++ {
d.Add(10.0)
}
// 当所有值都相同时,标准差为0,任何不同的值都应该被检测为异常
if !d.IsAnomaly(15.0) {
t.Error("当标准差为0时不同的值应该被检测为异常")
}
if d.IsAnomaly(10.0) {
t.Error("相同的值不应该被检测为异常")
}
}
// 性能测试
func TestMeanStdDetectorPerformance(t *testing.T) {
d := NewMeanStdDetector(100, 2.0)
// 添加大量数据测试性能
for i := 0; i < 10000; i++ {
value := float64(i%20 + 10) // 生成10-29之间的循环数据
d.Add(value)
d.IsAnomaly(value)
}
t.Log("性能测试完成处理了10000个数据点")
}
// 手动测试
func TestMeanStdDetectorManual(t *testing.T) {
d := NewMeanStdDetector(10, 3)
if d == nil {
t.Fatal("NewMeanStdDetector failed")
}
normalValues := []float64{10.0, 11.0, 9.0, 10.5, 9.5, 10.2, 9.8, 10.1, 9.9, 10.3, 5000}
for _, value := range normalValues {
d.Add(value)
anomaly, window := d.IsAnomalyPrintFull(value)
if anomaly {
t.Logf("正常阶段检测到异常值: %.2f (这在建立基线时是正常的) 当前window数据: %v", value, window)
}
}
}
// 在现有测试文件中添加新的测试函数
// TestMeanStdDetectorImproved 改进后的测试示例
func TestMeanStdDetectorImproved(t *testing.T) {
// 创建检测器:窗口大小10,2倍标准差阈值
detector := NewMeanStdDetector(10, 2.0)
// 模拟正常流量数据
normalTraffic := []float64{100, 105, 95, 110, 90, 102, 98, 107, 93, 101}
fmt.Println("=== 正常流量阶段 ===")
for _, traffic := range normalTraffic {
detector.AddValue(traffic)
result := detector.DetectAnomaly(traffic)
fmt.Printf("%s\n", result.String())
}
// 模拟异常流量
anomalyTraffic := []float64{200, 50, 300, 10}
fmt.Println("\n=== 异常流量检测 ===")
for _, traffic := range anomalyTraffic {
result := detector.DetectAnomaly(traffic)
fmt.Printf("%s\n", result.String())
// 添加到窗口中
detector.AddValue(traffic)
}
// 打印当前窗口统计信息
fmt.Println("\n=== 窗口统计信息 ===")
stats := detector.GetWindowStats()
for key, value := range stats {
fmt.Printf("%s: %.2f\n", key, value)
}
}
// TestFlowAnomalyDetectionDemo 流量异常检测演示
func TestFlowAnomalyDetectionDemo(t *testing.T) {
// 创建流量异常检测器
flowDetector := NewMeanStdDetector(20, 2.5) // 20个数据点窗口,2.5倍标准差
// 模拟一天的网络流量数据 (MB/s)
dailyTraffic := []float64{
// 凌晨低流量
10, 8, 12, 9, 11, 7, 13, 10,
// 上午逐渐增加
15, 18, 22, 25, 30, 35, 40,
// 中午高峰
45, 50, 48, 52, 47,
// 下午稳定
40, 42, 38, 41, 39,
// 异常流量攻击
150, 200, 180, 220,
// 恢复正常
35, 40, 38, 42, 36,
}
fmt.Println("=== 网络流量异常检测演示 ===")
anomalyCount := 0
for i, traffic := range dailyTraffic {
// 先检测再添加
result := flowDetector.DetectAnomaly(traffic)
if result.IsAnomaly {
anomalyCount++
fmt.Printf("[%02d] ⚠️ %s\n", i+1, result.String())
} else {
fmt.Printf("[%02d] ✅ %s\n", i+1, result.String())
}
// 添加到检测器
flowDetector.AddValue(traffic)
}
fmt.Printf("\n检测完成共发现 %d 个异常流量点\n", anomalyCount)
// 最终统计
stats := flowDetector.GetWindowStats()
fmt.Printf("最终窗口统计:均值=%.1f, 标准差=%.1f, 阈值=%.1f\n",
stats["mean"], stats["std_dev"], stats["threshold"])
}
// TestFlowAnomalyDetectionWithHandling 带异常处理的流量检测测试
func TestFlowAnomalyDetectionWithHandling(t *testing.T) {
// 创建流量异常检测器
flowDetector := NewMeanStdDetector(20, 2.5)
// 添加使用zlog的异常处理器
flowDetector.AddAnomalyProcessor(AnomalyProcessor{
Action: ActionLog,
Handler: LogAnomalyHandler, // 现在使用zlog
Enabled: true,
Name: "zlog_logger",
})
flowDetector.AddAnomalyProcessor(AnomalyProcessor{
Action: ActionAlert,
Handler: AlertAnomalyHandler, // 现在使用zlog
Enabled: true,
Name: "zlog_alerter",
})
flowDetector.AddAnomalyProcessor(AnomalyProcessor{
Action: ActionCustom,
Handler: CustomAnomalyHandler, // 现在使用zlog
Enabled: true,
Name: "zlog_custom",
})
// 模拟流量数据
dailyTraffic := []float64{
// 正常流量
10, 8, 12, 9, 11, 7, 13, 10, 15, 18,
// 异常攻击流量
150, 200, 180, 220,
// 恢复正常
35, 40, 38, 42, 36,
}
fmt.Println("=== 带异常处理的流量检测演示 ===")
anomalyCount := 0
for i, traffic := range dailyTraffic {
// 使用带处理的检测方法
result := flowDetector.DetectAnomalyWithProcessing(traffic)
if result.IsAnomaly {
anomalyCount++
fmt.Printf("[%02d] ⚠️ 异常已处理: %.2f\n", i+1, traffic)
} else {
fmt.Printf("[%02d] ✅ 正常流量: %.2f\n", i+1, traffic)
}
// 添加到检测器
flowDetector.AddValue(traffic)
// 模拟实时处理间隔
time.Sleep(100 * time.Millisecond)
}
fmt.Printf("\n检测完成共处理 %d 个异常流量点\n", anomalyCount)
}
// TestRealTimeAnomalyProcessing 实时异常处理测试
func TestRealTimeAnomalyProcessing(t *testing.T) {
detector := NewMeanStdDetector(10, 2.0)
// 添加实时处理器
detector.AddAnomalyProcessor(AnomalyProcessor{
Action: ActionCustom,
Enabled: true,
Name: "realtime",
Handler: func(result *DetectionResult) {
// 实时处理逻辑
timestamp := time.Now().Format("15:04:05")
switch {
case result.Deviation > result.Threshold*3:
fmt.Printf("[%s] 🔥 极严重异常 %.2f - 立即阻断\n", timestamp, result.CurrentValue)
case result.Deviation > result.Threshold*2:
fmt.Printf("[%s] ⚡ 严重异常 %.2f - 启动限流\n", timestamp, result.CurrentValue)
default:
fmt.Printf("[%s] ⚠️ 一般异常 %.2f - 记录日志\n", timestamp, result.CurrentValue)
}
},
})
// 模拟实时数据流
trafficStream := []float64{10, 12, 11, 9, 50, 100, 200, 15, 13, 11}
fmt.Println("=== 实时异常处理演示 ===")
for _, traffic := range trafficStream {
detector.AddValue(traffic)
detector.DetectAnomalyWithProcessing(traffic)
time.Sleep(500 * time.Millisecond) // 模拟实时间隔
}
}
// TestFlowAnomalyDetectionWithRecovery 带恢复机制的流量检测测试
func TestFlowAnomalyDetectionWithRecovery(t *testing.T) {
//初始化日志
zlog.InitZLog(global.GWAF_LOG_DEBUG_ENABLE, "console")
// 创建流量异常检测器
flowDetector := NewMeanStdDetector(10, 2.0)
flowDetector.SetRecoveryThreshold(3) // 设置3个连续正常值后恢复
// 添加异常和恢复处理器
flowDetector.AddAnomalyProcessor(AnomalyProcessor{
Action: ActionLog,
Handler: LogAnomalyHandler,
RecoveryHandler: RecoveryLogHandler,
Enabled: true,
Name: "logger_with_recovery",
})
flowDetector.AddAnomalyProcessor(AnomalyProcessor{
Action: ActionBlock,
Handler: BlockAnomalyHandler,
RecoveryHandler: RecoveryUnblockHandler,
Enabled: true,
Name: "blocker_with_recovery",
})
// 模拟完整的异常-恢复周期
trafficData := []float64{
// 正常流量建立基线
10, 12, 11, 9, 10, 11, 9, 12, 10, 11,
// 异常流量
50, 60, 55,
// 恢复正常
10, 11, 12, 9, 10, 11,
// 高位运行
100, 120, 110, 130, 120, 110,
// 继续高位运行
92, 93, 94, 95, 94, 98,
//降至低位
10, 11, 12, 9, 10, 11,
}
fmt.Println("=== 异常检测与恢复演示 ===")
for i, traffic := range trafficData {
// 先添加到窗口,再检测
flowDetector.AddValue(traffic)
result := flowDetector.DetectAnomalyWithProcessing(traffic)
if result.IsAnomaly {
fmt.Printf("[%02d] ⚠️ 异常: %.2f\n", i+1, traffic)
} else if result.IsRecovered {
fmt.Printf("[%02d] 🎉 恢复: %.2f (连续正常%d次)\n", i+1, traffic, result.RecoveryCount)
} else {
fmt.Printf("[%02d] ✅ 正常: %.2f\n", i+1, traffic)
}
// 显示恢复状态
status := flowDetector.GetRecoveryStatus()
if status["is_in_anomaly_state"].(bool) {
fmt.Printf(" 恢复进度: %d/%d (%.1f%%)\n",
status["normal_count"], status["recovery_threshold"],
status["recovery_progress"].(float64)*100)
}
time.Sleep(200 * time.Millisecond)
}
}
// TestDualWindowDetectorBasic 双窗口检测器基础功能测试
func TestDualWindowDetectorBasic(t *testing.T) {
// 创建双窗口检测器:短窗口5,长窗口15,2倍标准差
detector := NewDualWindowDetector(5, 15, 2.0)
fmt.Println("=== 双窗口检测器基础功能测试 ===")
// 第一阶段:建立长期基线(低流量)
lowTraffic := []float64{10, 12, 11, 9, 10, 11, 9, 12, 10, 11, 9, 10, 12, 11, 10}
fmt.Println("\n--- 建立长期基线(低流量) ---")
for i, traffic := range lowTraffic {
detector.AddValue(traffic)
result := detector.DetectAnomaly(traffic)
fmt.Printf("[%02d] 值: %.1f, 异常: %v, %s\n", i+1, traffic, result.IsAnomaly, result.Confidence)
}
// 第二阶段:短期高流量(应该被检测为正常,因为还没有持续太久)
highTraffic := []float64{25, 30, 28, 32, 27, 29, 31, 26, 30, 28}
fmt.Println("\n--- 短期高流量阶段 ---")
for i, traffic := range highTraffic {
detector.AddValue(traffic)
result := detector.DetectAnomaly(traffic)
fmt.Printf("[%02d] 值: %.1f, 异常: %v, %s\n", i+len(lowTraffic)+1, traffic, result.IsAnomaly, result.Confidence)
}
// 第三阶段:持续高流量(应该触发持续高位警告)
sustainedHighTraffic := []float64{35, 40, 38, 42, 36, 39, 41, 37, 40, 38, 35, 42, 39, 36, 40}
fmt.Println("\n--- 持续高流量阶段 ---")
for i, traffic := range sustainedHighTraffic {
detector.AddValue(traffic)
result := detector.DetectAnomaly(traffic)
fmt.Printf("[%02d] 值: %.1f, 异常: %v, %s\n", i+len(lowTraffic)+len(highTraffic)+1, traffic, result.IsAnomaly, result.Confidence)
}
// 打印最终统计
stats := detector.GetDualWindowStats()
fmt.Printf("\n=== 最终统计 ===\n")
fmt.Printf("短窗口均值: %.2f\n", stats["short_mean"])
fmt.Printf("长窗口均值: %.2f\n", stats["long_mean"])
fmt.Printf("持续高位计数: %d\n", stats["sustained_high_count"])
}
// TestDualWindowSustainedHighDetection 持续高位检测专项测试
func TestDualWindowSustainedHighDetection(t *testing.T) {
detector := NewDualWindowDetector(5, 20, 2.0)
detector.SetSustainedHighThreshold(10) // 设置较低的阈值便于测试
fmt.Println("=== 持续高位检测专项测试 ===")
// 建立低基线
for i := 0; i < 20; i++ {
detector.AddValue(10.0 + float64(i%3)) // 10-12之间波动
}
fmt.Println("\n--- 基线建立完成,开始高位运行 ---")
// 模拟系统从低位突然跳到高位并持续运行
highValues := []float64{50, 52, 48, 51, 49, 53, 47, 50, 52, 48, 51, 49, 50, 52, 48}
sustainedDetected := false
for i, value := range highValues {
detector.AddValue(value)
result := detector.DetectAnomaly(value)
fmt.Printf("[%02d] 值: %.1f, 异常: %v", i+1, value, result.IsAnomaly)
if result.IsAnomaly && result.Confidence != "正常范围" {
fmt.Printf(", %s", result.Confidence)
if !sustainedDetected && result.Confidence != "正常范围" {
sustainedDetected = true
fmt.Printf(" ← 首次检测到持续高位")
}
}
fmt.Println()
}
if !sustainedDetected {
t.Error("未能检测到持续高位运行状态")
} else {
fmt.Println("✅ 成功检测到持续高位运行状态")
}
}
// TestDualWindowVsSingleWindow 双窗口与单窗口对比测试
func TestDualWindowVsSingleWindow(t *testing.T) {
// 创建检测器
singleDetector := NewMeanStdDetector(10, 2.0)
dualDetector := NewDualWindowDetector(5, 15, 2.0)
dualDetector.SetSustainedHighThreshold(8)
fmt.Println("=== 双窗口 vs 单窗口对比测试 ===")
// 测试场景:历史低峰 -> 高峰持续运行
testData := []float64{
// 历史低峰
5, 7, 6, 8, 5, 6, 7, 5, 8, 6,
// 突然跳到高位并持续
25, 28, 26, 30, 27, 29, 31, 26, 28, 30, 27, 29, 25, 28, 26,
}
fmt.Printf("%-5s %-8s %-15s %-15s %-20s %-20s\n", "序号", "数值", "单窗口异常", "双窗口异常", "单窗口置信度", "双窗口置信度")
fmt.Println(strings.Repeat("-", 100))
singleAnomalies := 0
dualAnomalies := 0
dualSustainedWarnings := 0
for i, value := range testData {
// 单窗口检测
singleDetector.AddValue(value)
singleResult := singleDetector.DetectAnomaly(value)
// 双窗口检测
dualDetector.AddValue(value)
dualResult := dualDetector.DetectAnomaly(value)
if singleResult.IsAnomaly {
singleAnomalies++
}
if dualResult.IsAnomaly {
dualAnomalies++
if strings.Contains(dualResult.Confidence, "持续高位") {
dualSustainedWarnings++
}
}
fmt.Printf("%-5d %-8.1f %-15v %-15v %-20s %-20s\n",
i+1, value,
singleResult.IsAnomaly, dualResult.IsAnomaly,
singleResult.Confidence, dualResult.Confidence)
}
fmt.Printf("\n=== 对比结果 ===\n")
fmt.Printf("单窗口异常次数: %d\n", singleAnomalies)
fmt.Printf("双窗口异常次数: %d\n", dualAnomalies)
fmt.Printf("双窗口持续高位警告: %d\n", dualSustainedWarnings)
// 验证双窗口能检测到持续高位问题
if dualSustainedWarnings == 0 {
t.Error("双窗口检测器未能识别持续高位运行问题")
} else {
fmt.Printf("✅ 双窗口成功识别了持续高位运行问题\n")
}
}
// TestDualWindowWithHandlers 双窗口检测器异常处理器测试
func TestDualWindowWithHandlers(t *testing.T) {
//初始化日志
zlog.InitZLog(global.GWAF_LOG_DEBUG_ENABLE, "console")
detector := NewDualWindowDetector(5, 15, 2.0)
detector.SetSustainedHighThreshold(5)
// 添加专门的持续高位处理器
detector.AddAnomalyProcessor(AnomalyProcessor{
Action: ActionCustom,
Enabled: true,
Name: "sustained_high_handler",
Handler: func(result *DetectionResult) {
if strings.Contains(result.Confidence, "持续高位") {
zlog.Warn("持续高位运行告警", map[string]interface{}{
"alert_type": "sustained_high_traffic",
"current_value": result.CurrentValue,
"deviation": result.Deviation,
"confidence": result.Confidence,
"action_needed": "review_system_capacity",
})
fmt.Printf("🔥 持续高位告警: %s\n", result.String())
} else {
zlog.Info("一般异常检测", map[string]interface{}{
"current_value": result.CurrentValue,
"confidence": result.Confidence,
})
}
},
})
fmt.Println("=== 双窗口异常处理器测试 ===")
// 测试数据:低基线 -> 持续高位
testSequence := []float64{
// 建立低基线
10, 12, 11, 9, 10, 11, 9, 12, 10, 11, 9, 10, 12, 11, 10,
// 持续高位运行
30, 32, 31, 33, 29, 31, 34, 30, 32, 31, 30, 33, 31, 29, 32,
}
handlerTriggered := false
sustainedHighDetected := false
for i, value := range testSequence {
detector.AddValue(value)
result := detector.DetectAnomalyWithProcessing(value)
if result.IsAnomaly {
handlerTriggered = true
if strings.Contains(result.Confidence, "持续高位") {
sustainedHighDetected = true
fmt.Printf("[%02d] 🚨 持续高位: %.1f - %s\n", i+1, value, result.Confidence)
} else {
fmt.Printf("[%02d] ⚠️ 一般异常: %.1f - %s\n", i+1, value, result.Confidence)
}
} else {
fmt.Printf("[%02d] ✅ 正常: %.1f\n", i+1, value)
}
time.Sleep(50 * time.Millisecond)
}
if !handlerTriggered {
t.Error("异常处理器未被触发")
}
if !sustainedHighDetected {
t.Error("未检测到持续高位运行状态")
}
fmt.Printf("\n✅ 测试完成:异常处理器正常工作,持续高位检测有效\n")
}
// TestDualWindowRealWorldScenario 双窗口真实场景测试
func TestDualWindowRealWorldScenario(t *testing.T) {
detector := NewDualWindowDetector(10, 30, 2.0)
detector.SetSustainedHighThreshold(15)
fmt.Println("=== 双窗口真实场景测试 ===")
// 模拟真实的网络流量场景
scenarios := map[string][]float64{
"正常日间流量": {15, 18, 20, 22, 25, 23, 21, 19, 17, 20, 22, 24, 21, 18, 16},
"突发异常流量": {150, 200, 180, 220, 160}, // 真正的异常攻击
"系统升级后高位": {45, 48, 50, 47, 49, 51, 46, 48, 50, 47, 49, 52, 48, 46, 50, 49, 47, 51, 48, 50}, // 系统升级后持续高位
"恢复正常": {20, 22, 18, 21, 19, 23, 20, 18, 22, 21},
}
totalAnomalies := 0
sustainedHighWarnings := 0
for scenario, data := range scenarios {
fmt.Printf("\n--- %s ---\n", scenario)
for i, value := range data {
detector.AddValue(value)
result := detector.DetectAnomaly(value)
if result.IsAnomaly {
totalAnomalies++
if strings.Contains(result.Confidence, "持续高位") {
sustainedHighWarnings++
fmt.Printf("[%02d] 🔥 %s: %.1f\n", i+1, result.Confidence, value)
} else {
fmt.Printf("[%02d] ⚠️ 异常: %.1f - %s\n", i+1, value, result.Confidence)
}
} else {
fmt.Printf("[%02d] ✅ 正常: %.1f\n", i+1, value)
}
}
}
fmt.Printf("\n=== 场景测试总结 ===\n")
fmt.Printf("总异常检测次数: %d\n", totalAnomalies)
fmt.Printf("持续高位警告次数: %d\n", sustainedHighWarnings)
// 验证检测效果
if totalAnomalies == 0 {
t.Error("未检测到任何异常,检测器可能过于宽松")
}
if sustainedHighWarnings == 0 {
t.Error("未检测到持续高位运行,可能需要调整参数")
}
fmt.Printf("✅ 双窗口检测器在真实场景中表现良好\n")
}

View File

@@ -2,19 +2,19 @@ package global
var (
/******记录参数配置****************/
GCONFIG_RECORD_MAX_BODY_LENGTH int64 = 1024 * 2 //限制记录最大请求的body长度 record_max_req_body_length
GCONFIG_RECORD_MAX_RES_BODY_LENGTH int64 = 1024 * 4 //限制记录最大响应的body长度 record_max_rep_body_length
GCONFIG_RECORD_RESP int64 = 0 // 是否记录响应记录 record_resp
GCONFIG_RECORD_PROXY_HEADER string = "" //配置获取IP头信息
GCONFIG_RECORD_AUTO_LOAD_SSL int64 = 1 //是否每天凌晨3点自动加载ssl证书
GCONFIG_RECORD_KAFKA_ENABLE int64 = 0 //kafka 是否激活
GCONFIG_RECORD_KAFKA_URL string = "127.0.0.1:9092" //kafka url地址
GCONFIG_RECORD_KAFKA_TOPIC string = "samwaf_logs_topic" //kafka topic
GCONFIG_RECORD_REDIRECT_HTTPS_CODE int64 = 301 //80跳转https的方式
GCONFIG_ENABLE_HTTPS_REDIRECT int64 = 0 //是否启用HTTPS重定向服务器 0关闭 1开启
GCONFIG_RECORD_LOGIN_MAX_ERROR_TIME int64 = 3 //登录周期里错误最大次数
GCONFIG_RECORD_LOGIN_LIMIT_MINTUTES int64 = 1 //登录错误记录周期 单位分钟最小1
GCONFIG_LOG_PERSIST_ENABLED int64 = 1 //是否将web日志持久化到日志库 1开启(默认,维持老行为) 0关闭
GCONFIG_RECORD_MAX_BODY_LENGTH int64 = 1024 * 2 //限制记录最大请求的body长度 record_max_req_body_length
GCONFIG_RECORD_MAX_RES_BODY_LENGTH int64 = 1024 * 4 //限制记录最大响应的body长度 record_max_rep_body_length
GCONFIG_RECORD_RESP int64 = 0 // 是否记录响应记录 record_resp
GCONFIG_RECORD_PROXY_HEADER string = "" //配置获取IP头信息
GCONFIG_RECORD_AUTO_LOAD_SSL int64 = 1 //是否每天凌晨3点自动加载ssl证书
GCONFIG_RECORD_KAFKA_ENABLE int64 = 0 //kafka 是否激活
GCONFIG_RECORD_KAFKA_URL string = "127.0.0.1:9092" //kafka url地址
GCONFIG_RECORD_KAFKA_TOPIC string = "samwaf_logs_topic" //kafka topic
GCONFIG_RECORD_REDIRECT_HTTPS_CODE int64 = 301 //80跳转https的方式
GCONFIG_ENABLE_HTTPS_REDIRECT int64 = 0 //是否启用HTTPS重定向服务器 0关闭 1开启
GCONFIG_RECORD_LOGIN_MAX_ERROR_TIME int64 = 3 //登录周期里错误最大次数
GCONFIG_RECORD_LOGIN_LIMIT_MINTUTES int64 = 1 //登录错误记录周期 单位分钟最小1
// 指纹认证相关配置
GCONFIG_ENABLE_DEVICE_FINGERPRINT int64 = 1 // 是否启用设备指纹认证 1启用 0禁用

View File

@@ -134,7 +134,7 @@ var (
GSSL_HTTP_CHANGLE_PATH string = "/.well-known/acme-challenge/" // http01证书验证路径
/******数据库处理参数*****/
GDATA_BATCH_INSERT int = 1000 //最大批量插入
GDATA_BATCH_INSERT int64 = 100 //最大批量插入数量
GDATA_SHARE_DB_SIZE int64 = 100 * 10000 //100w 进行分库 100*10000
GDATA_SHARE_DB_FILE_SIZE int64 = 1024 //1024M 进行分库
GDATA_CURRENT_CHANGE bool = false //当前是否正在切换

View File

@@ -40,6 +40,7 @@ type Hosts struct {
AntiLeechJSON string `json:"anti_leech_json"` //防盗链配置 json
CacheJSON string `json:"cache_json"` //缓存配置 json
StaticSiteJSON string `json:"static_site_json"` //静态站点配置 json
TransportJSON string `json:"transport_json"` //传输配置 json
DefaultEncoding string `json:"default_encoding"` //默认编码 utf-8 或者 gbk auto字符串自动选择
LogOnlyMode int `json:"log_only_mode"` //仅记录模式 1开启 0关闭
}
@@ -154,3 +155,36 @@ type StaticSiteConfig struct {
AllowedExtensions string `json:"allowed_extensions"` // 允许的文件扩展名白名单,逗号分隔
SensitivePatterns string `json:"sensitive_patterns"` // 敏感文件名模式(正则表达式),逗号分隔
}
// TransportConfig 传输配置
type TransportConfig struct {
MaxIdleConns int `json:"max_idle_conns"` // 最大空闲连接数
MaxIdleConnsPerHost int `json:"max_idle_conns_per_host"` // 每个主机的最大空闲连接数
MaxConnsPerHost int `json:"max_conns_per_host"` // 每个主机的最大连接数
IdleConnTimeout int `json:"idle_conn_timeout"` // 空闲连接超时时间(秒)
TLSHandshakeTimeout int `json:"tls_handshake_timeout"` // TLS握手超时时间(秒)
ExpectContinueTimeout int `json:"expect_continue_timeout"` // Expect Continue超时时间(秒)
}
// ParseTransportConfig 解析传输配置
func ParseTransportConfig(transportJSON string) TransportConfig {
var config TransportConfig
// 设置默认值
config.MaxIdleConns = 0
config.MaxIdleConnsPerHost = 0
config.MaxConnsPerHost = 0
config.IdleConnTimeout = 0
config.TLSHandshakeTimeout = 0
config.ExpectContinueTimeout = 0
// 如果JSON不为空则解析覆盖默认值
if transportJSON != "" {
err := json.Unmarshal([]byte(transportJSON), &config)
if err != nil {
// 解析失败时使用默认值,可以记录日志
return config
}
}
return config
}
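ParseTransportConfig leaves every field at zero when the JSON is empty or fails to parse, and the proxy code later in this commit only overrides http.Transport defaults for values greater than zero. A hypothetical transport_json value for a Hosts record, with illustrative numbers:
func exampleTransportConfig() TransportConfig {
	// Illustrative JSON; field names match the json tags above.
	raw := `{"max_idle_conns":200,"max_idle_conns_per_host":50,"idle_conn_timeout":90,"tls_handshake_timeout":10}`
	cfg := ParseTransportConfig(raw)
	// MaxConnsPerHost and ExpectContinueTimeout remain 0, so the proxy keeps Go's defaults for them.
	return cfg
}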

View File

@@ -36,6 +36,7 @@ type WafHostAddReq struct {
CacheJSON string `json:"cache_json"` //缓存配置 json
DefaultEncoding string `json:"default_encoding"` //默认编码 utf-8 或者 gbk auto字符串自动选择
LogOnlyMode int `json:"log_only_mode"` //是否只记录日志 1 是 0 不是
TransportJSON string `json:"transport_json"` //Transport配置 json
}
type WafHostDelReq struct {
@@ -80,6 +81,7 @@ type WafHostEditReq struct {
StaticSiteJSON string `json:"static_site_json"` //静态站点配置 json
DefaultEncoding string `json:"default_encoding"` //默认编码 utf-8 或者 gbk auto字符串自动选择
LogOnlyMode int `json:"log_only_mode"` //是否只记录日志 1 是 0 不是
TransportJSON string `json:"transport_json"` //Transport配置 json
}
type WafHostGuardStatusReq struct {
CODE string `json:"code"`

View File

@@ -67,6 +67,7 @@ func (receiver *WafHostService) AddApi(wafHostAddReq request.WafHostAddReq) (str
StaticSiteJSON: wafHostAddReq.StaticSiteJSON,
DefaultEncoding: wafHostAddReq.DefaultEncoding,
LogOnlyMode: wafHostAddReq.LogOnlyMode,
TransportJSON: wafHostAddReq.TransportJSON,
}
global.GWAF_LOCAL_DB.Create(wafHost)
return wafHost.Code, nil
@@ -124,6 +125,7 @@ func (receiver *WafHostService) ModifyApi(wafHostEditReq request.WafHostEditReq)
"StaticSiteJSON": wafHostEditReq.StaticSiteJSON,
"DefaultEncoding": wafHostEditReq.DefaultEncoding,
"LogOnlyMode": wafHostEditReq.LogOnlyMode,
"TransportJSON": wafHostEditReq.TransportJSON,
}
err := global.GWAF_LOCAL_DB.Debug().Model(model.Hosts{}).Where("CODE=?", wafHostEditReq.CODE).Updates(hostMap).Error

View File

@@ -57,7 +57,7 @@ func (waf *WafEngine) checkCaptchaToken(r *http.Request, webLog innerbean.WebLog
}
// 处理验证码
func (waf *WafEngine) handleCaptchaRequest(w http.ResponseWriter, r *http.Request, log innerbean.WebLog, captchaConfig model.CaptchaConfig) {
func (waf *WafEngine) handleCaptchaRequest(w http.ResponseWriter, r *http.Request, log *innerbean.WebLog, captchaConfig model.CaptchaConfig) {
// 使用验证码服务处理请求
captchaService := wafcaptcha.GetService()
captchaService.HandleCaptchaRequest(w, r, log, captchaConfig)

View File

@@ -33,7 +33,7 @@ func renderTemplate(templateContent string, data map[string]interface{}) ([]byte
}
// EchoErrorInfo ruleName 对内记录 blockInfo 对外展示
func EchoErrorInfo(w http.ResponseWriter, r *http.Request, weblogbean innerbean.WebLog, ruleName string, blockInfo string, hostsafe *wafenginmodel.HostSafe, globalHostSafe *wafenginmodel.HostSafe, isLog bool) {
func EchoErrorInfo(w http.ResponseWriter, r *http.Request, weblogbean *innerbean.WebLog, ruleName string, blockInfo string, hostsafe *wafenginmodel.HostSafe, globalHostSafe *wafenginmodel.HostSafe, isLog bool) {
resBytes := []byte("")
var responseCode int = 403
@@ -136,7 +136,7 @@ func EchoErrorInfo(w http.ResponseWriter, r *http.Request, weblogbean innerbean.
}
// EchoResponseErrorInfo ruleName 对内记录 blockInfo 对外展示
func EchoResponseErrorInfo(resp *http.Response, weblogbean innerbean.WebLog, ruleName string, blockInfo string, hostsafe *wafenginmodel.HostSafe, globalHostSafe *wafenginmodel.HostSafe, isLog bool) {
func EchoResponseErrorInfo(resp *http.Response, weblogbean *innerbean.WebLog, ruleName string, blockInfo string, hostsafe *wafenginmodel.HostSafe, globalHostSafe *wafenginmodel.HostSafe, isLog bool) {
resBytes := []byte("")
var responseCode int = 403

View File

@@ -9,6 +9,7 @@ import (
"SamWaf/wafproxy"
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
@@ -28,7 +29,8 @@ func (waf *WafEngine) ProxyHTTP(w http.ResponseWriter, r *http.Request, host str
for addrIndex, loadBalance := range hostTarget.LoadBalanceLists {
//初始化后端负载
zlog.Debug("HTTP REQUEST", weblog.REQ_UUID, weblog.URL, "未初始化")
transport, customHeaders := waf.createTransport(r, host, 1, loadBalance, hostTarget)
transport := waf.getOrCreateTransport(r, host, 1, loadBalance, hostTarget) // 使用缓存的Transport
customHeaders := waf.getCustomHeaders(r, host, 1, loadBalance, hostTarget)
customConfig := map[string]string{}
customConfig["IsTransBackDomain"] = strconv.Itoa(hostTarget.Host.IsTransBackDomain)
proxy := wafproxy.NewSingleHostReverseProxyCustomHeader(remoteUrl, customHeaders, customConfig)
@@ -70,7 +72,8 @@ func (waf *WafEngine) ProxyHTTP(w http.ResponseWriter, r *http.Request, host str
}
} else {
transport, customHeaders := waf.createTransport(r, host, 0, model.LoadBalance{}, hostTarget)
transport := waf.getOrCreateTransport(r, host, 0, model.LoadBalance{}, hostTarget) // 使用缓存的Transport
customHeaders := waf.getCustomHeaders(r, host, 0, model.LoadBalance{}, hostTarget)
customConfig := map[string]string{}
customConfig["IsTransBackDomain"] = strconv.Itoa(hostTarget.Host.IsTransBackDomain)
proxy := wafproxy.NewSingleHostReverseProxyCustomHeader(remoteUrl, customHeaders, customConfig)
@@ -130,6 +133,97 @@ func (waf *WafEngine) createTransport(r *http.Request, host string, isEnableLoad
DialContext: dialContext,
}
}
// 解析并应用Transport配置
transportConfig := model.ParseTransportConfig(hostTarget.Host.TransportJSON)
// 应用Transport配置只有非零值才设置
if transportConfig.MaxIdleConns > 0 {
transport.MaxIdleConns = transportConfig.MaxIdleConns
}
if transportConfig.MaxIdleConnsPerHost > 0 {
transport.MaxIdleConnsPerHost = transportConfig.MaxIdleConnsPerHost
}
if transportConfig.MaxConnsPerHost > 0 {
transport.MaxConnsPerHost = transportConfig.MaxConnsPerHost
}
if transportConfig.IdleConnTimeout > 0 {
transport.IdleConnTimeout = time.Duration(transportConfig.IdleConnTimeout) * time.Second
}
if transportConfig.TLSHandshakeTimeout > 0 {
transport.TLSHandshakeTimeout = time.Duration(transportConfig.TLSHandshakeTimeout) * time.Second
}
if transportConfig.ExpectContinueTimeout > 0 {
transport.ExpectContinueTimeout = time.Duration(transportConfig.ExpectContinueTimeout) * time.Second
}
transport.ResponseHeaderTimeout = time.Duration(hostTarget.Host.ResponseTimeOut) * time.Second
//把下面的参数一次性使用格式化实现
zlog.Debug(fmt.Sprintf("Transport配置信息:\nMaxIdleConns: %d\nMaxIdleConnsPerHost: %d\nMaxConnsPerHost: %d\nIdleConnTimeout: %v\nTLSHandshakeTimeout: %v\nExpectContinueTimeout: %v\nResponseHeaderTimeout: %v\nDisableKeepAlives: %v\nDisableCompression: %v",
transport.MaxIdleConns,
transport.MaxIdleConnsPerHost,
transport.MaxConnsPerHost,
transport.IdleConnTimeout,
transport.TLSHandshakeTimeout,
transport.ExpectContinueTimeout,
transport.ResponseHeaderTimeout,
transport.DisableKeepAlives,
transport.DisableCompression))
return transport, customHeaders
}
// 获取或创建Transport
func (waf *WafEngine) getOrCreateTransport(r *http.Request, host string, isEnableLoadBalance int, loadBalance model.LoadBalance, hostTarget *wafenginmodel.HostSafe) *http.Transport {
// 生成Transport的唯一键
transportKey := waf.generateTransportKey(host, isEnableLoadBalance, loadBalance, hostTarget)
waf.TransportMux.RLock()
if transport, exists := waf.TransportPool[transportKey]; exists {
waf.TransportMux.RUnlock()
return transport
}
waf.TransportMux.RUnlock()
// 创建新的Transport
waf.TransportMux.Lock()
defer waf.TransportMux.Unlock()
// 双重检查,防止并发创建
if transport, exists := waf.TransportPool[transportKey]; exists {
return transport
}
transport, _ := waf.createTransport(r, host, isEnableLoadBalance, loadBalance, hostTarget)
// 优化Transport配置
/*transport.MaxIdleConns = 1000
transport.MaxIdleConnsPerHost = 1000*/
if waf.TransportPool == nil {
waf.TransportPool = make(map[string]*http.Transport)
}
waf.TransportPool[transportKey] = transport
return transport
}
// 生成Transport的唯一键
func (waf *WafEngine) generateTransportKey(host string, isEnableLoadBalance int, loadBalance model.LoadBalance, hostTarget *wafenginmodel.HostSafe) string {
key := fmt.Sprintf("%s_%d_%s_%d_%v",
host,
isEnableLoadBalance,
loadBalance.Remote_ip,
loadBalance.Remote_port,
hostTarget.Host.InsecureSkipVerify)
return key
}
// 分离自定义头部逻辑
func (waf *WafEngine) getCustomHeaders(r *http.Request, host string, isEnableLoadBalance int, loadBalance model.LoadBalance, hostTarget *wafenginmodel.HostSafe) map[string]string {
customHeaders := map[string]string{}
if r.TLS != nil {
customHeaders["X-FORWARDED-PROTO"] = "https"
}
return customHeaders
}
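Because the pool key is derived only from the host, the load-balance target and the TLS verification flag, every request to the same backend reuses one cached *http.Transport and its idle connections instead of building a new Transport per request. An illustrative key (made-up values):
// host "example.com:443", load balancing enabled, backend 10.0.0.8:8080:
// key = "example.com:443_1_10.0.0.8_8080_" + fmt.Sprintf("%v", hostTarget.Host.InsecureSkipVerify)
Note that TransportJSON is not part of the key, so an updated transport configuration for an existing backend only takes effect once the pool is cleared (CloseWaf resets it).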

View File

@@ -157,7 +157,7 @@ func GetService() *CaptchaService {
}
// HandleCaptchaRequest 处理验证码请求
func (s *CaptchaService) HandleCaptchaRequest(w http.ResponseWriter, r *http.Request, weblog innerbean.WebLog, captchaConfig model.CaptchaConfig) {
func (s *CaptchaService) HandleCaptchaRequest(w http.ResponseWriter, r *http.Request, weblog *innerbean.WebLog, captchaConfig model.CaptchaConfig) {
path := r.URL.Path
// 记录访问日志
@@ -400,7 +400,7 @@ func (s *CaptchaService) GetClickBasicCaptData(w http.ResponseWriter, r *http.Re
}
// VerifyCaptcha 验证验证码
func (s *CaptchaService) VerifyCaptcha(w http.ResponseWriter, r *http.Request, captchaType string, webLog innerbean.WebLog, captchaConfig model.CaptchaConfig) {
func (s *CaptchaService) VerifyCaptcha(w http.ResponseWriter, r *http.Request, captchaType string, webLog *innerbean.WebLog, captchaConfig model.CaptchaConfig) {
// 根据IP模式选择使用的IP
clientIP := webLog.NetSrcIp
if captchaConfig.IPMode == "proxy" {
@@ -638,7 +638,7 @@ func (s *CaptchaService) VerifyCapJsCaptcha(w http.ResponseWriter, r *http.Reque
json.NewEncoder(w).Encode(response)
}
func (s *CaptchaService) ValidateCapJsCaptcha(w http.ResponseWriter, r *http.Request, configStruct model.CaptchaConfig, webLog innerbean.WebLog) {
func (s *CaptchaService) ValidateCapJsCaptcha(w http.ResponseWriter, r *http.Request, configStruct model.CaptchaConfig, webLog *innerbean.WebLog) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return

View File

@@ -58,15 +58,20 @@ type WafEngine struct {
//敏感词管理
Sensitive []model.Sensitive //敏感词
SensitiveManager *goahocorasick.Machine
TransportPool map[string]*http.Transport // 添加Transport缓存池
TransportMux sync.RWMutex // 保护Transport池的读写锁
}
func (waf *WafEngine) Error() string {
fs := "HTTP: %d, HostCode: %d, Message: %s"
zlog.Error(fmt.Sprintf(fs))
return fmt.Sprintf(fs)
}
func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
innerLogName := "WafEngine ServeHTTP"
atomic.AddUint64(&global.GWAF_RUNTIME_QPS, 1) // 原子增加计数器
port := ""
host := r.Host
if !strings.Contains(host, ":") {
@@ -318,7 +323,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
BaseMessageInfo: innerbean.BaseMessageInfo{OperaType: "CC封禁提醒"},
OperaCnt: visitIPError,
})
EchoErrorInfo(w, r, weblogbean, "", "当前IP由于访问频次太高暂时无法访问", hostTarget, waf.HostTarget[waf.HostCode[global.GWAF_GLOBAL_HOST_CODE]], false)
EchoErrorInfo(w, r, &weblogbean, "", "当前IP由于访问频次太高暂时无法访问", hostTarget, waf.HostTarget[waf.HostCode[global.GWAF_GLOBAL_HOST_CODE]], false)
return
}
@@ -358,7 +363,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return false
} else {
decrementMonitor(hostCode)
EchoErrorInfo(w, r, weblogbean, detectionResult.Title, detectionResult.Content, hostTarget, waf.HostTarget[waf.HostCode[global.GWAF_GLOBAL_HOST_CODE]], true)
EchoErrorInfo(w, r, &weblogbean, detectionResult.Title, detectionResult.Content, hostTarget, waf.HostTarget[waf.HostCode[global.GWAF_GLOBAL_HOST_CODE]], true)
return true
}
}
@@ -483,7 +488,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
if !isExcluded {
waf.handleCaptchaRequest(w, r, weblogbean, captchaConfig)
waf.handleCaptchaRequest(w, r, &weblogbean, captchaConfig)
return
}
}
@@ -564,7 +569,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
weblogbean.STATUS = r.Response.Status
weblogbean.STATUS_CODE = r.Response.StatusCode
weblogbean.TASK_FLAG = 1
global.GQEQUE_LOG_DB.Enqueue(weblogbean)
global.GQEQUE_LOG_DB.Enqueue(&weblogbean)
return
}
}
@@ -584,7 +589,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 记录日志
weblogbean.RES_BODY = sHttpAuthBaseResult
weblogbean.ACTION = "禁止"
global.GQEQUE_LOG_DB.Enqueue(weblogbean)
global.GQEQUE_LOG_DB.Enqueue(&weblogbean)
return
}
}
@@ -682,7 +687,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
//记录响应body
weblogbean.RES_BODY = string(resBytes)
weblogbean.ACTION = "禁止"
global.GQEQUE_LOG_DB.Enqueue(weblogbean)
global.GQEQUE_LOG_DB.Enqueue(&weblogbean)
}
}
func (waf *WafEngine) getClientIP(r *http.Request, headers ...string) (error, string, string) {
@@ -970,7 +975,7 @@ func (waf *WafEngine) modifyResponse() func(*http.Response) error {
weblogfrist.RULE = "敏感词检测:" + string(matchBodyResult[0].Word)
} else {
EchoResponseErrorInfo(resp, *weblogfrist, "敏感词检测:"+string(matchBodyResult[0].Word), "敏感词内容", waf.HostTarget[host], waf.HostTarget[waf.HostCode[global.GWAF_GLOBAL_HOST_CODE]], true)
EchoResponseErrorInfo(resp, weblogfrist, "敏感词检测:"+string(matchBodyResult[0].Word), "敏感词内容", waf.HostTarget[host], waf.HostTarget[waf.HostCode[global.GWAF_GLOBAL_HOST_CODE]], true)
return nil
}
} else {
@@ -1150,7 +1155,7 @@ func (waf *WafEngine) StartWaf() {
//第一步 检测合法性并加入到全局
waf.LoadAllHost()
wafSysLog := &model.WafSysLog{
wafSysLog := model.WafSysLog{
BaseOrm: baseorm.BaseOrm{
Id: uuid.GenUUID(),
USER_CODE: global.GWAF_USER_CODE,
@@ -1161,7 +1166,7 @@ func (waf *WafEngine) StartWaf() {
OpType: "信息",
OpContent: "WAF启动",
}
global.GQEQUE_LOG_DB.Enqueue(wafSysLog)
global.GQEQUE_LOG_DB.Enqueue(&wafSysLog)
waf.StartAllProxyServer()
}
@@ -1198,6 +1203,12 @@ func (waf *WafEngine) CloseWaf() {
Mux: sync.Mutex{},
Map: map[string]*tls.Certificate{},
}
// 清除Transport缓存池
waf.TransportMux.Lock()
waf.TransportPool = map[string]*http.Transport{}
defer waf.TransportMux.Unlock()
}
// 清除代理
@@ -1312,7 +1323,7 @@ func (waf *WafEngine) StartProxyServer(innruntime innerbean.ServerRunTime) {
OpType: "系统运行错误",
OpContent: "HTTPS端口被占用: " + strconv.Itoa(innruntime.Port) + ",请检查",
}
global.GQEQUE_LOG_DB.Enqueue(wafSysLog)
global.GQEQUE_LOG_DB.Enqueue(&wafSysLog)
zlog.Error("[HTTPServer] https server start fail, cause:[%v]", err)
}
zlog.Info("server https shutdown")
@@ -1351,7 +1362,7 @@ func (waf *WafEngine) StartProxyServer(innruntime innerbean.ServerRunTime) {
OpType: "系统运行错误",
OpContent: "HTTP端口被占用: " + strconv.Itoa(innruntime.Port) + ",请检查",
}
global.GQEQUE_LOG_DB.Enqueue(wafSysLog)
global.GQEQUE_LOG_DB.Enqueue(&wafSysLog)
zlog.Error("[HTTPServer] http server start fail, cause:[%v]", err)
}
zlog.Info("server http shutdown")

View File

@@ -3,6 +3,6 @@ package wafinterface
import "SamWaf/innerbean"
type WafNotify interface {
NotifyBatch(logs []innerbean.WebLog) error // 处理多个日志
NotifySingle(log innerbean.WebLog) error // 处理单个日志
NotifyBatch(logs []*innerbean.WebLog) error // 处理多个日志
NotifySingle(log *innerbean.WebLog) error // 处理单个日志
}

View File

@@ -79,7 +79,7 @@ func (kn *KafkaNotifier) ReConnect() error {
}
// 实现 WafNotify 接口中的 NotifySingle 方法
func (kn *KafkaNotifier) NotifySingle(log innerbean.WebLog) error {
func (kn *KafkaNotifier) NotifySingle(log *innerbean.WebLog) error {
err := kn.sendMessage(log)
if err != nil {
return err
@@ -89,7 +89,7 @@ func (kn *KafkaNotifier) NotifySingle(log innerbean.WebLog) error {
}
// 实现 WafNotify 接口中的 NotifyBatch 方法
func (kn *KafkaNotifier) NotifyBatch(logs []innerbean.WebLog) error {
func (kn *KafkaNotifier) NotifyBatch(logs []*innerbean.WebLog) error {
go func() {
for _, log := range logs {
if err := kn.sendMessage(log); err != nil {
@@ -102,7 +102,7 @@ func (kn *KafkaNotifier) NotifyBatch(logs []innerbean.WebLog) error {
}
// 发送消息到 Kafka
func (kn *KafkaNotifier) sendMessage(log innerbean.WebLog) error {
func (kn *KafkaNotifier) sendMessage(log *innerbean.WebLog) error {
if kn.client == nil {
err := kn.ReConnect()
if err != nil {

View File

@@ -27,7 +27,7 @@ func (ls *WafNotifyService) ChangeEnable(enable int64) {
}
// 处理并发送单条日志
func (ls *WafNotifyService) ProcessSingleLog(log innerbean.WebLog) error {
func (ls *WafNotifyService) ProcessSingleLog(log *innerbean.WebLog) error {
if ls.enable == 0 {
return nil
}
@@ -47,7 +47,7 @@ func (ls *WafNotifyService) ProcessSingleLog(log innerbean.WebLog) error {
}
// 处理并发送多条日志
func (ls *WafNotifyService) ProcessBatchLogs(logs []innerbean.WebLog) error {
func (ls *WafNotifyService) ProcessBatchLogs(logs []*innerbean.WebLog) error {
if ls.enable == 0 {
zlog.Debug("kafka没有开启")
return nil

View File

@@ -4,6 +4,8 @@ import (
"SamWaf/common/zlog"
"SamWaf/global"
"SamWaf/innerbean"
"SamWaf/waftask"
"strconv"
"sync/atomic"
"time"
)
@@ -25,7 +27,8 @@ func ProcessLogDequeEngine() {
zlog.Debug("正在切换数据库等待中队列")
} else {
var webLogArray []innerbean.WebLog
var webLogArray []*innerbean.WebLog
batchCount := 0
for !global.GQEQUE_LOG_DB.Empty() {
atomic.AddUint64(&global.GWAF_RUNTIME_LOG_PROCESS, 1) // 原子增加计数器
weblogbean, ok := global.GQEQUE_LOG_DB.Dequeue()
@@ -34,8 +37,12 @@ func ProcessLogDequeEngine() {
}
if weblogbean != nil {
// 进行类型断言将其转为具体的结构
if logValue, ok := weblogbean.(innerbean.WebLog); ok {
if logValue, ok := weblogbean.(*innerbean.WebLog); ok {
webLogArray = append(webLogArray, logValue)
batchCount++
if batchCount > int(global.GDATA_BATCH_INSERT) {
break
}
} else {
//插入其他类型内容
global.GWAF_LOCAL_LOG_DB.Create(weblogbean)
@@ -43,7 +50,12 @@ func ProcessLogDequeEngine() {
}
}
if len(webLogArray) > 0 {
global.GWAF_LOCAL_LOG_DB.CreateInBatches(webLogArray, global.GDATA_BATCH_INSERT)
zlog.Info("日志队列处理协程处理日志数量:" + strconv.Itoa(len(webLogArray)))
if global.GCONFIG_LOG_PERSIST_ENABLED == 1 {
global.GWAF_LOCAL_LOG_DB.CreateInBatches(webLogArray, len(webLogArray))
}
// 日志流做统计
waftask.CollectStatsFromLogs(webLogArray)
global.GNOTIFY_KAKFA_SERVICE.ProcessBatchLogs(webLogArray)
}
}

waftask/stat_collector.go (new file, 304 lines added)
View File

@@ -0,0 +1,304 @@
package waftask
import (
"SamWaf/common/uuid"
"SamWaf/common/zlog"
"SamWaf/customtype"
"SamWaf/global"
"SamWaf/innerbean"
"SamWaf/model"
"SamWaf/model/baseorm"
"time"
"gorm.io/gorm"
)
// 从日志流批量聚合统计并写入统计库(不依赖日志入库)。
// 会按以下维度做增量统计:
// 1) 主机日聚合 (host_code, action, day)
// 2) IP日聚合 (host_code, ip, action, day)
// 3) 城市日聚合 (host_code, country, province, city, action, day)
// 4) IP标签计数 (ip, rule)
func CollectStatsFromLogs(logs []*innerbean.WebLog) {
if len(logs) == 0 {
return
}
// 添加输入日志数量的调试信息
zlog.Debug("统计收集器开始处理", "日志数量", len(logs))
// 安全检查需要相关DB已初始化
if global.GWAF_LOCAL_STATS_DB == nil || global.GWAF_LOCAL_DB == nil {
zlog.Debug("统计收集器", "数据库未初始化,跳过处理")
return
}
type hostKey struct {
TenantId string
UserCode string
HostCode string
Action string
Day int
Host string
}
type ipKey struct {
TenantId string
UserCode string
HostCode string
Action string
Day int
Host string
IP string
}
type cityKey struct {
TenantId string
UserCode string
HostCode string
Action string
Day int
Host string
Country string
Province string
City string
}
type ipTagKey struct {
IP string
Rule string
}
hostAgg := make(map[hostKey]int)
ipAgg := make(map[ipKey]int)
cityAgg := make(map[cityKey]int)
ipTagAgg := make(map[ipTagKey]int64)
for _, lg := range logs {
// 主机聚合
hk := hostKey{
TenantId: lg.TenantId,
UserCode: lg.USER_CODE,
HostCode: lg.HOST_CODE,
Action: lg.ACTION,
Day: lg.Day,
Host: lg.HOST,
}
hostAgg[hk]++
// IP聚合
ik := ipKey{
TenantId: lg.TenantId,
UserCode: lg.USER_CODE,
HostCode: lg.HOST_CODE,
Action: lg.ACTION,
Day: lg.Day,
Host: lg.HOST,
IP: lg.SRC_IP,
}
ipAgg[ik]++
// 城市聚合
ck := cityKey{
TenantId: lg.TenantId,
UserCode: lg.USER_CODE,
HostCode: lg.HOST_CODE,
Action: lg.ACTION,
Day: lg.Day,
Host: lg.HOST,
Country: lg.COUNTRY,
Province: lg.PROVINCE,
City: lg.CITY,
}
cityAgg[ck]++
// IPTag 聚合
rule := lg.RULE
if rule == "" {
rule = "正常"
}
ipTagAgg[ipTagKey{IP: lg.SRC_IP, Rule: rule}]++
}
// 添加聚合结果的调试信息
zlog.Debug("聚合计算完成",
"主机聚合数量", len(hostAgg),
"IP聚合数量", len(ipAgg),
"城市聚合数量", len(cityAgg),
"IP标签聚合数量", len(ipTagAgg))
now := customtype.JsonTime(time.Now())
// 1) 主机日聚合 增量
hostUpdateCount := 0
hostInsertCount := 0
for k, delta := range hostAgg {
tx := global.GWAF_LOCAL_STATS_DB.Model(&model.StatsDay{}).
Where("tenant_id = ? and user_code = ? and host_code = ? and type = ? and day = ?",
k.TenantId, k.UserCode, k.HostCode, k.Action, k.Day).
Updates(map[string]interface{}{
"Count": gorm.Expr("Count + ?", delta),
"UPDATE_TIME": now,
})
if tx.Error != nil {
zlog.Debug("主机聚合更新失败", "错误", tx.Error.Error(), "主机", k.HostCode, "动作", k.Action)
continue
}
if tx.RowsAffected == 0 {
// 创建新行
err := global.GWAF_LOCAL_STATS_DB.Create(&model.StatsDay{
BaseOrm: baseorm.BaseOrm{
Id: uuid.GenUUID(),
USER_CODE: k.UserCode,
Tenant_ID: k.TenantId,
CREATE_TIME: now,
UPDATE_TIME: now,
},
HostCode: k.HostCode,
Day: k.Day,
Host: k.Host,
Type: k.Action,
Count: delta,
}).Error
if err != nil {
zlog.Debug("主机聚合插入失败", "错误", err.Error(), "主机", k.HostCode, "动作", k.Action)
} else {
hostInsertCount++
}
} else {
hostUpdateCount++
}
}
zlog.Debug("主机聚合处理完成", "更新记录数", hostUpdateCount, "插入记录数", hostInsertCount)
// 2) IP日聚合 增量
ipUpdateCount := 0
ipInsertCount := 0
for k, delta := range ipAgg {
tx := global.GWAF_LOCAL_STATS_DB.Model(&model.StatsIPDay{}).
Where("tenant_id = ? and user_code = ? and host_code = ? and ip = ? and type = ? and day = ?",
k.TenantId, k.UserCode, k.HostCode, k.IP, k.Action, k.Day).
Updates(map[string]interface{}{
"Count": gorm.Expr("Count + ?", delta),
"UPDATE_TIME": now,
})
if tx.Error != nil {
zlog.Debug("IP聚合更新失败", "错误", tx.Error.Error(), "IP", k.IP, "动作", k.Action)
continue
}
if tx.RowsAffected == 0 {
err := global.GWAF_LOCAL_STATS_DB.Create(&model.StatsIPDay{
BaseOrm: baseorm.BaseOrm{
Id: uuid.GenUUID(),
USER_CODE: k.UserCode,
Tenant_ID: k.TenantId,
CREATE_TIME: now,
UPDATE_TIME: now,
},
HostCode: k.HostCode,
Day: k.Day,
Host: k.Host,
Type: k.Action,
Count: delta,
IP: k.IP,
}).Error
if err != nil {
zlog.Debug("IP聚合插入失败", "错误", err.Error(), "IP", k.IP, "动作", k.Action)
} else {
ipInsertCount++
}
} else {
ipUpdateCount++
}
}
zlog.Debug("IP聚合处理完成", "更新记录数", ipUpdateCount, "插入记录数", ipInsertCount)
// 3) 城市日聚合 增量
cityUpdateCount := 0
cityInsertCount := 0
for k, delta := range cityAgg {
tx := global.GWAF_LOCAL_STATS_DB.Model(&model.StatsIPCityDay{}).
Where("tenant_id = ? and user_code = ? and host_code = ? and country = ? and province = ? and city = ? and type = ? and day = ?",
k.TenantId, k.UserCode, k.HostCode, k.Country, k.Province, k.City, k.Action, k.Day).
Updates(map[string]interface{}{
"Count": gorm.Expr("Count + ?", delta),
"UPDATE_TIME": now,
})
if tx.Error != nil {
zlog.Debug("城市聚合更新失败", "错误", tx.Error.Error(), "城市", k.City, "动作", k.Action)
continue
}
if tx.RowsAffected == 0 {
err := global.GWAF_LOCAL_STATS_DB.Create(&model.StatsIPCityDay{
BaseOrm: baseorm.BaseOrm{
Id: uuid.GenUUID(),
USER_CODE: k.UserCode,
Tenant_ID: k.TenantId,
CREATE_TIME: now,
UPDATE_TIME: now,
},
HostCode: k.HostCode,
Day: k.Day,
Host: k.Host,
Type: k.Action,
Count: delta,
Country: k.Country,
Province: k.Province,
City: k.City,
}).Error
if err != nil {
zlog.Debug("城市聚合插入失败", "错误", err.Error(), "城市", k.City, "动作", k.Action)
} else {
cityInsertCount++
}
} else {
cityUpdateCount++
}
}
zlog.Debug("城市聚合处理完成", "更新记录数", cityUpdateCount, "插入记录数", cityInsertCount)
// 4) IPTag 增量(主库)
ipTagUpdateCount := 0
ipTagInsertCount := 0
for k, delta := range ipTagAgg {
tx := global.GWAF_LOCAL_DB.Model(&model.IPTag{}).
Where("tenant_id = ? and user_code = ? and ip = ? and ip_tag = ?",
global.GWAF_TENANT_ID, global.GWAF_USER_CODE, k.IP, k.Rule).
Updates(map[string]interface{}{
"Cnt": gorm.Expr("Cnt + ?", delta),
"UPDATE_TIME": now,
})
if tx.Error != nil {
zlog.Debug("IP标签更新失败", "错误", tx.Error.Error(), "IP", k.IP, "规则", k.Rule)
continue
}
if tx.RowsAffected == 0 {
err := global.GWAF_LOCAL_DB.Create(&model.IPTag{
BaseOrm: baseorm.BaseOrm{
Id: uuid.GenUUID(),
USER_CODE: global.GWAF_USER_CODE,
Tenant_ID: global.GWAF_TENANT_ID,
CREATE_TIME: now,
UPDATE_TIME: now,
},
IP: k.IP,
IPTag: k.Rule,
Cnt: delta,
Remarks: "",
}).Error
if err != nil {
zlog.Debug("IP标签插入失败", "错误", err.Error(), "IP", k.IP, "规则", k.Rule)
} else {
ipTagInsertCount++
}
} else {
ipTagUpdateCount++
}
}
zlog.Debug("IP标签处理完成", "更新记录数", ipTagUpdateCount, "插入记录数", ipTagInsertCount)
// 总结统计信息
zlog.Debug("统计收集器处理完成",
"总处理日志数", len(logs),
"主机聚合", map[string]interface{}{"更新": hostUpdateCount, "插入": hostInsertCount},
"IP聚合", map[string]interface{}{"更新": ipUpdateCount, "插入": ipInsertCount},
"城市聚合", map[string]interface{}{"更新": cityUpdateCount, "插入": cityInsertCount},
"IP标签", map[string]interface{}{"更新": ipTagUpdateCount, "插入": ipTagInsertCount})
}
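For reference, the increment-or-insert step that CollectStatsFromLogs repeats for every aggregation key can be sketched as follows, assuming a hypothetical HostStat model and an in-memory SQLite database (GORM v2); this illustrates the pattern only, not the project's actual models or column names:

package main

import (
	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)

// HostStat is a hypothetical daily aggregate row, standing in for model.StatsDay.
type HostStat struct {
	ID       uint `gorm:"primaryKey"`
	HostCode string
	Action   string
	Day      int
	Hits     int64
}

// bumpHostStat adds delta to an existing row, or inserts the row when the
// UPDATE matched nothing (RowsAffected == 0) -- the same update-then-create
// pattern used above for hosts, IPs, cities and IP tags.
func bumpHostStat(db *gorm.DB, hostCode, action string, day int, delta int64) error {
	tx := db.Model(&HostStat{}).
		Where("host_code = ? AND action = ? AND day = ?", hostCode, action, day).
		Updates(map[string]interface{}{"hits": gorm.Expr("hits + ?", delta)})
	if tx.Error != nil {
		return tx.Error
	}
	if tx.RowsAffected == 0 {
		return db.Create(&HostStat{HostCode: hostCode, Action: action, Day: day, Hits: delta}).Error
	}
	return nil
}

func main() {
	db, err := gorm.Open(sqlite.Open("file::memory:?cache=shared"), &gorm.Config{})
	if err != nil {
		panic(err)
	}
	if err := db.AutoMigrate(&HostStat{}); err != nil {
		panic(err)
	}
	_ = bumpHostStat(db, "host-1", "deny", 20250919, 3) // first call inserts the row
	_ = bumpHostStat(db, "host-1", "deny", 20250919, 2) // second call increments to 5
}

Attempting the UPDATE first and only creating the row when RowsAffected is zero avoids a read-modify-write round trip per key; with concurrent writers, a unique index plus an ON CONFLICT upsert would be the stricter variant.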

View File

@@ -97,6 +97,9 @@ func setConfigIntValue(name string, value int64, change int) {
case "enable_strict_ip_binding":
global.GCONFIG_ENABLE_STRICT_IP_BINDING = value
break
case "batch_insert":
global.GDATA_BATCH_INSERT = value
break
default:
zlog.Warn("Unknown config item:", name)
}
@@ -253,5 +256,6 @@ func TaskLoadSetting(initLoad bool) {
// 指纹认证相关配置
updateConfigIntItem(initLoad, "security", "enable_device_fingerprint", global.GCONFIG_ENABLE_DEVICE_FINGERPRINT, "是否启用设备指纹认证1启用 0禁用", "options", "0|禁用,1|启用")
updateConfigIntItem(initLoad, "security", "enable_strict_ip_binding", global.GCONFIG_ENABLE_STRICT_IP_BINDING, "是否启用严格IP绑定1启用 0禁用启用指纹时建议禁用", "options", "0|禁用,1|启用")
//数据库相关
updateConfigIntItem(initLoad, "database", "batch_insert", global.GDATA_BATCH_INSERT, "数据库批量插入数量", "int", "")
}
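For reference, a configured value such as batch_insert is typically sanitized before being used as a batch cap; the sketch below shows one way to do that (the default and upper bound are illustrative assumptions, not SamWaf's):

package main

import "fmt"

// clampBatchSize guards against zero, negative, or oversized configured values.
func clampBatchSize(configured int64) int {
	const (
		defaultBatch = 100  // assumed fallback, not the project's actual default
		maxBatch     = 5000 // assumed ceiling, not the project's actual limit
	)
	switch {
	case configured <= 0:
		return defaultBatch
	case configured > maxBatch:
		return maxBatch
	default:
		return int(configured)
	}
}

func main() {
	fmt.Println(clampBatchSize(0))     // 100
	fmt.Println(clampBatchSize(200))   // 200
	fmt.Println(clampBatchSize(99999)) // 5000
}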

View File

@@ -1,16 +1,7 @@
package waftask
import (
"SamWaf/common/uuid"
"SamWaf/common/zlog"
"SamWaf/customtype"
"SamWaf/global"
"SamWaf/innerbean"
"SamWaf/model"
"SamWaf/model/baseorm"
"SamWaf/service/waf_service"
"fmt"
"time"
)
var (
@@ -65,265 +56,9 @@ type CountIPRuleResult struct {
*/
func TaskCounter() {
// 检查是否收到关闭信号
if global.GWAF_SHUTDOWN_SIGNAL {
zlog.Info("TaskCounter Shutdown")
//废弃使用新形式
if 1 == 1 {
return
}
if global.GWAF_LOCAL_DB == nil || global.GWAF_LOCAL_LOG_DB == nil {
zlog.Debug("数据库没有初始化完成呢")
return
}
if global.GWAF_SWITCH_TASK_COUNTER == true {
zlog.Debug("统计还没完成调度任务PASS")
}
global.GWAF_SWITCH_TASK_COUNTER = true
/**
1.首次是当前日期,查询当前时间以后的所有数据,备份当前日期
2.查询使用备份日期倒退10秒查询这个时候所有的数据
3.
*/
if global.GDATA_CURRENT_CHANGE {
//如果正在切换库 跳过
zlog.Debug("正在切换数据库等待中")
global.GWAF_SWITCH_TASK_COUNTER = false
return
}
if global.GWAF_LAST_TIME_UNIX == 0 {
global.GWAF_LAST_TIME_UNIX = (global.GWAF_LAST_UPDATE_TIME.UnixNano()) / 1e6
global.GWAF_SWITCH_TASK_COUNTER = false
return
}
//取大于上次时间的时
statTimeUnix := global.GWAF_LAST_TIME_UNIX
endTimeUnix := (time.Now().Add(-5 * time.Second).UnixNano()) / 1e6
//打印 statTimeUnix, endTimeUnix
zlog.Debug(fmt.Sprintf("counter statTimeUnix = %v endTimeUnix=%v", statTimeUnix, endTimeUnix))
lastWebLogDbBean := wafLogService.GetUnixTimeByCounter(statTimeUnix, endTimeUnix)
if lastWebLogDbBean.REQ_UUID == "" {
zlog.Debug("当前期间没有符合条件的数据")
global.GWAF_LAST_TIME_UNIX = endTimeUnix
global.GWAF_SWITCH_TASK_COUNTER = false
return
} else {
global.GWAF_LAST_TIME_UNIX = endTimeUnix
}
//一、 主机聚合统计
{
var resultHosts []CountHostResult
global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host FROM \"web_logs\" where task_flag = ? and unix_add_time >= ? and unix_add_time < ? and tenant_id = ? and user_code =? GROUP BY host_code, user_code,action,tenant_id,day,host",
1, statTimeUnix, endTimeUnix, global.GWAF_TENANT_ID, global.GWAF_USER_CODE).Scan(&resultHosts)
/****
1.如果不存在则创建
2.如果存在则累加这个周期的统计数
*/
for _, value := range resultHosts {
// 检查是否收到关闭信号
if global.GWAF_SHUTDOWN_SIGNAL {
zlog.Info("TaskCounter - Shutdown")
return
}
var statDay model.StatsDay
global.GWAF_LOCAL_STATS_DB.Where("tenant_id = ? and user_code = ? and host_code=? and type=? and day=?",
value.TenantId, value.UserCode, value.HostCode, value.ACTION, value.Day).Find(&statDay)
if statDay.HostCode == "" {
statDay2 := &model.StatsDay{
BaseOrm: baseorm.BaseOrm{
Id: uuid.GenUUID(),
USER_CODE: global.GWAF_USER_CODE,
Tenant_ID: global.GWAF_TENANT_ID,
CREATE_TIME: customtype.JsonTime(time.Now()),
UPDATE_TIME: customtype.JsonTime(time.Now()),
},
HostCode: value.HostCode,
Day: value.Day,
Host: value.Host,
Type: value.ACTION,
Count: value.Count,
}
global.GQEQUE_STATS_DB.Enqueue(statDay2)
} else {
statDayMap := map[string]interface{}{
"Count": value.Count + statDay.Count,
"UPDATE_TIME": customtype.JsonTime(time.Now()),
}
updateBean := innerbean.UpdateModel{
Model: model.StatsDay{},
Query: `tenant_id = ? and user_code= ? and host_code=? and type=? and day=?`,
Update: statDayMap,
}
updateBean.Args = append(updateBean.Args, value.TenantId, value.UserCode, value.HostCode, value.ACTION, value.Day)
global.GQEQUE_STATS_UPDATE_DB.Enqueue(updateBean)
}
}
}
//二、 IP聚合统计
{
var resultIP []CountIPResult
global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host,src_ip as ip FROM \"web_logs\" where task_flag = ? and unix_add_time >= ? and unix_add_time < ? and tenant_id = ? and user_code =? GROUP BY host_code, user_code,action,tenant_id,day,host,ip",
1, statTimeUnix, endTimeUnix, global.GWAF_TENANT_ID, global.GWAF_USER_CODE).Scan(&resultIP)
/****
1.如果不存在则创建
2.如果存在则累加这个周期的统计数
*/
for _, value := range resultIP {
// 检查是否收到关闭信号
if global.GWAF_SHUTDOWN_SIGNAL {
zlog.Info("TaskCounter - Shutdown")
return
}
var statDay model.StatsIPDay
global.GWAF_LOCAL_STATS_DB.Where("tenant_id = ? and user_code = ? and host_code=? and ip = ? and type=? and day=?",
value.TenantId, value.UserCode, value.HostCode, value.Ip, value.ACTION, value.Day).Find(&statDay)
if statDay.HostCode == "" {
statDay2 := &model.StatsIPDay{
BaseOrm: baseorm.BaseOrm{
Id: uuid.GenUUID(),
USER_CODE: global.GWAF_USER_CODE,
Tenant_ID: global.GWAF_TENANT_ID,
CREATE_TIME: customtype.JsonTime(time.Now()),
UPDATE_TIME: customtype.JsonTime(time.Now()),
},
HostCode: value.HostCode,
Day: value.Day,
Host: value.Host,
Type: value.ACTION,
Count: value.Count,
IP: value.Ip,
}
global.GQEQUE_STATS_DB.Enqueue(statDay2)
} else {
statDayMap := map[string]interface{}{
"Count": value.Count + statDay.Count,
"UPDATE_TIME": customtype.JsonTime(time.Now()),
}
updateBean := innerbean.UpdateModel{
Model: model.StatsIPDay{},
Query: "tenant_id = ? and user_code= ? and host_code=? and ip=? and type=? and day=?",
Update: statDayMap,
}
updateBean.Args = append(updateBean.Args, value.TenantId, value.UserCode, value.HostCode, value.Ip, value.ACTION, value.Day)
global.GQEQUE_STATS_UPDATE_DB.Enqueue(updateBean)
}
}
}
//三、 城市信息聚合统计
{
var resultCitys []CountCityResult
global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host,country,province,city FROM \"web_logs\" where task_flag = ? and unix_add_time >= ? and unix_add_time < ? and tenant_id = ? and user_code =? GROUP BY host_code, user_code,action,tenant_id,day,host,country,province,city",
1, statTimeUnix, endTimeUnix, global.GWAF_TENANT_ID, global.GWAF_USER_CODE).Scan(&resultCitys)
/****
1.如果不存在则创建
2.如果存在则累加这个周期的统计数
*/
for _, value := range resultCitys {
// 检查是否收到关闭信号
if global.GWAF_SHUTDOWN_SIGNAL {
zlog.Info("TaskCounter - Shutdown")
return
}
var statDay model.StatsIPCityDay
global.GWAF_LOCAL_STATS_DB.Where("tenant_id = ? and user_code = ? and host_code=? and country = ? and province = ? and city = ? and type=? and day=?",
value.TenantId, value.UserCode, value.HostCode, value.Country, value.Province, value.City, value.ACTION, value.Day).Find(&statDay)
if statDay.HostCode == "" {
statDay2 := &model.StatsIPCityDay{
BaseOrm: baseorm.BaseOrm{
Id: uuid.GenUUID(),
USER_CODE: global.GWAF_USER_CODE,
Tenant_ID: global.GWAF_TENANT_ID,
CREATE_TIME: customtype.JsonTime(time.Now()),
UPDATE_TIME: customtype.JsonTime(time.Now()),
},
HostCode: value.HostCode,
Day: value.Day,
Host: value.Host,
Type: value.ACTION,
Count: value.Count,
Country: value.Country,
Province: value.Province,
City: value.City,
}
global.GQEQUE_STATS_DB.Enqueue(statDay2)
} else {
statDayMap := map[string]interface{}{
"Count": value.Count + statDay.Count,
"UPDATE_TIME": customtype.JsonTime(time.Now()),
}
updateBean := innerbean.UpdateModel{
Model: model.StatsIPCityDay{},
Query: "tenant_id = ? and user_code= ? and host_code=? and country = ? and province = ? and city = ? and type=? and day=?",
Update: statDayMap,
}
updateBean.Args = append(updateBean.Args, value.TenantId, value.UserCode, value.HostCode, value.Country, value.Province, value.City, value.ACTION, value.Day)
global.GQEQUE_STATS_UPDATE_DB.Enqueue(updateBean)
}
}
}
//第四 给IP打标签 开始
{
var resultIPRule []CountIPRuleResult
global.GWAF_LOCAL_LOG_DB.Raw("SELECT src_ip as ip ,rule,count(src_ip) as cnt FROM \"web_logs\" where task_flag = ? and unix_add_time >= ? and unix_add_time < ? and tenant_id = ? and user_code =? GROUP BY user_code,tenant_id, rule,src_ip",
1, statTimeUnix, endTimeUnix, global.GWAF_TENANT_ID, global.GWAF_USER_CODE).Scan(&resultIPRule)
/****
1.如果不存在则创建
2.如果存在则累加这个IP这个rule的统计数
*/
for _, value := range resultIPRule {
// 检查是否收到关闭信号
if global.GWAF_SHUTDOWN_SIGNAL {
zlog.Info("TaskCounter - Shutdown")
return
}
if value.Rule == "" {
value.Rule = "正常"
}
var ipTag model.IPTag
global.GWAF_LOCAL_DB.Where("tenant_id = ? and user_code = ? and ip=? and ip_tag = ?",
global.GWAF_TENANT_ID, global.GWAF_USER_CODE, value.Ip, value.Rule).Find(&ipTag)
if ipTag.IP == "" {
insertIpTag := &model.IPTag{
BaseOrm: baseorm.BaseOrm{
Id: uuid.GenUUID(),
USER_CODE: global.GWAF_USER_CODE,
Tenant_ID: global.GWAF_TENANT_ID,
CREATE_TIME: customtype.JsonTime(time.Now()),
UPDATE_TIME: customtype.JsonTime(time.Now()),
},
IP: value.Ip,
IPTag: value.Rule,
Cnt: value.Cnt,
Remarks: "",
}
global.GQEQUE_DB.Enqueue(insertIpTag)
} else {
ipTagUpdateMap := map[string]interface{}{
"Cnt": value.Cnt + ipTag.Cnt,
"UPDATE_TIME": customtype.JsonTime(time.Now()),
}
updateBean := innerbean.UpdateModel{
Model: model.IPTag{},
Query: "tenant_id = ? and user_code= ? and ip=? and ip_tag = ?",
Update: ipTagUpdateMap,
}
updateBean.Args = append(updateBean.Args, global.GWAF_TENANT_ID, global.GWAF_USER_CODE, value.Ip, value.Rule)
global.GQEQUE_UPDATE_DB.Enqueue(updateBean)
}
}
} //给IP打标签结束
global.GWAF_SWITCH_TASK_COUNTER = false
}