From 38b4a7957c479c1ddb82fbd0bce45a3cc35a7f5e Mon Sep 17 00:00:00 2001 From: samwaf Date: Mon, 24 Nov 2025 11:11:43 +0800 Subject: [PATCH] fix:core db_migrate_fail add execute sql #539 #557 --- main.go | 5 + wafdb/localdb.go | 383 +++++++++++++++++++++++++++++++++++++- wafdb/migrations_core.go | 65 ++++++- wafdb/migrations_log.go | 16 +- wafdb/migrations_stats.go | 65 ++++++- 5 files changed, 517 insertions(+), 17 deletions(-) diff --git a/main.go b/main.go index 6cb88d4..baf8113 100644 --- a/main.go +++ b/main.go @@ -791,6 +791,10 @@ func main() { fmt.Println("如果您遇到数据库损坏错误") fmt.Println("可以使用此工具尝试修复。\n") wafdb.RepairAllDatabases("") + case "execsql": //执行SQL语句 + fmt.Println("\n💻 SQL 执行工具") + fmt.Println("可以在指定数据库上执行 SQL 语句\n") + wafdb.ExecuteSQLCommand("") default: fmt.Printf("Command '%s' is not recognized.\n", command) fmt.Println("\n可用命令:") @@ -802,6 +806,7 @@ func main() { fmt.Println(" resetpwd - 重置管理员密码") fmt.Println(" resetotp - 重置安全码") fmt.Println(" repairdb - 修复损坏的数据库") + fmt.Println(" execsql - 执行SQL语句(支持SELECT/UPDATE/DELETE等)") fmt.Println("") } return diff --git a/wafdb/localdb.go b/wafdb/localdb.go index 04bf540..3b71634 100644 --- a/wafdb/localdb.go +++ b/wafdb/localdb.go @@ -9,6 +9,7 @@ import ( "SamWaf/model" "SamWaf/model/baseorm" "SamWaf/utils" + "bufio" "context" "database/sql" "fmt" @@ -85,8 +86,11 @@ func InitCoreDb(currentDir string) (bool, error) { // ============ 使用 gormigrate 替代 AutoMigrate(完全向后兼容) ============ zlog.Info("开始执行core数据库迁移...") if err := RunCoreDBMigrations(db); err != nil { - zlog.Error("core数据库迁移失败", "error", err) - panic("core database migration failed: " + err.Error()) + // 记录详细的错误信息 + errStr := fmt.Sprintf("%v", err) + zlog.Error("core数据库迁移失败", "error_string", errStr, "error_type", fmt.Sprintf("%T", err)) + zlog.Error("core数据库迁移失败详细信息: " + errStr) + panic("core database migration failed: " + errStr) } // ============ 迁移代码结束 ============ @@ -143,8 +147,10 @@ func InitLogDb(currentDir string) (bool, error) { // ============ 使用 gormigrate 替代 AutoMigrate(完全向后兼容) ============ zlog.Info("开始执行log数据库迁移...") if err := RunLogDBMigrations(db); err != nil { - zlog.Error("log数据库迁移失败", "error", err) - panic("log database migration failed: " + err.Error()) + errStr := fmt.Sprintf("%v", err) + zlog.Error("log数据库迁移失败", "error_string", errStr, "error_type", fmt.Sprintf("%T", err)) + zlog.Error("log数据库迁移失败详细信息: " + errStr) + panic("log database migration failed: " + errStr) } // ============ 迁移代码结束 ============ @@ -215,8 +221,10 @@ func InitManaulLogDb(currentDir string, custFileName string) { // ============ 使用 gormigrate 替代 AutoMigrate(完全向后兼容) ============ zlog.Info("开始执行手动log数据库迁移...", "file", custFileName) if err := RunLogDBMigrations(db); err != nil { - zlog.Error("手动log数据库迁移失败", "file", custFileName, "error", err) - panic("manual log database migration failed: " + err.Error()) + errStr := fmt.Sprintf("%v", err) + zlog.Error("手动log数据库迁移失败", "file", custFileName, "error_string", errStr, "error_type", fmt.Sprintf("%T", err)) + zlog.Error("手动log数据库迁移失败详细信息: " + errStr) + panic("manual log database migration failed: " + errStr) } // ============ 迁移代码结束 ============ @@ -265,8 +273,10 @@ func InitStatsDb(currentDir string) (bool, error) { // ============ 使用 gormigrate 替代 AutoMigrate(完全向后兼容) ============ zlog.Info("开始执行stats数据库迁移...") if err := RunStatsDBMigrations(db); err != nil { - zlog.Error("stats数据库迁移失败", "error", err) - panic("stats database migration failed: " + err.Error()) + errStr := fmt.Sprintf("%v", err) + zlog.Error("stats数据库迁移失败", "error_string", errStr, "error_type", fmt.Sprintf("%T", err)) + zlog.Error("stats数据库迁移失败详细信息: " + errStr) + panic("stats database migration failed: " + errStr) } // ============ 迁移代码结束 ============ @@ -646,6 +656,363 @@ func repairDatabaseByDump(dbPath string, password string, backupPath string) err } } +// ExecuteSQLCommand 执行SQL命令工具 +func ExecuteSQLCommand(currentDir string) { + if currentDir == "" { + currentDir = utils.GetCurrentDir() + } + + // 初始化审计日志 + auditLogger, auditLogPath := initSQLAuditLogger(currentDir) + if auditLogger != nil { + defer auditLogger.Close() + writeAuditLog(auditLogger, "INFO", "SQL执行工具启动", "") + fmt.Printf("📝 审计日志: %s\n", auditLogPath) + } + + fmt.Println("================================================") + fmt.Println(" SamWaf SQL 执行工具") + fmt.Println("================================================") + fmt.Println("\n可以在以下数据库上执行 SQL 语句:") + fmt.Println("1. 核心数据库 (local.db) - 存储配置、规则等") + fmt.Println("2. 日志数据库 (local_log.db) - 存储访问日志") + fmt.Println("3. 统计数据库 (local_stats.db) - 存储统计数据") + fmt.Println("\n⚠️ 警告:") + fmt.Println("- 执行前请确保已备份数据库") + fmt.Println("- UPDATE/DELETE 操作会直接修改数据,请谨慎使用") + fmt.Println("- 不当的 SQL 可能导致数据丢失或系统异常") + fmt.Println("- 所有操作将被记录到审计日志") + + fmt.Print("\n请选择数据库 (1-3),或输入 'q' 退出: ") + var input string + fmt.Scanln(&input) + + if input == "q" || input == "Q" { + fmt.Println("已退出 SQL 执行工具") + writeAuditLog(auditLogger, "INFO", "用户退出SQL执行工具", "") + return + } + + var db *gorm.DB + var dbName string + var dbPath string + var password string + + switch input { + case "1": + dbPath = currentDir + "/data/local.db" + dbName = "核心数据库 (local.db)" + password = global.GWAF_PWD_COREDB + case "2": + dbPath = currentDir + "/data/local_log.db" + dbName = "日志数据库 (local_log.db)" + password = global.GWAF_PWD_LOGDB + case "3": + dbPath = currentDir + "/data/local_stats.db" + dbName = "统计数据库 (local_stats.db)" + password = global.GWAF_PWD_STATDB + default: + fmt.Println("✗ 无效的选择") + writeAuditLog(auditLogger, "ERROR", "无效的数据库选择", fmt.Sprintf("输入: %s", input)) + return + } + + // 检查文件是否存在 + if _, err := os.Stat(dbPath); os.IsNotExist(err) { + fmt.Printf("✗ 数据库文件不存在: %s\n", dbPath) + writeAuditLog(auditLogger, "ERROR", "数据库文件不存在", dbPath) + return + } + + // 记录连接数据库 + writeAuditLog(auditLogger, "INFO", fmt.Sprintf("连接数据库: %s", dbName), dbPath) + + // 打开数据库 + fmt.Printf("\n正在连接到 %s...\n", dbName) + key := url.QueryEscape(password) + dns := fmt.Sprintf("%s?_db_key=%s", dbPath, key) + var err error + db, err = gorm.Open(sqlite.Open(dns), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Silent), + }) + if err != nil { + fmt.Printf("✗ 连接数据库失败: %v\n", err) + writeAuditLog(auditLogger, "ERROR", "连接数据库失败", fmt.Sprintf("数据库: %s, 错误: %v", dbName, err)) + return + } + + sqlDB, err := db.DB() + if err != nil { + fmt.Printf("✗ 获取数据库连接失败: %v\n", err) + writeAuditLog(auditLogger, "ERROR", "获取数据库连接失败", fmt.Sprintf("错误: %v", err)) + return + } + defer sqlDB.Close() + + fmt.Printf("✓ 已连接到 %s\n", dbName) + writeAuditLog(auditLogger, "INFO", "成功连接数据库", dbName) + + fmt.Println("\n================================================") + fmt.Println("SQL 执行模式") + fmt.Println("================================================") + fmt.Println("提示:") + fmt.Println("- 输入 SQL 语句并按回车执行") + fmt.Println("- 输入 'tables' 查看所有表") + fmt.Println("- 输入 'quit' 或 'exit' 退出") + fmt.Println("- 示例: SELECT * FROM account LIMIT 10") + fmt.Println("================================================") + + // 创建输入扫描器 + scanner := bufio.NewScanner(os.Stdin) + + // 交互式执行 SQL + for { + fmt.Print("SQL> ") + + if !scanner.Scan() { + break + } + + sqlInput := scanner.Text() + sqlInput = strings.TrimSpace(sqlInput) + + // 跳过空行 + if sqlInput == "" { + continue + } + + // 特殊命令处理 + switch strings.ToLower(sqlInput) { + case "quit", "exit", "q": + fmt.Println("\n已退出 SQL 执行工具") + writeAuditLog(auditLogger, "INFO", "用户退出SQL执行工具", "") + return + case "tables": + writeAuditLog(auditLogger, "INFO", "查看表列表", dbName) + listTables(db) + continue + case "help", "?": + fmt.Println("\n可用命令:") + fmt.Println(" tables - 显示所有表") + fmt.Println(" help/? - 显示此帮助") + fmt.Println(" quit/exit/q - 退出") + fmt.Println("\nSQL 语句示例:") + fmt.Println(" SELECT * FROM account LIMIT 10;") + fmt.Println(" UPDATE hosts SET status=1 WHERE code='xxx';") + fmt.Println(" DELETE FROM web_logs WHERE unix_add_time < 1234567890;") + fmt.Println("") + continue + } + + // 执行 SQL(带审计日志) + executeSingleSQLWithAudit(db, sqlInput, dbName, auditLogger) + fmt.Println("") + } + + if err := scanner.Err(); err != nil { + fmt.Printf("✗ 读取输入错误: %v\n", err) + writeAuditLog(auditLogger, "ERROR", "读取输入错误", fmt.Sprintf("错误: %v", err)) + } +} + +// initSQLAuditLogger 初始化SQL审计日志 +func initSQLAuditLogger(currentDir string) (*os.File, string) { + // 确保logs目录存在 + logsDir := filepath.Join(currentDir, "logs") + if err := os.MkdirAll(logsDir, os.ModePerm); err != nil { + fmt.Printf("⚠️ 警告: 无法创建logs目录: %v\n", err) + return nil, "" + } + + // 创建审计日志文件 + logPath := filepath.Join(logsDir, "db.log") + file, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + fmt.Printf("⚠️ 警告: 无法创建审计日志文件: %v\n", err) + return nil, "" + } + + return file, logPath +} + +// writeAuditLog 写入审计日志 +func writeAuditLog(logger *os.File, level, action, detail string) { + if logger == nil { + return + } + + timestamp := time.Now().Format("2006-01-02 15:04:05.000") + logLine := fmt.Sprintf("[%s] [%s] %s", timestamp, level, action) + if detail != "" { + logLine += fmt.Sprintf(" | %s", detail) + } + logLine += "\n" + + if _, err := logger.WriteString(logLine); err != nil { + fmt.Printf("⚠️ 警告: 写入审计日志失败: %v\n", err) + } +} + +// executeSingleSQLWithAudit 执行单条 SQL 语句(带审计日志) +func executeSingleSQLWithAudit(db *gorm.DB, sqlStr string, dbName string, auditLogger *os.File) { + sqlStr = strings.TrimSpace(sqlStr) + if sqlStr == "" { + return + } + + // 记录SQL执行 + writeAuditLog(auditLogger, "INFO", fmt.Sprintf("执行SQL [数据库: %s]", dbName), sqlStr) + + // 判断 SQL 类型 + sqlUpper := strings.ToUpper(sqlStr) + isQuery := strings.HasPrefix(sqlUpper, "SELECT") || + strings.HasPrefix(sqlUpper, "PRAGMA") || + strings.HasPrefix(sqlUpper, "SHOW") + + if isQuery { + // 查询语句 + executeQuerySQLWithAudit(db, sqlStr, dbName, auditLogger) + } else { + // 修改语句(UPDATE/DELETE/INSERT等) + executeModifySQLWithAudit(db, sqlStr, dbName, auditLogger) + } +} + +// listTables 列出所有表 +func listTables(db *gorm.DB) { + var tables []string + err := db.Raw("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name").Scan(&tables).Error + if err != nil { + fmt.Printf("✗ 查询表列表失败: %v\n", err) + return + } + + fmt.Println("\n数据库中的表:") + fmt.Println("----------------------------------------") + for i, table := range tables { + // 获取表的记录数 + var count int64 + db.Table(table).Count(&count) + fmt.Printf("%2d. %-30s (记录数: %d)\n", i+1, table, count) + } + fmt.Println("----------------------------------------") +} + +// executeQuerySQLWithAudit 执行查询语句(带审计) +func executeQuerySQLWithAudit(db *gorm.DB, sqlStr string, dbName string, auditLogger *os.File) { + rows, err := db.Raw(sqlStr).Rows() + if err != nil { + fmt.Printf("✗ 执行查询失败: %v\n", err) + writeAuditLog(auditLogger, "ERROR", fmt.Sprintf("查询执行失败 [数据库: %s]", dbName), fmt.Sprintf("错误: %v", err)) + return + } + defer rows.Close() + + // 获取列名 + columns, err := rows.Columns() + if err != nil { + fmt.Printf("✗ 获取列信息失败: %v\n", err) + writeAuditLog(auditLogger, "ERROR", fmt.Sprintf("获取列信息失败 [数据库: %s]", dbName), fmt.Sprintf("错误: %v", err)) + return + } + + fmt.Println("\n查询结果:") + fmt.Println("----------------------------------------") + + // 打印列名 + for i, col := range columns { + if i > 0 { + fmt.Print(" | ") + } + fmt.Printf("%-20s", col) + } + fmt.Println() + fmt.Println(strings.Repeat("-", len(columns)*23)) + + // 打印数据行 + rowCount := 0 + for rows.Next() { + // 创建接收数据的切片 + values := make([]interface{}, len(columns)) + valuePtrs := make([]interface{}, len(columns)) + for i := range values { + valuePtrs[i] = &values[i] + } + + if err := rows.Scan(valuePtrs...); err != nil { + fmt.Printf("✗ 读取数据失败: %v\n", err) + writeAuditLog(auditLogger, "ERROR", fmt.Sprintf("读取数据失败 [数据库: %s]", dbName), fmt.Sprintf("错误: %v", err)) + return + } + + // 打印每一列的值 + for i, val := range values { + if i > 0 { + fmt.Print(" | ") + } + // 处理 nil 值 + if val == nil { + fmt.Printf("%-20s", "NULL") + } else { + // 将值转换为字符串 + strVal := fmt.Sprintf("%v", val) + if len(strVal) > 20 { + strVal = strVal[:17] + "..." + } + fmt.Printf("%-20s", strVal) + } + } + fmt.Println() + rowCount++ + + // 限制显示行数,避免输出过多 + if rowCount >= 100 { + fmt.Println("... (仅显示前100行)") + break + } + } + + fmt.Println("----------------------------------------") + fmt.Printf("✓ 查询完成,共 %d 行\n", rowCount) + + // 记录审计日志 + writeAuditLog(auditLogger, "SUCCESS", fmt.Sprintf("查询执行成功 [数据库: %s]", dbName), fmt.Sprintf("返回 %d 行", rowCount)) +} + +// executeModifySQLWithAudit 执行修改语句(带审计) +func executeModifySQLWithAudit(db *gorm.DB, sqlStr string, dbName string, auditLogger *os.File) { + // 二次确认 + fmt.Print("\n⚠️ 您即将执行修改操作,是否继续?(yes/no): ") + + scanner := bufio.NewScanner(os.Stdin) + if !scanner.Scan() { + fmt.Println("✗ 操作已取消") + writeAuditLog(auditLogger, "CANCEL", fmt.Sprintf("用户取消修改操作 [数据库: %s]", dbName), "读取确认失败") + return + } + + confirm := strings.TrimSpace(strings.ToLower(scanner.Text())) + + if confirm != "yes" && confirm != "y" { + fmt.Println("✗ 操作已取消") + writeAuditLog(auditLogger, "CANCEL", fmt.Sprintf("用户取消修改操作 [数据库: %s]", dbName), fmt.Sprintf("用户输入: %s", confirm)) + return + } + + // 记录用户确认执行 + writeAuditLog(auditLogger, "CONFIRM", fmt.Sprintf("用户确认执行修改 [数据库: %s]", dbName), "") + + result := db.Exec(sqlStr) + if result.Error != nil { + fmt.Printf("✗ 执行失败: %v\n", result.Error) + writeAuditLog(auditLogger, "ERROR", fmt.Sprintf("修改执行失败 [数据库: %s]", dbName), fmt.Sprintf("错误: %v", result.Error)) + return + } + + fmt.Printf("✓ 执行成功,影响 %d 行\n", result.RowsAffected) + writeAuditLog(auditLogger, "SUCCESS", fmt.Sprintf("修改执行成功 [数据库: %s]", dbName), fmt.Sprintf("影响 %d 行", result.RowsAffected)) +} + // RepairAllDatabases 修复所有数据库 func RepairAllDatabases(currentDir string) { if currentDir == "" { diff --git a/wafdb/migrations_core.go b/wafdb/migrations_core.go index dd4f2d0..6c8c50b 100644 --- a/wafdb/migrations_core.go +++ b/wafdb/migrations_core.go @@ -168,7 +168,9 @@ func RunCoreDBMigrations(db *gorm.DB) error { // 执行迁移 if err := m.Migrate(); err != nil { - return fmt.Errorf("core数据库迁移失败: %w", err) + errMsg := fmt.Sprintf("core数据库迁移失败: %v", err) + zlog.Error("迁移执行错误", "error", err.Error()) + return fmt.Errorf("%s", errMsg) } zlog.Info("core数据库迁移成功完成") @@ -218,6 +220,11 @@ func createCoreIndexes(tx *gorm.DB) error { zlog.Info("开始创建core索引...") startTime := time.Now() + // 先检查并清理 ip_tags 表中的重复数据(针对唯一索引) + if err := cleanupDuplicateIPTags(tx); err != nil { + zlog.Warn("清理重复数据时出现问题(非致命)", "error", err.Error()) + } + indexes := []struct { Name string SQL string @@ -233,10 +240,18 @@ func createCoreIndexes(tx *gorm.DB) error { } for _, idx := range indexes { + zlog.Info("开始创建索引", "index", idx.Name, "sql", idx.SQL) + indexStartTime := time.Now() + if err := tx.Exec(idx.SQL).Error; err != nil { - return fmt.Errorf("创建索引失败 %s: %w", idx.Name, err) + // 记录详细的错误信息 + errMsg := fmt.Sprintf("创建索引失败 %s: %v (错误类型: %T)", idx.Name, err, err) + zlog.Error("索引创建失败详情", "index", idx.Name, "error", err.Error(), "sql", idx.SQL) + return fmt.Errorf("%s", errMsg) } - zlog.Info("索引创建成功", "index", idx.Name) + + indexDuration := time.Since(indexStartTime) + zlog.Info("索引创建成功", "index", idx.Name, "耗时", indexDuration.String()) } duration := time.Since(startTime) @@ -244,6 +259,50 @@ func createCoreIndexes(tx *gorm.DB) error { return nil } +// cleanupDuplicateIPTags 清理 ip_tags 表中的重复数据 +func cleanupDuplicateIPTags(tx *gorm.DB) error { + zlog.Info("检查 ip_tags 表中的重复数据...") + + // 检查是否存在重复数据 + var duplicateCount int64 + err := tx.Raw(` + SELECT COUNT(*) FROM ( + SELECT user_code, tenant_id, ip, ip_tag, COUNT(*) as cnt + FROM ip_tags + GROUP BY user_code, tenant_id, ip, ip_tag + HAVING cnt > 1 + ) + `).Scan(&duplicateCount).Error + + if err != nil { + return fmt.Errorf("检查重复数据失败: %w", err) + } + + if duplicateCount == 0 { + zlog.Info("ip_tags 表无重复数据,可以安全创建唯一索引") + return nil + } + + zlog.Warn("发现重复数据,开始清理", "重复组数", duplicateCount) + + // 删除重复数据,保留 id 最小的记录 + result := tx.Exec(` + DELETE FROM ip_tags + WHERE id NOT IN ( + SELECT MIN(id) + FROM ip_tags + GROUP BY user_code, tenant_id, ip, ip_tag + ) + `) + + if result.Error != nil { + return fmt.Errorf("清理重复数据失败: %w", result.Error) + } + + zlog.Info("重复数据清理完成", "删除记录数", result.RowsAffected) + return nil +} + // dropCoreIndexes 删除所有core索引 func dropCoreIndexes(tx *gorm.DB) error { zlog.Info("开始删除core索引") diff --git a/wafdb/migrations_log.go b/wafdb/migrations_log.go index 2b11b9d..2478db2 100644 --- a/wafdb/migrations_log.go +++ b/wafdb/migrations_log.go @@ -91,7 +91,9 @@ func RunLogDBMigrations(db *gorm.DB) error { // 执行迁移 if err := m.Migrate(); err != nil { - return fmt.Errorf("log数据库迁移失败: %w", err) + errMsg := fmt.Sprintf("log数据库迁移失败: %v", err) + zlog.Error("迁移执行错误", "error", err.Error()) + return fmt.Errorf("%s", errMsg) } zlog.Info("log数据库迁移成功完成") @@ -180,10 +182,18 @@ func createLogIndexes(tx *gorm.DB) error { } for _, idx := range indexes { + zlog.Info("开始创建索引", "index", idx.Name, "sql", idx.SQL) + indexStartTime := time.Now() + if err := tx.Exec(idx.SQL).Error; err != nil { - return fmt.Errorf("创建索引失败 %s: %w", idx.Name, err) + // 记录详细的错误信息 + errMsg := fmt.Sprintf("创建索引失败 %s: %v (错误类型: %T)", idx.Name, err, err) + zlog.Error("索引创建失败详情", "index", idx.Name, "error", err.Error(), "sql", idx.SQL) + return fmt.Errorf("%s", errMsg) } - zlog.Info("索引创建成功", "index", idx.Name) + + indexDuration := time.Since(indexStartTime) + zlog.Info("索引创建成功", "index", idx.Name, "耗时", indexDuration.String()) } duration := time.Since(startTime) diff --git a/wafdb/migrations_stats.go b/wafdb/migrations_stats.go index 281dc45..2ce9ace 100644 --- a/wafdb/migrations_stats.go +++ b/wafdb/migrations_stats.go @@ -93,7 +93,9 @@ func RunStatsDBMigrations(db *gorm.DB) error { // 执行迁移 if err := m.Migrate(); err != nil { - return fmt.Errorf("stats数据库迁移失败: %w", err) + errMsg := fmt.Sprintf("stats数据库迁移失败: %v", err) + zlog.Error("迁移执行错误", "error", err.Error()) + return fmt.Errorf("%s", errMsg) } zlog.Info("stats数据库迁移成功完成") @@ -160,6 +162,11 @@ func createStatsIndexes(tx *gorm.DB) error { zlog.Info("开始创建stats索引(可能需要几分钟)...") startTime := time.Now() + // 先检查并清理 ip_tags 表中的重复数据(针对唯一索引) + if err := cleanupDuplicateIPTagsInStats(tx); err != nil { + zlog.Warn("清理重复数据时出现问题", "error", err.Error()) + } + indexes := []struct { Name string SQL string @@ -187,10 +194,18 @@ func createStatsIndexes(tx *gorm.DB) error { } for _, idx := range indexes { + zlog.Info("开始创建索引", "index", idx.Name, "sql", idx.SQL) + indexStartTime := time.Now() + if err := tx.Exec(idx.SQL).Error; err != nil { - return fmt.Errorf("创建索引失败 %s: %w", idx.Name, err) + // 记录详细的错误信息 + errMsg := fmt.Sprintf("创建索引失败 %s: %v (错误类型: %T)", idx.Name, err, err) + zlog.Error("索引创建失败详情", "index", idx.Name, "error", err.Error(), "sql", idx.SQL) + return fmt.Errorf("%s", errMsg) } - zlog.Info("索引创建成功", "index", idx.Name) + + indexDuration := time.Since(indexStartTime) + zlog.Info("索引创建成功", "index", idx.Name, "耗时", indexDuration.String()) } duration := time.Since(startTime) @@ -198,6 +213,50 @@ func createStatsIndexes(tx *gorm.DB) error { return nil } +// cleanupDuplicateIPTagsInStats 清理 ip_tags 表中的重复数据(stats数据库) +func cleanupDuplicateIPTagsInStats(tx *gorm.DB) error { + zlog.Info("检查 ip_tags 表中的重复数据...") + + // 检查是否存在重复数据 + var duplicateCount int64 + err := tx.Raw(` + SELECT COUNT(*) FROM ( + SELECT user_code, tenant_id, ip, ip_tag, COUNT(*) as cnt + FROM ip_tags + GROUP BY user_code, tenant_id, ip, ip_tag + HAVING cnt > 1 + ) + `).Scan(&duplicateCount).Error + + if err != nil { + return fmt.Errorf("检查重复数据失败: %w", err) + } + + if duplicateCount == 0 { + zlog.Info("ip_tags 表无重复数据,可以安全创建唯一索引") + return nil + } + + zlog.Warn("发现重复数据,开始清理", "重复组数", duplicateCount) + + // 删除重复数据,保留 id 最小的记录 + result := tx.Exec(` + DELETE FROM ip_tags + WHERE id NOT IN ( + SELECT MIN(id) + FROM ip_tags + GROUP BY user_code, tenant_id, ip, ip_tag + ) + `) + + if result.Error != nil { + return fmt.Errorf("清理重复数据失败: %w", result.Error) + } + + zlog.Info("重复数据清理完成", "删除记录数", result.RowsAffected) + return nil +} + // dropStatsIndexes 删除所有stats索引 func dropStatsIndexes(tx *gorm.DB) error { zlog.Info("开始删除stats索引")