mirror of
https://gitee.com/samwaf/SamWaf.git
synced 2025-12-06 06:58:54 +08:00
feat:增加日志归档能力
This commit is contained in:
@@ -45,4 +45,6 @@ var (
|
||||
wafSysLogService = waf_service.WafSysLogServiceApp
|
||||
wafSystemConfigService = waf_service.WafSystemConfigServiceApp
|
||||
wafDelayMsgService = waf_service.WafDelayMsgServiceApp
|
||||
|
||||
wafShareDbService = waf_service.WafShareDbServiceApp
|
||||
)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"SamWaf/global"
|
||||
"SamWaf/model/common/response"
|
||||
"SamWaf/model/request"
|
||||
"github.com/gin-gonic/gin"
|
||||
@@ -13,6 +14,11 @@ func (w *WafLogAPi) GetDetailApi(c *gin.Context) {
|
||||
var req request.WafAttackLogDetailReq
|
||||
err := c.ShouldBind(&req)
|
||||
if err == nil {
|
||||
if global.GDATA_CURRENT_CHANGE {
|
||||
//如果正在切换库 跳过
|
||||
response.FailWithMessage("正在切换数据库请等待", c)
|
||||
return
|
||||
}
|
||||
wafLog, _ := wafLogService.GetDetailApi(req)
|
||||
response.OkWithDetailed(wafLog, "获取成功", c)
|
||||
} else {
|
||||
@@ -26,7 +32,11 @@ func (w *WafLogAPi) GetListApi(c *gin.Context) {
|
||||
/*//TOOD 模拟意外退出
|
||||
|
||||
os.Exit(-1) //退出进程*/
|
||||
|
||||
if global.GDATA_CURRENT_CHANGE {
|
||||
//如果正在切换库 跳过
|
||||
response.FailWithMessage("正在切换数据库请等待", c)
|
||||
return
|
||||
}
|
||||
wafLogs, total, _ := wafLogService.GetListApi(req)
|
||||
response.OkWithDetailed(response.PageResult{
|
||||
List: wafLogs,
|
||||
@@ -42,6 +52,11 @@ func (w *WafLogAPi) GetListByHostCodeApi(c *gin.Context) {
|
||||
var req request.WafAttackLogSearch
|
||||
err := c.ShouldBind(&req)
|
||||
if err == nil {
|
||||
if global.GDATA_CURRENT_CHANGE {
|
||||
//如果正在切换库 跳过
|
||||
response.FailWithMessage("正在切换数据库请等待", c)
|
||||
return
|
||||
}
|
||||
wafLogs, total, _ := wafLogService.GetListByHostCodeApi(req)
|
||||
response.OkWithDetailed(response.PageResult{
|
||||
List: wafLogs,
|
||||
|
||||
@@ -39,6 +39,7 @@ var (
|
||||
|
||||
GWAF_LOCAL_DB *gorm.DB //通用本地数据库,尊重用户隐私
|
||||
GWAF_LOCAL_LOG_DB *gorm.DB //通用本地数据库存日志数据,尊重用户隐私
|
||||
GWAF_LOCAL_CUSTOM_LOG_DB *gorm.DB //通用本地数据库存历史日志数据, 尊重用户隐私
|
||||
GWAF_LOCAL_STATS_DB *gorm.DB //通用本地数据库存放统计数据,尊重用户隐私
|
||||
GWAF_REMOTE_DB *gorm.DB //仅当用户使用云数据库
|
||||
GWAF_LOCAL_SERVER_PORT int = 26666 // 本地local端口
|
||||
@@ -73,8 +74,10 @@ var (
|
||||
GQEQUE_STATS_UPDATE_DB Dequelib.Deque //统计更新DB队列
|
||||
GQEQUE_MESSAGE_DB Dequelib.Deque //发送消息队列
|
||||
|
||||
/******插入数据最大Batch*****/
|
||||
GDATA_BATCH_INSERT int = 1000 //最大批量插入
|
||||
/******数据库处理参数*****/
|
||||
GDATA_BATCH_INSERT int = 1000 //最大批量插入
|
||||
GDATA_SHARE_DB_SIZE int64 = 100 * 10000 //100w 进行分库 100*10000
|
||||
GDATA_CURRENT_CHANGE bool = false //当前是否正在切换
|
||||
|
||||
/******WebSocket*********/
|
||||
GWebSocket *model.WebSocketOnline
|
||||
|
||||
9
main.go
9
main.go
@@ -144,6 +144,15 @@ func (m *wafSystenService) run() {
|
||||
atomic.StoreUint64(&global.GWAF_RUNTIME_LOG_PROCESS, 0)
|
||||
})
|
||||
|
||||
// 执行分库操作 (每天凌晨3点进行数据归档操作)
|
||||
s.Every(1).Day().At("03:00").Do(func() {
|
||||
if global.GDATA_CURRENT_CHANGE == false {
|
||||
go waftask.TaskShareDbInfo()
|
||||
} else {
|
||||
zlog.Debug("执行分库操作没完成,调度任务PASS")
|
||||
}
|
||||
})
|
||||
|
||||
// 每10秒执行一次
|
||||
s.Every(10).Seconds().Do(func() {
|
||||
if global.GWAF_SWITCH_TASK_COUNTER == false {
|
||||
|
||||
7
model/request/waf_sharedb_search.go
Normal file
7
model/request/waf_sharedb_search.go
Normal file
@@ -0,0 +1,7 @@
|
||||
package request
|
||||
|
||||
import "SamWaf/model/common/request"
|
||||
|
||||
type WafShareDbReq struct {
|
||||
request.PageInfo
|
||||
}
|
||||
19
model/sharedb.go
Normal file
19
model/sharedb.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package model
|
||||
|
||||
import (
|
||||
"SamWaf/customtype"
|
||||
"SamWaf/model/baseorm"
|
||||
)
|
||||
|
||||
/*
|
||||
*
|
||||
分库
|
||||
*/
|
||||
type ShareDb struct {
|
||||
baseorm.BaseOrm
|
||||
DbLogicType string `json:"db_logic_type"` //数据库逻辑类型 默认:”log“
|
||||
StartTime customtype.JsonTime `json:"start_time"` //开始时间
|
||||
EndTime customtype.JsonTime `json:"end_time"` //结束时间
|
||||
FileName string `json:"file_name"` //文件名
|
||||
Cnt int64 `json:"cnt"` //当前数量
|
||||
}
|
||||
34
service/waf_service/waf_sharedb.go
Normal file
34
service/waf_service/waf_sharedb.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package waf_service
|
||||
|
||||
import (
|
||||
"SamWaf/global"
|
||||
"SamWaf/model"
|
||||
"SamWaf/model/request"
|
||||
)
|
||||
|
||||
type WafShareDbService struct{}
|
||||
|
||||
var WafShareDbServiceApp = new(WafShareDbService)
|
||||
|
||||
func (receiver *WafShareDbService) AddApi(shareDb model.ShareDb) error {
|
||||
global.GWAF_LOCAL_DB.Create(shareDb)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (receiver *WafShareDbService) GetListApi(req request.WafShareDbReq) ([]model.ShareDb, int64, error) {
|
||||
var list []model.ShareDb
|
||||
var total int64 = 0
|
||||
|
||||
/*where条件*/
|
||||
var whereField = ""
|
||||
var whereValues []interface{}
|
||||
//where字段
|
||||
whereField = ""
|
||||
|
||||
//where字段赋值
|
||||
|
||||
global.GWAF_LOCAL_DB.Model(&model.ShareDb{}).Where(whereField, whereValues...).Limit(req.PageSize).Offset(req.PageSize * (req.PageIndex - 1)).Find(&list)
|
||||
global.GWAF_LOCAL_DB.Model(&model.ShareDb{}).Where(whereField, whereValues...).Count(&total)
|
||||
|
||||
return list, total, nil
|
||||
}
|
||||
@@ -39,23 +39,30 @@ func ProcessDequeEngine() {
|
||||
global.GWAF_LOCAL_DB.Create(weblogbean)
|
||||
}
|
||||
}
|
||||
var webLogArray []innerbean.WebLog
|
||||
for !global.GQEQUE_LOG_DB.Empty() {
|
||||
atomic.AddUint64(&global.GWAF_RUNTIME_LOG_PROCESS, 1) // 原子增加计数器
|
||||
weblogbean := global.GQEQUE_LOG_DB.PopFront()
|
||||
if weblogbean != nil {
|
||||
// 进行类型断言将其转为具体的结构
|
||||
if logValue, ok := weblogbean.(innerbean.WebLog); ok {
|
||||
webLogArray = append(webLogArray, logValue)
|
||||
} else {
|
||||
//插入其他类型内容
|
||||
global.GWAF_LOCAL_LOG_DB.Create(weblogbean)
|
||||
if global.GDATA_CURRENT_CHANGE {
|
||||
//如果正在切换库 跳过
|
||||
zlog.Debug("正在切换数据库等待中队列")
|
||||
|
||||
} else {
|
||||
var webLogArray []innerbean.WebLog
|
||||
for !global.GQEQUE_LOG_DB.Empty() {
|
||||
atomic.AddUint64(&global.GWAF_RUNTIME_LOG_PROCESS, 1) // 原子增加计数器
|
||||
weblogbean := global.GQEQUE_LOG_DB.PopFront()
|
||||
if weblogbean != nil {
|
||||
// 进行类型断言将其转为具体的结构
|
||||
if logValue, ok := weblogbean.(innerbean.WebLog); ok {
|
||||
webLogArray = append(webLogArray, logValue)
|
||||
} else {
|
||||
//插入其他类型内容
|
||||
global.GWAF_LOCAL_LOG_DB.Create(weblogbean)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(webLogArray) > 0 {
|
||||
global.GWAF_LOCAL_LOG_DB.CreateInBatches(webLogArray, global.GDATA_BATCH_INSERT)
|
||||
}
|
||||
}
|
||||
if len(webLogArray) > 0 {
|
||||
global.GWAF_LOCAL_LOG_DB.CreateInBatches(webLogArray, global.GDATA_BATCH_INSERT)
|
||||
}
|
||||
|
||||
for !global.GQEQUE_STATS_DB.Empty() {
|
||||
bean := global.GQEQUE_STATS_DB.PopFront()
|
||||
global.GWAF_LOCAL_STATS_DB.Create(bean)
|
||||
|
||||
@@ -1,13 +1,18 @@
|
||||
package wafenginecore
|
||||
|
||||
import (
|
||||
"SamWaf/customtype"
|
||||
"SamWaf/global"
|
||||
"SamWaf/innerbean"
|
||||
"SamWaf/model"
|
||||
"SamWaf/model/baseorm"
|
||||
"SamWaf/utils"
|
||||
"SamWaf/utils/zlog"
|
||||
"fmt"
|
||||
uuid "github.com/satori/go.uuid"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
//"github.com/kangarooxin/gorm-plugin-crypto"
|
||||
//"github.com/kangarooxin/gorm-plugin-crypto/strategy"
|
||||
"github.com/pengge/sqlitedriver"
|
||||
@@ -61,6 +66,9 @@ func InitCoreDb(currentDir string) {
|
||||
//延迟信息
|
||||
db.AutoMigrate(&model.DelayMsg{})
|
||||
|
||||
//分库信息表
|
||||
db.AutoMigrate(&model.ShareDb{})
|
||||
|
||||
global.GWAF_LOCAL_DB.Callback().Query().Before("gorm:query").Register("tenant_plugin:before_query", before_query)
|
||||
global.GWAF_LOCAL_DB.Callback().Query().Before("gorm:update").Register("tenant_plugin:before_update", before_update)
|
||||
|
||||
@@ -93,9 +101,64 @@ func InitLogDb(currentDir string) {
|
||||
db.AutoMigrate(&innerbean.WebLog{})
|
||||
db.AutoMigrate(&model.AccountLog{})
|
||||
db.AutoMigrate(&model.WafSysLog{})
|
||||
|
||||
global.GWAF_LOCAL_LOG_DB.Callback().Query().Before("gorm:query").Register("tenant_plugin:before_query", before_query)
|
||||
global.GWAF_LOCAL_LOG_DB.Callback().Query().Before("gorm:update").Register("tenant_plugin:before_update", before_update)
|
||||
|
||||
var total int64 = 0
|
||||
global.GWAF_LOCAL_DB.Model(&model.ShareDb{}).Count(&total)
|
||||
if total == 0 {
|
||||
|
||||
var logtotal int64 = 0
|
||||
global.GWAF_LOCAL_LOG_DB.Model(&innerbean.WebLog{}).Count(&logtotal)
|
||||
|
||||
sharDbBean := model.ShareDb{
|
||||
BaseOrm: baseorm.BaseOrm{
|
||||
Id: uuid.NewV4().String(),
|
||||
USER_CODE: global.GWAF_USER_CODE,
|
||||
Tenant_ID: global.GWAF_TENANT_ID,
|
||||
CREATE_TIME: customtype.JsonTime(time.Now()),
|
||||
UPDATE_TIME: customtype.JsonTime(time.Now()),
|
||||
},
|
||||
DbLogicType: "log",
|
||||
StartTime: customtype.JsonTime(time.Now()),
|
||||
EndTime: customtype.JsonTime(time.Now()),
|
||||
FileName: "local_log",
|
||||
Cnt: logtotal,
|
||||
}
|
||||
global.GWAF_LOCAL_DB.Create(sharDbBean)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 手工切换日志数据源
|
||||
func InitManaulLogDb(currentDir string, custFileName string) {
|
||||
if currentDir == "" {
|
||||
currentDir = utils.GetCurrentDir()
|
||||
}
|
||||
if global.GWAF_LOCAL_CUSTOM_LOG_DB == nil {
|
||||
path := currentDir + "/data/" + custFileName
|
||||
key := url.QueryEscape(global.GWAF_PWD_LOGDB)
|
||||
dns := fmt.Sprintf("%s?_db_key=%s", path, key)
|
||||
db, err := gorm.Open(sqlite.Open(dns), &gorm.Config{})
|
||||
if err != nil {
|
||||
panic("failed to connect database")
|
||||
}
|
||||
// 启用 WAL 模式
|
||||
_ = db.Exec("PRAGMA journal_mode=WAL;")
|
||||
global.GWAF_LOCAL_CUSTOM_LOG_DB = db
|
||||
//logDB.Use(crypto.NewCryptoPlugin())
|
||||
// 注册默认的AES加解密策略
|
||||
//crypto.RegisterCryptoStrategy(strategy.NewAesCryptoStrategy("3Y)(27EtO^tK8Bj~"))
|
||||
// Migrate the schema
|
||||
//统计处理
|
||||
db.AutoMigrate(&innerbean.WebLog{})
|
||||
db.AutoMigrate(&model.AccountLog{})
|
||||
db.AutoMigrate(&model.WafSysLog{})
|
||||
|
||||
global.GWAF_LOCAL_CUSTOM_LOG_DB.Callback().Query().Before("gorm:query").Register("tenant_plugin:before_query", before_query)
|
||||
global.GWAF_LOCAL_CUSTOM_LOG_DB.Callback().Query().Before("gorm:update").Register("tenant_plugin:before_update", before_update)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,11 +8,14 @@ import (
|
||||
"SamWaf/model/baseorm"
|
||||
"SamWaf/model/request"
|
||||
"SamWaf/service/waf_service"
|
||||
"SamWaf/utils"
|
||||
"SamWaf/utils/zlog"
|
||||
"SamWaf/wafenginecore"
|
||||
"SamWaf/wechat"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
uuid "github.com/satori/go.uuid"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
@@ -59,6 +62,10 @@ type CountCityResult struct {
|
||||
*/
|
||||
|
||||
func TaskCounter() {
|
||||
if global.GWAF_LOCAL_DB == nil || global.GWAF_LOCAL_LOG_DB == nil {
|
||||
zlog.Debug("数据库没有初始化完成呢")
|
||||
return
|
||||
}
|
||||
global.GWAF_SWITCH_TASK_COUNTER = true
|
||||
/*dateTime, err := time.Parse("2006-01-02", "2023-01-01")
|
||||
if err != nil {
|
||||
@@ -73,7 +80,12 @@ func TaskCounter() {
|
||||
3.
|
||||
|
||||
*/
|
||||
|
||||
if global.GDATA_CURRENT_CHANGE {
|
||||
//如果正在切换库 跳过
|
||||
zlog.Debug("正在切换数据库等待中")
|
||||
global.GWAF_SWITCH_TASK_COUNTER = false
|
||||
return
|
||||
}
|
||||
currenyDayBak := time.Now()
|
||||
currenyDayMillisecondsBak := (global.GWAF_LAST_UPDATE_TIME.Add(-10 * time.Second).UnixNano()) / 1e6 //倒退10秒
|
||||
|
||||
@@ -334,6 +346,23 @@ func TaskLoadSetting() {
|
||||
}
|
||||
wafSystemConfigService.AddApi(wafSystemConfigAddReq)
|
||||
}
|
||||
|
||||
configItem = wafSystemConfigService.GetDetailByItem("log_db_size")
|
||||
if configItem.Id != "" {
|
||||
value, err := strconv.ParseInt(configItem.Value, 10, 64)
|
||||
if err == nil {
|
||||
if global.GDATA_SHARE_DB_SIZE != value {
|
||||
global.GDATA_SHARE_DB_SIZE = value
|
||||
}
|
||||
}
|
||||
} else {
|
||||
wafSystemConfigAddReq := request.WafSystemConfigAddReq{
|
||||
Item: "log_db_size",
|
||||
Value: strconv.FormatInt(global.GDATA_SHARE_DB_SIZE, 10),
|
||||
Remarks: "日志归档最大记录数量",
|
||||
}
|
||||
wafSystemConfigService.AddApi(wafSystemConfigAddReq)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -388,3 +417,96 @@ func TaskDelayInfo() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 检测库是否切换
|
||||
func TaskShareDbInfo() {
|
||||
zlog.Debug("检测是否需要进行分库")
|
||||
if global.GDATA_CURRENT_CHANGE {
|
||||
//如果正在切换库 跳过
|
||||
zlog.Debug("切库状态")
|
||||
return
|
||||
}
|
||||
if global.GWAF_LOCAL_DB == nil || global.GWAF_LOCAL_LOG_DB == nil {
|
||||
zlog.Debug("数据库没有初始化完成呢")
|
||||
return
|
||||
}
|
||||
//获取当前日志数量
|
||||
var total int64 = 0
|
||||
global.GWAF_LOCAL_LOG_DB.Model(&innerbean.WebLog{}).Count(&total)
|
||||
if total > global.GDATA_SHARE_DB_SIZE {
|
||||
global.GDATA_CURRENT_CHANGE = true
|
||||
oldDBFilename := "local_log.db"
|
||||
newDBFilename := fmt.Sprintf("local_log_%v.db", time.Now().Format("20060102150405"))
|
||||
|
||||
var lastedDb model.ShareDb
|
||||
err := global.GWAF_LOCAL_DB.Limit(1).Order("create_time desc").Find(&lastedDb).Error
|
||||
startTime := customtype.JsonTime(time.Now())
|
||||
if err == nil {
|
||||
startTime = lastedDb.EndTime
|
||||
}
|
||||
sharDbBean := model.ShareDb{
|
||||
BaseOrm: baseorm.BaseOrm{
|
||||
Id: uuid.NewV4().String(),
|
||||
USER_CODE: global.GWAF_USER_CODE,
|
||||
Tenant_ID: global.GWAF_TENANT_ID,
|
||||
CREATE_TIME: customtype.JsonTime(time.Now()),
|
||||
UPDATE_TIME: customtype.JsonTime(time.Now()),
|
||||
},
|
||||
DbLogicType: "log",
|
||||
StartTime: startTime,
|
||||
EndTime: customtype.JsonTime(time.Now()),
|
||||
FileName: newDBFilename,
|
||||
Cnt: total,
|
||||
}
|
||||
|
||||
currentDir := utils.GetCurrentDir()
|
||||
oldDBFilename = currentDir + "/data/" + oldDBFilename
|
||||
newDBFilename = currentDir + "/data/" + newDBFilename
|
||||
zlog.Debug("正在切库中...")
|
||||
sqlDB, err := global.GWAF_LOCAL_LOG_DB.DB()
|
||||
|
||||
if err != nil {
|
||||
zlog.Error("切换关闭时候错误", err)
|
||||
} else {
|
||||
|
||||
// 关闭数据库连接
|
||||
if err := sqlDB.Close(); err != nil {
|
||||
zlog.Error("切换关闭时候错误", err)
|
||||
}
|
||||
}
|
||||
var testTotal int64
|
||||
for {
|
||||
testError := global.GWAF_LOCAL_LOG_DB.Model(&innerbean.WebLog{}).Count(&testTotal).Error
|
||||
if testError != nil {
|
||||
zlog.Error("检测数据", testError)
|
||||
break
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
// 关闭与数据库相关的连接或程序
|
||||
// 重命名数据库文件
|
||||
if err := os.Rename(oldDBFilename, newDBFilename); err != nil {
|
||||
zlog.Error("Error renaming database file:", err)
|
||||
}
|
||||
|
||||
// 重命名 .db-shm 文件
|
||||
if err := os.Rename(oldDBFilename+"-shm", newDBFilename+"-shm"); err != nil {
|
||||
zlog.Error("Error renaming .db-shm file:", err)
|
||||
// 如果有必要,可以选择回滚数据库文件的重命名
|
||||
|
||||
}
|
||||
|
||||
// 重命名 .db-wal 文件
|
||||
if err := os.Rename(oldDBFilename+"-wal", newDBFilename+"-wal"); err != nil {
|
||||
zlog.Error("Error renaming .db-wal file:", err)
|
||||
// 如果有必要,可以选择回滚数据库文件的重命名
|
||||
}
|
||||
global.GWAF_LOCAL_DB.Create(sharDbBean)
|
||||
global.GWAF_LOCAL_LOG_DB = nil
|
||||
wafenginecore.InitLogDb("")
|
||||
global.GDATA_CURRENT_CHANGE = false
|
||||
zlog.Debug("切库完成...")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user