fix:server map concurrent

#375
This commit is contained in:
samwaf
2025-06-18 14:51:06 +08:00
parent b6e904f99f
commit 5f783b524b
5 changed files with 140 additions and 42 deletions

View File

@@ -30,7 +30,7 @@ func (w *WafHostAPi) AddApi(c *gin.Context) {
err := c.ShouldBindJSON(&req)
if err == nil {
//端口从未在本系统加过,检测端口是否被其他应用占用
_, svrOk := globalobj.GWAF_RUNTIME_OBJ_WAF_ENGINE.ServerOnline[req.Port]
_, svrOk := globalobj.GWAF_RUNTIME_OBJ_WAF_ENGINE.ServerOnline.Get(req.Port)
if !svrOk && utils.PortCheck(req.Port) == false {
//发送websocket 推送消息
global.GQEQUE_MESSAGE_DB.Enqueue(innerbean.OpResultMessageInfo{
@@ -199,7 +199,7 @@ func (w *WafHostAPi) ModifyHostApi(c *gin.Context) {
wafHostOld := wafHostService.GetDetailByCodeApi(req.CODE)
//端口从未在本系统加过,检测端口是否被其他应用占用
_, svrOk := globalobj.GWAF_RUNTIME_OBJ_WAF_ENGINE.ServerOnline[req.Port]
_, svrOk := globalobj.GWAF_RUNTIME_OBJ_WAF_ENGINE.ServerOnline.Get(req.Port)
if !svrOk && utils.PortCheck(req.Port) == false {
//发送websocket 推送消息
global.GQEQUE_MESSAGE_DB.Enqueue(innerbean.OpResultMessageInfo{
@@ -295,7 +295,7 @@ func (w *WafHostAPi) ModifyStartStatusApi(c *gin.Context) {
if err == nil {
wafHostOld := wafHostService.GetDetailByCodeApi(req.CODE)
_, svrOk := globalobj.GWAF_RUNTIME_OBJ_WAF_ENGINE.ServerOnline[wafHostOld.Port]
_, svrOk := globalobj.GWAF_RUNTIME_OBJ_WAF_ENGINE.ServerOnline.Get(wafHostOld.Port)
if req.START_STATUS == 0 && !svrOk && utils.PortCheck(wafHostOld.Port) == false {
//发送websocket 推送消息

View File

@@ -7,7 +7,6 @@ import (
"SamWaf/enums"
"SamWaf/global"
"SamWaf/globalobj"
"SamWaf/innerbean"
"SamWaf/model"
"SamWaf/model/wafenginmodel"
"SamWaf/utils"
@@ -275,7 +274,7 @@ func (m *wafSystenService) run() {
HostCode: map[string]string{},
HostTargetNoPort: map[string]string{},
HostTargetMoreDomain: map[string]string{},
ServerOnline: map[int]innerbean.ServerRunTime{},
ServerOnline: wafenginmodel.NewSafeServerMap(),
//所有证书情况 对应端口 可能多个端口都是https 443或者其他非标准端口也要实现https证书
AllCertificate: wafenginecore.AllCertificate{
Mux: sync.Mutex{},

View File

@@ -0,0 +1,84 @@
package wafenginmodel
import (
"SamWaf/innerbean"
"sync"
)
// SafeServerMap 线程安全的ServerRunTime Map
type SafeServerMap struct {
mu sync.RWMutex
items map[int]innerbean.ServerRunTime
}
// NewSafeServerMap 创建新的安全Map
func NewSafeServerMap() *SafeServerMap {
return &SafeServerMap{
items: make(map[int]innerbean.ServerRunTime),
}
}
// Get 获取值
func (m *SafeServerMap) Get(key int) (innerbean.ServerRunTime, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
val, ok := m.items[key]
return val, ok
}
// Set 设置值
func (m *SafeServerMap) Set(key int, value innerbean.ServerRunTime) {
m.mu.Lock()
defer m.mu.Unlock()
m.items[key] = value
}
// Delete 删除值
func (m *SafeServerMap) Delete(key int) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.items, key)
}
// Range 遍历Map
func (m *SafeServerMap) Range(f func(key int, value innerbean.ServerRunTime) bool) {
// 先获取所有数据的副本
m.mu.RLock()
copy := make(map[int]innerbean.ServerRunTime, len(m.items))
for k, v := range m.items {
copy[k] = v
}
m.mu.RUnlock()
// 在副本上遍历,不持有锁
for k, v := range copy {
if !f(k, v) {
break
}
}
}
// Len 获取Map长度
func (m *SafeServerMap) Len() int {
m.mu.RLock()
defer m.mu.RUnlock()
return len(m.items)
}
// GetAll 获取所有值
func (m *SafeServerMap) GetAll() map[int]innerbean.ServerRunTime {
m.mu.RLock()
defer m.mu.RUnlock()
result := make(map[int]innerbean.ServerRunTime, len(m.items))
for k, v := range m.items {
result[k] = v
}
return result
}
// Clear 清空Map
func (m *SafeServerMap) Clear() {
m.mu.Lock()
defer m.mu.Unlock()
m.items = make(map[int]innerbean.ServerRunTime)
}

View File

@@ -50,7 +50,7 @@ type WafEngine struct {
//更多域名和配置防护关系 (key:主机域名,value:主机的hostCode) 一个主机绑定很多域名的情况
HostTargetMoreDomain map[string]string
//服务在线情况key端口value :服务情况)
ServerOnline map[int]innerbean.ServerRunTime
ServerOnline *wafenginmodel.SafeServerMap
AllCertificate AllCertificate //所有证书
EngineCurrentStatus int // 当前waf引擎状态
@@ -177,7 +177,7 @@ func (waf *WafEngine) ServeHTTP(w http.ResponseWriter, r *http.Request) {
zlog.Error("get client error", ipErr.Error())
return
}
_, ok := waf.ServerOnline[hostTarget.Host.Remote_port]
_, ok := waf.ServerOnline.Get(hostTarget.Host.Remote_port)
//检测如果访问IP和远程IP是同一个IP且远程端口在本地Server已存在则显示配置错误
if clientIP == hostTarget.Host.Remote_ip && ok == true {
resBytes := []byte("500: 配置有误" + host + " 当前IP和访问远端IP一样且端口也一样会造成循环问题")
@@ -1174,7 +1174,7 @@ func (waf *WafEngine) CloseWaf() {
waf.HostTarget = map[string]*wafenginmodel.HostSafe{}
waf.HostCode = map[string]string{}
waf.HostTargetNoPort = map[string]string{}
waf.ServerOnline = map[int]innerbean.ServerRunTime{}
waf.ServerOnline.Clear()
waf.AllCertificate = AllCertificate{
Mux: sync.Mutex{},
Map: map[string]*tls.Certificate{},
@@ -1198,9 +1198,10 @@ func (waf *WafEngine) ClearProxy(hostCode string) {
// 开启所有代理
func (waf *WafEngine) StartAllProxyServer() {
for _, v := range waf.ServerOnline {
waf.ServerOnline.Range(func(port int, v innerbean.ServerRunTime) bool {
waf.StartProxyServer(v)
}
return true
})
waf.EnumAllPortProxyServer()
waf.ReLoadSensitive()
@@ -1209,9 +1210,10 @@ func (waf *WafEngine) StartAllProxyServer() {
// 罗列端口
func (waf *WafEngine) EnumAllPortProxyServer() {
onlinePorts := ""
for _, v := range waf.ServerOnline {
waf.ServerOnline.Range(func(port int, v innerbean.ServerRunTime) bool {
onlinePorts = strconv.Itoa(v.Port) + "," + onlinePorts
}
return true
})
global.GWAF_RUNTIME_CURRENT_WEBPORT = onlinePorts
}
@@ -1249,10 +1251,10 @@ func (waf *WafEngine) StartProxyServer(innruntime innerbean.ServerRunTime) {
}
svr = redirectServer.Server
serclone := waf.ServerOnline[innruntime.Port]
serclone, _ := waf.ServerOnline.Get(innruntime.Port)
serclone.Svr = svr
serclone.Status = 0
waf.ServerOnline[innruntime.Port] = serclone
waf.ServerOnline.Set(innruntime.Port, serclone)
zlog.Info("启动HTTPS重定向服务器" + strconv.Itoa(innruntime.Port))
err := redirectServer.ListenAndServeTLS("", "")
if err == http.ErrServerClosed {
@@ -1266,10 +1268,10 @@ func (waf *WafEngine) StartProxyServer(innruntime innerbean.ServerRunTime) {
GetCertificate: waf.GetCertificateFunc,
},
}
serclone := waf.ServerOnline[innruntime.Port]
serclone, _ := waf.ServerOnline.Get(innruntime.Port)
serclone.Svr = svr
serclone.Status = 0
waf.ServerOnline[innruntime.Port] = serclone
waf.ServerOnline.Set(innruntime.Port, serclone)
zlog.Info("启动HTTPS 服务器" + strconv.Itoa(innruntime.Port))
err := svr.ListenAndServeTLS("", "")
if err == http.ErrServerClosed {
@@ -1303,11 +1305,11 @@ func (waf *WafEngine) StartProxyServer(innruntime innerbean.ServerRunTime) {
Addr: ":" + strconv.Itoa(innruntime.Port),
Handler: waf,
}
serclone := waf.ServerOnline[innruntime.Port]
serclone, _ := waf.ServerOnline.Get(innruntime.Port)
serclone.Svr = svr
serclone.Status = 0
waf.ServerOnline[innruntime.Port] = serclone
waf.ServerOnline.Set(innruntime.Port, serclone)
zlog.Info("启动HTTP 服务器" + strconv.Itoa(innruntime.Port))
err := svr.ListenAndServe()
@@ -1337,9 +1339,10 @@ func (waf *WafEngine) StartProxyServer(innruntime innerbean.ServerRunTime) {
// 关闭所有代理服务
func (waf *WafEngine) StopAllProxyServer() {
for _, v := range waf.ServerOnline {
waf.ServerOnline.Range(func(port int, v innerbean.ServerRunTime) bool {
waf.StopProxyServer(v)
}
return true
})
}
// 关闭指定代理服务

View File

@@ -42,16 +42,16 @@ func (waf *WafEngine) LoadHost(inHost model.Hosts) []innerbean.ServerRunTime {
if inHost.GLOBAL_HOST == 1 {
global.GWAF_GLOBAL_HOST_CODE = inHost.Code
}
onlineServer, ok := waf.ServerOnline[inHost.Port]
onlineServer, ok := waf.ServerOnline.Get(inHost.Port)
if ok == false && inHost.GLOBAL_HOST == 0 {
if inHost.START_STATUS == 0 {
waf.ServerOnline[inHost.Port] = innerbean.ServerRunTime{
waf.ServerOnline.Set(inHost.Port, innerbean.ServerRunTime{
ServerType: utils.GetServerByHosts(inHost),
Port: inHost.Port,
Status: 1,
}
})
} else {
delete(waf.ServerOnline, inHost.Port)
waf.ServerOnline.Delete(inHost.Port)
}
} else if ok {
@@ -71,16 +71,16 @@ func (waf *WafEngine) LoadHost(inHost model.Hosts) []innerbean.ServerRunTime {
//检查是否存在强制跳转HTTPS的情况
if inHost.AutoJumpHTTPS == 1 {
default80Port := 80
_, ok := waf.ServerOnline[default80Port]
_, ok := waf.ServerOnline.Get(default80Port)
if ok == false && inHost.GLOBAL_HOST == 0 {
if inHost.START_STATUS == 0 {
waf.ServerOnline[default80Port] = innerbean.ServerRunTime{
waf.ServerOnline.Set(default80Port, innerbean.ServerRunTime{
ServerType: "http",
Port: default80Port,
Status: 1,
}
})
} else {
delete(waf.ServerOnline, default80Port)
waf.ServerOnline.Delete(default80Port)
}
}
}
@@ -95,21 +95,21 @@ func (waf *WafEngine) LoadHost(inHost model.Hosts) []innerbean.ServerRunTime {
continue
}
ports = append(ports, port)
_, ok := waf.ServerOnline[port]
_, ok := waf.ServerOnline.Get(port)
if ok == false {
if inHost.START_STATUS == 0 {
if port == 443 {
waf.ServerOnline[port] = innerbean.ServerRunTime{
waf.ServerOnline.Set(port, innerbean.ServerRunTime{
ServerType: "https",
Port: port,
Status: 1,
}
})
} else {
waf.ServerOnline[port] = innerbean.ServerRunTime{
waf.ServerOnline.Set(port, innerbean.ServerRunTime{
ServerType: "http",
Port: port,
Status: 1,
}
})
}
}
@@ -263,11 +263,14 @@ func (waf *WafEngine) LoadHost(inHost model.Hosts) []innerbean.ServerRunTime {
}
var serverOnlines = []innerbean.ServerRunTime{}
serverOnlines = append(serverOnlines, waf.ServerOnline[inHost.Port])
serverOnline, isExist := waf.ServerOnline.Get(inHost.Port)
if isExist {
serverOnlines = append(serverOnlines, serverOnline)
}
for _, port := range ports {
_, ok := waf.ServerOnline[port]
if ok == true {
serverOnlines = append(serverOnlines, waf.ServerOnline[port])
serverOnline, isExist := waf.ServerOnline.Get(port)
if isExist {
serverOnlines = append(serverOnlines, serverOnline)
}
}
return serverOnlines
@@ -275,23 +278,32 @@ func (waf *WafEngine) LoadHost(inHost model.Hosts) []innerbean.ServerRunTime {
// RemovePortServer 检测如果没有端口在占用了,可以关闭相应端口
func (waf *WafEngine) RemovePortServer() {
for onlinePort := range waf.ServerOnline {
// 使用Range方法安全地遍历ServerOnline
portsToRemove := make([]int, 0)
waf.ServerOnline.Range(func(onlinePort int, serverRuntime innerbean.ServerRunTime) bool {
if waf_service.WafHostServiceApp.CheckAvailablePortExistApi(onlinePort) == 0 {
//暂停服务 并 移除服务信息
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, svrOk := waf.ServerOnline[onlinePort]
if svrOk {
err := waf.ServerOnline[onlinePort].Svr.Shutdown(ctx)
if serverRuntime.Svr != nil {
err := serverRuntime.Svr.Shutdown(ctx)
if err != nil {
zlog.Error("shutting down: " + err.Error())
} else {
zlog.Info("shutdown processed successfully port" + strconv.Itoa(onlinePort))
}
delete(waf.ServerOnline, onlinePort)
}
// 记录需要删除的端口
portsToRemove = append(portsToRemove, onlinePort)
}
return true // 继续遍历
})
// 删除已关闭的端口
for _, port := range portsToRemove {
waf.ServerOnline.Delete(port)
}
}