fix:修正数据统计逻辑

This commit is contained in:
samwaf
2023-11-02 15:16:12 +08:00
parent ec2df7183a
commit 850fda47fc
8 changed files with 106 additions and 15 deletions

View File

@@ -4,6 +4,7 @@ import (
"SamWaf/cache"
"SamWaf/model"
"SamWaf/model/spec"
"SamWaf/wafsnowflake"
"github.com/bytedance/godlp/dlpheader"
Dequelib "github.com/edwingeng/deque"
"gorm.io/gorm"
@@ -69,6 +70,8 @@ var (
//升级相关
GUPDATE_VERSION_URL string = "https://update.samwaf.com/" //
GWAF_SNOWFLAKE_GEN *wafsnowflake.Snowflake //雪花算法
)
func GetCurrentVersionInt() int {

View File

@@ -29,6 +29,8 @@ type WebLog struct {
STATUS_CODE int `json:"status_code"` //状态编码
RES_BODY string `json:"res_body"` //返回信息
POST_FORM string `json:"post_form"` //提交的表单数据
TASK_FLAG int `json:"task_flag" gorm:"default:-1"` //任务处理标记 -1 等待处理 1 可以进行处理 定时任务处理2 处理完毕
UNIX_ADD_TIME int64 `json:"unix_add_time"` //添加日期unix
}
type WAFLog struct {
REQ_UUID string `json:"req_uuid"`

12
main.go
View File

@@ -11,6 +11,7 @@ import (
"SamWaf/utils"
"SamWaf/utils/zlog"
"SamWaf/wafenginecore"
"SamWaf/wafsnowflake"
"SamWaf/waftask"
"crypto/tls"
"fmt"
@@ -50,6 +51,9 @@ func (m *wafSystenService) run() {
fmt.Println("Service is running...")
//初始化cache
global.GCACHE_WAFCACHE = cache.InitWafCache()
// 创建 Snowflake 实例
global.GWAF_SNOWFLAKE_GEN = wafsnowflake.NewSnowflake(1609459200000, 1, 1) // 设置epoch时间、机器ID和数据中心ID
rversion := "初始化系统 版本号:" + global.GWAF_RELEASE_VERSION_NAME + "(" + global.GWAF_RELEASE_VERSION + ")"
if global.GWAF_RELEASE == "false" {
rversion = rversion + " 调试版本"
@@ -90,6 +94,14 @@ func (m *wafSystenService) run() {
wafenginecore.InitDequeEngine()
//启动队列消费
go func() {
defer func() {
e := recover()
if e != nil {
zlog.Info("ProcessErrorException", e)
time.Sleep(3 * time.Second)
wafenginecore.ProcessDequeEngine()
}
}()
wafenginecore.ProcessDequeEngine()
}()

View File

@@ -27,6 +27,7 @@ func InitDequeEngine() {
处理队列信息
*/
func ProcessDequeEngine() {
zlog.Info("ProcessDequeEngine start")
for {
for !global.GQEQUE_DB.Empty() {
@@ -41,6 +42,7 @@ func ProcessDequeEngine() {
if weblogbean != nil {
// 进行类型断言将其转为具体的结构
if logValue, ok := weblogbean.(innerbean.WebLog); ok {
// 类型断言成功
// myValue 现在是具体的 MyStruct 类型
if logValue.WafInnerDFlag == "update" {
@@ -53,8 +55,11 @@ func ProcessDequeEngine() {
global.GWAF_LOCAL_LOG_DB.Model(innerbean.WebLog{}).Where("req_uuid=?", logValue.REQ_UUID).Updates(logMap)
} else {
global.GWAF_LOCAL_LOG_DB.Create(weblogbean)
global.GWAF_LOCAL_LOG_DB.Create(logValue)
}
} else {
//插入其他类型内容
global.GWAF_LOCAL_LOG_DB.Create(weblogbean)
}
}

View File

@@ -102,7 +102,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if _urlEscapeOk != nil {
enEscapeUrl = r.RequestURI
}
datetimeNow := time.Now()
weblogbean := innerbean.WebLog{
HOST: host,
URL: enEscapeUrl,
@@ -115,7 +115,8 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
CITY: region[3],
SRC_IP: ipAndPort[0],
SRC_PORT: ipAndPort[1],
CREATE_TIME: time.Now().Format("2006-01-02 15:04:05"),
CREATE_TIME: datetimeNow.Format("2006-01-02 15:04:05"),
UNIX_ADD_TIME: datetimeNow.UnixNano() / 1e6,
CONTENT_LENGTH: contentLength,
COOKIES: string(cookies),
BODY: string(bodyByte),
@@ -127,6 +128,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ACTION: "通过",
Day: currentDay,
POST_FORM: r.PostForm.Encode(),
TASK_FLAG: -1,
}
formValues := url.Values{}
@@ -404,7 +406,9 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// 取出客户IP
ipAndPort := strings.Split(r.RemoteAddr, ":")
region := utils.GetCountry(ipAndPort[0])
currentDay, _ := strconv.Atoi(time.Now().Format("20060102"))
datetimeNow := time.Now()
currentDay, _ := strconv.Atoi(datetimeNow.Format("20060102"))
weblogbean := innerbean.WebLog{
HOST: r.Host,
URL: r.RequestURI,
@@ -417,7 +421,8 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
CITY: region[3],
SRC_IP: ipAndPort[0],
SRC_PORT: ipAndPort[1],
CREATE_TIME: time.Now().Format("2006-01-02 15:04:05"),
CREATE_TIME: datetimeNow.Format("2006-01-02 15:04:05"),
UNIX_ADD_TIME: datetimeNow.UnixNano() / 1e6,
CONTENT_LENGTH: contentLength,
COOKIES: string(cookies),
BODY: string(bodyByte),
@@ -430,6 +435,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Day: currentDay,
STATUS: "禁止访问",
STATUS_CODE: 403,
TASK_FLAG: 1,
}
//记录响应body
@@ -460,6 +466,7 @@ func EchoErrorInfo(w http.ResponseWriter, r *http.Request, weblogbean innerbean.
weblogbean.ACTION = "阻止"
weblogbean.STATUS = "阻止访问"
weblogbean.STATUS_CODE = 403
weblogbean.TASK_FLAG = 1
global.GQEQUE_LOG_DB.PushBack(weblogbean)
zlog.Debug(ruleName)
@@ -591,6 +598,7 @@ func (waf *WafEngine) modifyResponse() func(*http.Response) error {
weblogbean.ACTION = "放行"
weblogbean.STATUS = resp.Status
weblogbean.STATUS_CODE = resp.StatusCode
weblogbean.TASK_FLAG = 1
global.GQEQUE_LOG_DB.PushBack(weblogbean)
}

View File

@@ -0,0 +1,53 @@
package wafsnowflake
//雪花算法
import (
"fmt"
"sync"
"time"
)
// Snowflake 结构
type Snowflake struct {
mutex sync.Mutex
epoch int64
machineID int64
datacenterID int64
sequence int64
}
// NewSnowflake 创建一个 Snowflake 实例
func NewSnowflake(epoch, machineID, datacenterID int64) *Snowflake {
return &Snowflake{
epoch: epoch,
machineID: machineID,
datacenterID: datacenterID,
sequence: 0,
}
}
// NextID 生成下一个唯一ID 返回0 代表生成失败
func (s *Snowflake) NextID() int64 {
s.mutex.Lock()
defer s.mutex.Unlock()
currentTime := time.Now().UnixNano() / 1000000 // 转换为毫秒
if currentTime < s.epoch {
fmt.Println("错误系统时间小于epoch时间")
return 0
}
if currentTime == s.epoch {
s.sequence++
} else {
s.sequence = 0
}
if s.sequence >= 4096 {
fmt.Println("序列号溢出")
return 0
}
ID := ((currentTime - s.epoch) << 22) | (s.datacenterID << 17) | (s.machineID << 12) | s.sequence
return ID
}

View File

@@ -66,11 +66,13 @@ func TaskCounter() {
currenyDayBak := dateTime*/
currenyDayBak := time.Now()
currentDayBak3Second := currenyDayBak.Add(-3 * time.Second)
currenyDayMillisecondsBak := currentDayBak3Second.UnixNano() / 1e6 //倒查3秒
//一、 主机聚合统计
{
var resultHosts []CountHostResult
global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host FROM \"web_logs\" where create_time>? GROUP BY host_code, user_code,action,tenant_id,day,host",
global.GWAF_LAST_UPDATE_TIME.Format("2006-01-02 15:04:05")).Scan(&resultHosts)
global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host FROM \"web_logs\" where task_flag = ? and unix_add_time <= ? GROUP BY host_code, user_code,action,tenant_id,day,host",
1, currenyDayMillisecondsBak).Scan(&resultHosts)
/****
1.如果不存在则创建
2.如果存在则累加这个周期的统计数
@@ -108,8 +110,8 @@ func TaskCounter() {
//二、 IP聚合统计
{
var resultIP []CountIPResult
global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host,src_ip as ip FROM \"web_logs\" where create_time>? GROUP BY host_code, user_code,action,tenant_id,day,host,ip",
global.GWAF_LAST_UPDATE_TIME.Format("2006-01-02 15:04:05")).Scan(&resultIP)
global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host,src_ip as ip FROM \"web_logs\" where task_flag = ? and unix_add_time <= ? GROUP BY host_code, user_code,action,tenant_id,day,host,ip",
1, currenyDayMillisecondsBak).Scan(&resultIP)
/****
1.如果不存在则创建
2.如果存在则累加这个周期的统计数
@@ -148,8 +150,8 @@ func TaskCounter() {
//三、 城市信息聚合统计
{
var resultCitys []CountCityResult
global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host,country,province,city FROM \"web_logs\" where create_time>? GROUP BY host_code, user_code,action,tenant_id,day,host,country,province,city",
global.GWAF_LAST_UPDATE_TIME.Format("2006-01-02 15:04:05")).Scan(&resultCitys)
global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host,country,province,city FROM \"web_logs\" where task_flag = ? and unix_add_time <= ? GROUP BY host_code, user_code,action,tenant_id,day,host,country,province,city",
1, currenyDayMillisecondsBak).Scan(&resultCitys)
/****
1.如果不存在则创建
2.如果存在则累加这个周期的统计数

View File

@@ -183,5 +183,11 @@ e. mutex profile
go tool pprof main http://192.168.56.101:16060//debug/pprof/mutex
提示输入top 看
https://www.cnblogs.com/zhanchenjin/p/17101573.html
然后进行list SamWaf/wafenginecore.ProcessDequeEngine 查看具体某一个有问题
```