feat:host qps /active count

Closes #64
This commit is contained in:
samwaf
2024-11-29 10:14:03 +08:00
parent 52e51eaa01
commit de58b554a5
10 changed files with 236 additions and 99 deletions

View File

@@ -11,6 +11,7 @@ import (
response2 "SamWaf/model/response"
"SamWaf/model/spec"
"SamWaf/utils"
"SamWaf/wafenginecore"
"errors"
"fmt"
"github.com/gin-gonic/gin"
@@ -71,8 +72,18 @@ func (w *WafHostAPi) GetListApi(c *gin.Context) {
err := c.ShouldBindJSON(&req)
if err == nil {
wafHosts, total, _ := wafHostService.GetListApi(req)
// 初始化返回结果列表
var repList []response2.HostRep
for _, srcHost := range wafHosts {
rep := response2.HostRep{
Hosts: srcHost,
RealTimeConnectCnt: wafenginecore.GetActiveConnectCnt(srcHost.Code),
RealTimeQps: wafenginecore.GetQPS(srcHost.Code),
}
repList = append(repList, rep)
}
response.OkWithDetailed(response.PageResult{
List: wafHosts,
List: repList,
Total: total,
PageIndex: req.PageIndex,
PageSize: req.PageSize,

View File

@@ -84,6 +84,7 @@ var (
/****队列相关*****/
GQEQUE_DB *queue.Queue //正常DB队列
GQEQUE_UPDATE_DB *queue.Queue //正常DB 更新队列
GQEQUE_LOG_DB *queue.Queue //日志DB队列
GQEQUE_STATS_DB *queue.Queue //统计DB队列
GQEQUE_STATS_UPDATE_DB *queue.Queue //统计更新DB队列

View File

@@ -274,6 +274,12 @@ func (m *wafSystenService) run() {
atomic.StoreUint64(&global.GWAF_RUNTIME_QPS, 0)
atomic.StoreUint64(&global.GWAF_RUNTIME_LOG_PROCESS, 0)
})
// 重置QPS数据
globalobj.GWAF_RUNTIME_OBJ_WAF_CRON.Every(1).Seconds().Do(func() {
go wafenginecore.ResetQPS()
})
go waftask.TaskShareDbInfo()
// 执行分库操作 每天凌晨3点进行数据归档操作
globalobj.GWAF_RUNTIME_OBJ_WAF_CRON.Every(1).Day().At("03:00").Do(func() {

View File

@@ -0,0 +1,9 @@
package response
import "SamWaf/model"
type HostRep struct {
model.Hosts
RealTimeQps int `json:"real_time_qps"` //实时信息QPS
RealTimeConnectCnt int `json:"real_time_connect_cnt"` //实时连接数量
}

View File

@@ -0,0 +1,77 @@
package wafenginecore
import (
"sync"
)
// 存储每个网站的活动连接数
var activeConnections = make(map[string]int)
var connectionLock sync.Mutex
// 存储每个网站的 QPS
var siteQPS = make(map[string]int)
var qpsLock sync.Mutex
// 增加活动连接数
func incrementActiveConnections(site string) {
defer connectionLock.Unlock()
connectionLock.Lock()
activeConnections[site]++
}
// 减少活动连接数
func decrementActiveConnections(site string) {
defer connectionLock.Unlock()
connectionLock.Lock()
activeConnections[site]--
}
// 增加 QPS 计数
func incrementQPS(site string) {
defer qpsLock.Unlock()
qpsLock.Lock()
siteQPS[site]++
}
// 每秒重置 QPS 计数器
func ResetQPS() {
qpsLock.Lock()
for site := range siteQPS {
siteQPS[site] = 0 // 重置为 0
}
qpsLock.Unlock()
}
// 获取某个网站的 QPS
func GetQPS(site string) int {
qpsLock.Lock()
defer qpsLock.Unlock()
if qps, exists := siteQPS[site]; exists {
return qps
}
return 0
}
// 获取某个网站的 当前活动连接
func GetActiveConnectCnt(site string) int {
connectionLock.Lock()
defer connectionLock.Unlock()
if cnt, exists := activeConnections[site]; exists {
return cnt
}
return 0
}
// 增加访问量
func incrementMonitor(hostcode string) {
// 增加活动连接数
incrementActiveConnections(hostcode)
// 增加 QPS 计数
incrementQPS(hostcode)
}
// 减少访问量
func decrementMonitor(hostcode string) {
// 减少活动连接数
decrementActiveConnections(hostcode)
}

104
wafenginecore/hostssl.go Normal file
View File

@@ -0,0 +1,104 @@
package wafenginecore
import (
"SamWaf/common/zlog"
"crypto/tls"
"errors"
"strings"
"sync"
)
type AllCertificate struct {
Mux sync.Mutex
Map map[string]*tls.Certificate
}
// LoadSSL 加载证书
func (ac *AllCertificate) LoadSSL(domain string, cert string, key string) error {
ac.Mux.Lock()
defer ac.Mux.Unlock()
domain = strings.ToLower(domain)
// 加载新的证书
newCert, err := tls.X509KeyPair([]byte(cert), []byte(key))
if err != nil {
return err
}
certificate, ok := ac.Map[domain]
if !ok {
ac.Map[domain] = &newCert
return nil
} else {
if certificate == nil {
ac.Map[domain] = &newCert
return nil
}
if certificate != nil && certificate.Certificate[0] != nil {
zlog.Debug("需要重新加载证书")
ac.Map[domain] = &newCert
}
}
// 检查域名是否已存在,如果存在则替换
ac.Map[domain] = &newCert
return nil
}
// LoadSSLByFilePath 加载证书从文件
func (ac *AllCertificate) LoadSSLByFilePath(domain string, certPath string, keyPath string) error {
ac.Mux.Lock()
defer ac.Mux.Unlock()
domain = strings.ToLower(domain)
// 加载新的证书
newCert, err := tls.LoadX509KeyPair(certPath, keyPath)
if err != nil {
return err
}
certificate, ok := ac.Map[domain]
if !ok {
ac.Map[domain] = &newCert
return nil
} else {
if certificate != nil && certificate.Certificate[0] != nil {
zlog.Debug("需要重新加载证书")
ac.Map[domain] = &newCert
}
}
// 检查域名是否已存在,如果存在则替换
ac.Map[domain] = &newCert
return nil
}
// RemoveSSL 移除证书
func (ac *AllCertificate) RemoveSSL(domain string) error {
ac.Mux.Lock()
defer ac.Mux.Unlock()
domain = strings.ToLower(domain)
_, ok := ac.Map[domain]
if ok {
ac.Map[domain] = nil
}
return nil
}
// GetSSL 加载证书
func (ac *AllCertificate) GetSSL(domain string) *tls.Certificate {
ac.Mux.Lock()
defer ac.Mux.Unlock()
domain = strings.ToLower(domain)
certificate, ok := ac.Map[domain]
if ok {
return certificate
}
return nil
}
// GetCertificateFunc 获取证书的函数
func (waf *WafEngine) GetCertificateFunc(clientInfo *tls.ClientHelloInfo) (*tls.Certificate, error) {
zlog.Debug("GetCertificate ", clientInfo.ServerName)
x509Cert := waf.AllCertificate.GetSSL(clientInfo.ServerName)
if x509Cert != nil {
return x509Cert, nil
}
return nil, errors.New("config error")
}

View File

@@ -61,100 +61,6 @@ type WafEngine struct {
Sensitive []model.Sensitive //敏感词
SensitiveManager *goahocorasick.Machine
}
type AllCertificate struct {
Mux sync.Mutex
Map map[string]*tls.Certificate
}
// LoadSSL 加载证书
func (ac *AllCertificate) LoadSSL(domain string, cert string, key string) error {
ac.Mux.Lock()
defer ac.Mux.Unlock()
domain = strings.ToLower(domain)
// 加载新的证书
newCert, err := tls.X509KeyPair([]byte(cert), []byte(key))
if err != nil {
return err
}
certificate, ok := ac.Map[domain]
if !ok {
ac.Map[domain] = &newCert
return nil
} else {
if certificate == nil {
ac.Map[domain] = &newCert
return nil
}
if certificate != nil && certificate.Certificate[0] != nil {
zlog.Debug("需要重新加载证书")
ac.Map[domain] = &newCert
}
}
// 检查域名是否已存在,如果存在则替换
ac.Map[domain] = &newCert
return nil
}
// LoadSSLByFilePath 加载证书从文件
func (ac *AllCertificate) LoadSSLByFilePath(domain string, certPath string, keyPath string) error {
ac.Mux.Lock()
defer ac.Mux.Unlock()
domain = strings.ToLower(domain)
// 加载新的证书
newCert, err := tls.LoadX509KeyPair(certPath, keyPath)
if err != nil {
return err
}
certificate, ok := ac.Map[domain]
if !ok {
ac.Map[domain] = &newCert
return nil
} else {
if certificate != nil && certificate.Certificate[0] != nil {
zlog.Debug("需要重新加载证书")
ac.Map[domain] = &newCert
}
}
// 检查域名是否已存在,如果存在则替换
ac.Map[domain] = &newCert
return nil
}
// RemoveSSL 移除证书
func (ac *AllCertificate) RemoveSSL(domain string) error {
ac.Mux.Lock()
defer ac.Mux.Unlock()
domain = strings.ToLower(domain)
_, ok := ac.Map[domain]
if ok {
ac.Map[domain] = nil
}
return nil
}
// GetSSL 加载证书
func (ac *AllCertificate) GetSSL(domain string) *tls.Certificate {
ac.Mux.Lock()
defer ac.Mux.Unlock()
domain = strings.ToLower(domain)
certificate, ok := ac.Map[domain]
if ok {
return certificate
}
return nil
}
// GetCertificateFunc 获取证书的函数
func (waf *WafEngine) GetCertificateFunc(clientInfo *tls.ClientHelloInfo) (*tls.Certificate, error) {
zlog.Debug("GetCertificate ", clientInfo.ServerName)
x509Cert := waf.AllCertificate.GetSSL(clientInfo.ServerName)
if x509Cert != nil {
return x509Cert, nil
}
return nil, errors.New("config error")
}
func (waf *WafEngine) Error() string {
fs := "HTTP: %d, HostCode: %d, Message: %s"
@@ -204,6 +110,8 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
// 检查域名是否已经注册
if findHost == true {
incrementMonitor(waf.HostTarget[host].Host.Code)
//检测网站是否已关闭
if waf.HostTarget[host].Host.START_STATUS == 1 {
resBytes := []byte("<html><head><title>网站已关闭</title></head><body><center><h1>当前访问网站已关闭</h1> <br><h3></h3></center></body> </html>")
@@ -334,6 +242,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
handleBlock := func(checkFunc func(*http.Request, *innerbean.WebLog, url.Values) detection.Result) bool {
detectionResult := checkFunc(r, &weblogbean, formValues)
if detectionResult.IsBlock {
decrementMonitor(waf.HostTarget[host].Host.Code)
EchoErrorInfo(w, r, weblogbean, detectionResult.Title, detectionResult.Content)
return true
}
@@ -429,6 +338,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := context.WithValue(r.Context(), "weblog", weblogbean)
// 代理请求
waf.ProxyHTTP(w, r, host, remoteUrl, clientIP, ctx, weblogbean)
decrementMonitor(waf.HostTarget[host].Host.Code)
return
} else {
resBytes := []byte("403: Host forbidden " + host)

View File

@@ -1,7 +1,9 @@
package wafqueue
import (
"SamWaf/common/zlog"
"SamWaf/global"
"SamWaf/innerbean"
"time"
)
@@ -15,7 +17,23 @@ func ProcessCoreDequeEngine() {
bean, ok := global.GQEQUE_DB.Dequeue()
if ok {
if bean != nil {
global.GWAF_LOCAL_DB.Create(bean)
tx := global.GWAF_LOCAL_DB.Create(bean)
if tx.Error != nil {
zlog.Error("LocalDBerror", tx.Error.Error())
}
}
}
}
for !global.GQEQUE_UPDATE_DB.Empty() {
bean, ok := global.GQEQUE_UPDATE_DB.Dequeue()
if ok {
if bean != nil {
// 进行类型断言将其转为具体的结构
if UpdateValue, ok := bean.(innerbean.UpdateModel); ok {
global.GWAF_LOCAL_DB.Model(UpdateValue.Model).Where(UpdateValue.Query,
UpdateValue.Args...).Updates(UpdateValue.Update)
}
}
}

View File

@@ -11,6 +11,7 @@ import (
*/
func InitDequeEngine() {
global.GQEQUE_DB = queue.NewQueue()
global.GQEQUE_UPDATE_DB = queue.NewQueue()
global.GQEQUE_LOG_DB = queue.NewQueue()
global.GQEQUE_STATS_DB = queue.NewQueue()
global.GQEQUE_STATS_UPDATE_DB = queue.NewQueue()

View File

@@ -247,7 +247,7 @@ func TaskCounter() {
value.Rule = "正常"
}
var ipTag model.IPTag
global.GWAF_LOCAL_DB.Where("tenant_id = ? and user_code = ? and ip=? and rule = ?",
global.GWAF_LOCAL_DB.Where("tenant_id = ? and user_code = ? and ip=? and ip_tag = ?",
global.GWAF_TENANT_ID, global.GWAF_USER_CODE, value.Ip, value.Rule).Find(&ipTag)
if ipTag.IP == "" {
insertIpTag := &model.IPTag{
@@ -271,11 +271,11 @@ func TaskCounter() {
}
updateBean := innerbean.UpdateModel{
Model: model.IPTag{},
Query: "tenant_id = ? and user_code= ? and ip=? and rule = ?",
Query: "tenant_id = ? and user_code= ? and ip=? and ip_tag = ?",
Update: ipTagUpdateMap,
}
updateBean.Args = append(updateBean.Args, global.GWAF_TENANT_ID, global.GWAF_USER_CODE, value.Ip, value.Rule)
global.GQEQUE_DB.Enqueue(updateBean)
global.GQEQUE_UPDATE_DB.Enqueue(updateBean)
}
}