From 850fda47fc96dfbf30a2b061ae70364886d232aa Mon Sep 17 00:00:00 2001 From: samwaf Date: Thu, 2 Nov 2023 15:16:12 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BF=AE=E6=AD=A3=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E7=BB=9F=E8=AE=A1=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- global/global.go | 3 ++ innerbean/web_log.go | 10 ++++--- main.go | 12 ++++++++ wafenginecore/dequeengine.go | 7 ++++- wafenginecore/wafengine.go | 16 ++++++++--- wafsnowflake/wafsnowflake.go | 53 ++++++++++++++++++++++++++++++++++++ waftask/localtaskcounter.go | 14 ++++++---- 编译说明.md | 6 ++++ 8 files changed, 106 insertions(+), 15 deletions(-) create mode 100644 wafsnowflake/wafsnowflake.go diff --git a/global/global.go b/global/global.go index 16b071e..fda2810 100644 --- a/global/global.go +++ b/global/global.go @@ -4,6 +4,7 @@ import ( "SamWaf/cache" "SamWaf/model" "SamWaf/model/spec" + "SamWaf/wafsnowflake" "github.com/bytedance/godlp/dlpheader" Dequelib "github.com/edwingeng/deque" "gorm.io/gorm" @@ -69,6 +70,8 @@ var ( //升级相关 GUPDATE_VERSION_URL string = "https://update.samwaf.com/" // + + GWAF_SNOWFLAKE_GEN *wafsnowflake.Snowflake //雪花算法 ) func GetCurrentVersionInt() int { diff --git a/innerbean/web_log.go b/innerbean/web_log.go index 241fc14..10e7f38 100644 --- a/innerbean/web_log.go +++ b/innerbean/web_log.go @@ -25,10 +25,12 @@ type WebLog struct { Day int `json:"day"` //日 (主要键) ACTION string `json:"action"` RULE string `json:"rule"` - STATUS string `json:"status"` //状态 - STATUS_CODE int `json:"status_code"` //状态编码 - RES_BODY string `json:"res_body"` //返回信息 - POST_FORM string `json:"post_form"` //提交的表单数据 + STATUS string `json:"status"` //状态 + STATUS_CODE int `json:"status_code"` //状态编码 + RES_BODY string `json:"res_body"` //返回信息 + POST_FORM string `json:"post_form"` //提交的表单数据 + TASK_FLAG int `json:"task_flag" gorm:"default:-1"` //任务处理标记 -1 等待处理 ;1 可以进行处理 (定时任务处理),2 处理完毕 + UNIX_ADD_TIME int64 `json:"unix_add_time"` //添加日期unix } type WAFLog struct { REQ_UUID string `json:"req_uuid"` diff --git a/main.go b/main.go index eb317fb..df3f126 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,7 @@ import ( "SamWaf/utils" "SamWaf/utils/zlog" "SamWaf/wafenginecore" + "SamWaf/wafsnowflake" "SamWaf/waftask" "crypto/tls" "fmt" @@ -50,6 +51,9 @@ func (m *wafSystenService) run() { fmt.Println("Service is running...") //初始化cache global.GCACHE_WAFCACHE = cache.InitWafCache() + // 创建 Snowflake 实例 + global.GWAF_SNOWFLAKE_GEN = wafsnowflake.NewSnowflake(1609459200000, 1, 1) // 设置epoch时间、机器ID和数据中心ID + rversion := "初始化系统 版本号:" + global.GWAF_RELEASE_VERSION_NAME + "(" + global.GWAF_RELEASE_VERSION + ")" if global.GWAF_RELEASE == "false" { rversion = rversion + " 调试版本" @@ -90,6 +94,14 @@ func (m *wafSystenService) run() { wafenginecore.InitDequeEngine() //启动队列消费 go func() { + defer func() { + e := recover() + if e != nil { + zlog.Info("ProcessErrorException", e) + time.Sleep(3 * time.Second) + wafenginecore.ProcessDequeEngine() + } + }() wafenginecore.ProcessDequeEngine() }() diff --git a/wafenginecore/dequeengine.go b/wafenginecore/dequeengine.go index 5e92b46..19e0e33 100644 --- a/wafenginecore/dequeengine.go +++ b/wafenginecore/dequeengine.go @@ -27,6 +27,7 @@ func InitDequeEngine() { 处理队列信息 */ func ProcessDequeEngine() { + zlog.Info("ProcessDequeEngine start") for { for !global.GQEQUE_DB.Empty() { @@ -41,6 +42,7 @@ func ProcessDequeEngine() { if weblogbean != nil { // 进行类型断言将其转为具体的结构 if logValue, ok := weblogbean.(innerbean.WebLog); ok { + // 类型断言成功 // myValue 现在是具体的 MyStruct 类型 if logValue.WafInnerDFlag == "update" { @@ -53,8 +55,11 @@ func ProcessDequeEngine() { global.GWAF_LOCAL_LOG_DB.Model(innerbean.WebLog{}).Where("req_uuid=?", logValue.REQ_UUID).Updates(logMap) } else { - global.GWAF_LOCAL_LOG_DB.Create(weblogbean) + global.GWAF_LOCAL_LOG_DB.Create(logValue) } + } else { + //插入其他类型内容 + global.GWAF_LOCAL_LOG_DB.Create(weblogbean) } } diff --git a/wafenginecore/wafengine.go b/wafenginecore/wafengine.go index 108031a..c032880 100644 --- a/wafenginecore/wafengine.go +++ b/wafenginecore/wafengine.go @@ -102,7 +102,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) { if _urlEscapeOk != nil { enEscapeUrl = r.RequestURI } - + datetimeNow := time.Now() weblogbean := innerbean.WebLog{ HOST: host, URL: enEscapeUrl, @@ -115,7 +115,8 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) { CITY: region[3], SRC_IP: ipAndPort[0], SRC_PORT: ipAndPort[1], - CREATE_TIME: time.Now().Format("2006-01-02 15:04:05"), + CREATE_TIME: datetimeNow.Format("2006-01-02 15:04:05"), + UNIX_ADD_TIME: datetimeNow.UnixNano() / 1e6, CONTENT_LENGTH: contentLength, COOKIES: string(cookies), BODY: string(bodyByte), @@ -127,6 +128,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) { ACTION: "通过", Day: currentDay, POST_FORM: r.PostForm.Encode(), + TASK_FLAG: -1, } formValues := url.Values{} @@ -404,7 +406,9 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) { // 取出客户IP ipAndPort := strings.Split(r.RemoteAddr, ":") region := utils.GetCountry(ipAndPort[0]) - currentDay, _ := strconv.Atoi(time.Now().Format("20060102")) + datetimeNow := time.Now() + + currentDay, _ := strconv.Atoi(datetimeNow.Format("20060102")) weblogbean := innerbean.WebLog{ HOST: r.Host, URL: r.RequestURI, @@ -417,7 +421,8 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) { CITY: region[3], SRC_IP: ipAndPort[0], SRC_PORT: ipAndPort[1], - CREATE_TIME: time.Now().Format("2006-01-02 15:04:05"), + CREATE_TIME: datetimeNow.Format("2006-01-02 15:04:05"), + UNIX_ADD_TIME: datetimeNow.UnixNano() / 1e6, CONTENT_LENGTH: contentLength, COOKIES: string(cookies), BODY: string(bodyByte), @@ -430,6 +435,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) { Day: currentDay, STATUS: "禁止访问", STATUS_CODE: 403, + TASK_FLAG: 1, } //记录响应body @@ -460,6 +466,7 @@ func EchoErrorInfo(w http.ResponseWriter, r *http.Request, weblogbean innerbean. weblogbean.ACTION = "阻止" weblogbean.STATUS = "阻止访问" weblogbean.STATUS_CODE = 403 + weblogbean.TASK_FLAG = 1 global.GQEQUE_LOG_DB.PushBack(weblogbean) zlog.Debug(ruleName) @@ -591,6 +598,7 @@ func (waf *WafEngine) modifyResponse() func(*http.Response) error { weblogbean.ACTION = "放行" weblogbean.STATUS = resp.Status weblogbean.STATUS_CODE = resp.StatusCode + weblogbean.TASK_FLAG = 1 global.GQEQUE_LOG_DB.PushBack(weblogbean) } diff --git a/wafsnowflake/wafsnowflake.go b/wafsnowflake/wafsnowflake.go new file mode 100644 index 0000000..01a2672 --- /dev/null +++ b/wafsnowflake/wafsnowflake.go @@ -0,0 +1,53 @@ +package wafsnowflake + +//雪花算法 +import ( + "fmt" + "sync" + "time" +) + +// Snowflake 结构 +type Snowflake struct { + mutex sync.Mutex + epoch int64 + machineID int64 + datacenterID int64 + sequence int64 +} + +// NewSnowflake 创建一个 Snowflake 实例 +func NewSnowflake(epoch, machineID, datacenterID int64) *Snowflake { + return &Snowflake{ + epoch: epoch, + machineID: machineID, + datacenterID: datacenterID, + sequence: 0, + } +} + +// NextID 生成下一个唯一ID 返回0 代表生成失败 +func (s *Snowflake) NextID() int64 { + s.mutex.Lock() + defer s.mutex.Unlock() + + currentTime := time.Now().UnixNano() / 1000000 // 转换为毫秒 + if currentTime < s.epoch { + fmt.Println("错误系统时间小于epoch时间") + return 0 + } + + if currentTime == s.epoch { + s.sequence++ + } else { + s.sequence = 0 + } + + if s.sequence >= 4096 { + fmt.Println("序列号溢出") + return 0 + } + + ID := ((currentTime - s.epoch) << 22) | (s.datacenterID << 17) | (s.machineID << 12) | s.sequence + return ID +} diff --git a/waftask/localtaskcounter.go b/waftask/localtaskcounter.go index c0916af..ad9f02c 100644 --- a/waftask/localtaskcounter.go +++ b/waftask/localtaskcounter.go @@ -66,11 +66,13 @@ func TaskCounter() { currenyDayBak := dateTime*/ currenyDayBak := time.Now() + currentDayBak3Second := currenyDayBak.Add(-3 * time.Second) + currenyDayMillisecondsBak := currentDayBak3Second.UnixNano() / 1e6 //倒查3秒 //一、 主机聚合统计 { var resultHosts []CountHostResult - global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host FROM \"web_logs\" where create_time>? GROUP BY host_code, user_code,action,tenant_id,day,host", - global.GWAF_LAST_UPDATE_TIME.Format("2006-01-02 15:04:05")).Scan(&resultHosts) + global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host FROM \"web_logs\" where task_flag = ? and unix_add_time <= ? GROUP BY host_code, user_code,action,tenant_id,day,host", + 1, currenyDayMillisecondsBak).Scan(&resultHosts) /**** 1.如果不存在则创建 2.如果存在则累加这个周期的统计数 @@ -108,8 +110,8 @@ func TaskCounter() { //二、 IP聚合统计 { var resultIP []CountIPResult - global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host,src_ip as ip FROM \"web_logs\" where create_time>? GROUP BY host_code, user_code,action,tenant_id,day,host,ip", - global.GWAF_LAST_UPDATE_TIME.Format("2006-01-02 15:04:05")).Scan(&resultIP) + global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host,src_ip as ip FROM \"web_logs\" where task_flag = ? and unix_add_time <= ? GROUP BY host_code, user_code,action,tenant_id,day,host,ip", + 1, currenyDayMillisecondsBak).Scan(&resultIP) /**** 1.如果不存在则创建 2.如果存在则累加这个周期的统计数 @@ -148,8 +150,8 @@ func TaskCounter() { //三、 城市信息聚合统计 { var resultCitys []CountCityResult - global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host,country,province,city FROM \"web_logs\" where create_time>? GROUP BY host_code, user_code,action,tenant_id,day,host,country,province,city", - global.GWAF_LAST_UPDATE_TIME.Format("2006-01-02 15:04:05")).Scan(&resultCitys) + global.GWAF_LOCAL_LOG_DB.Raw("SELECT host_code, user_code,tenant_id ,action,count(req_uuid) as count,day,host,country,province,city FROM \"web_logs\" where task_flag = ? and unix_add_time <= ? GROUP BY host_code, user_code,action,tenant_id,day,host,country,province,city", + 1, currenyDayMillisecondsBak).Scan(&resultCitys) /**** 1.如果不存在则创建 2.如果存在则累加这个周期的统计数 diff --git a/编译说明.md b/编译说明.md index cd947e5..2089153 100644 --- a/编译说明.md +++ b/编译说明.md @@ -183,5 +183,11 @@ e. mutex profile go tool pprof main http://192.168.56.101:16060//debug/pprof/mutex 提示输入top 看 + + +https://www.cnblogs.com/zhanchenjin/p/17101573.html + +然后进行list SamWaf/wafenginecore.ProcessDequeEngine 查看具体某一个有问题 + ``` \ No newline at end of file