fix(auto): 防止多进程环境下任务重复执行

- 在多个apps的ready方法中增加多进程保护机制,避免重复启动任务线程
- 统一通过检查环境变量RUN_MAIN和DJANGO_SETTINGS_MODULE判断是否主进程
- MangoServer/Dockerfile中添加RUN_MAIN环境变量确保Docker环境正确启动任务
- AutoSystem模块使用文件锁防止Docker环境下重复初始化
- RunTasks类增加多进程保护跳过非主进程定时任务初始化
- AutoUser模块日志调用改进,异常时记录到system日志
- 修正AutoSystem中数据库连接使用上下文管理器,保证连接重试机制生效
This commit is contained in:
毛鹏
2025-12-03 11:45:37 +08:00
parent 10e891aa0c
commit 426618dcaa
7 changed files with 192 additions and 38 deletions

View File

@@ -10,7 +10,9 @@ ARG DJANGO_ENV
ENV DJANGO_SETTINGS_MODULE=src.settings
ENV DJANGO_ENV=${DJANGO_ENV}
# 添加RUN_MAIN环境变量确保在Docker环境下也能正确启动定时任务
ENV RUN_MAIN=true
RUN echo "构建后端服务:$DJANGO_ENV"
# CMD ["sh", "-c", "daphne -b 0.0.0.0 -p 8000 src.asgi:application"]
CMD ["uvicorn", "src.asgi:application", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
CMD ["uvicorn", "src.asgi:application", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]

View File

@@ -2,9 +2,11 @@ from threading import Thread
import atexit
import time
import os
from django.apps import AppConfig
from src.auto_test.auto_api.service.test_case.case_flow import ApiCaseFlow
from src.tools.log_collector import log
class AutoApiConfig(AppConfig):
@@ -12,6 +14,10 @@ class AutoApiConfig(AppConfig):
name = 'src.auto_test.auto_api'
def ready(self):
# 多进程保护机制,防止在多进程环境下重复执行
if self._is_duplicate_process():
return
def run():
time.sleep(10)
self.test_case_consumption()
@@ -20,6 +26,22 @@ class AutoApiConfig(AppConfig):
task.start()
atexit.register(self.shutdown)
def _is_duplicate_process(self):
"""
检查是否为重复进程,防止在多进程环境下重复执行
"""
# 检查是否为重载进程
run_main = os.environ.get('RUN_MAIN', None)
if run_main != 'true':
return True
# 检查DJANGO环境变量
django_settings = os.environ.get('DJANGO_SETTINGS_MODULE')
if not django_settings:
return True
return False
def test_case_consumption(self):
self.case_flow = ApiCaseFlow()
self.api_task = Thread(target=self.case_flow.process_tasks)
@@ -28,4 +50,4 @@ class AutoApiConfig(AppConfig):
def shutdown(self):
self.case_flow.stop()
self.api_task.join()
self.api_task.join()

View File

@@ -1,6 +1,7 @@
from threading import Thread
import time
import os
from django.apps import AppConfig
from src.tools.log_collector import log
@@ -11,6 +12,10 @@ class AutoPytestConfig(AppConfig):
name = 'src.auto_test.auto_pytest'
def ready(self):
# 多进程保护机制,防止在多进程环境下重复执行
if self._is_duplicate_process():
return
def run():
time.sleep(10)
self.pull_code()
@@ -18,6 +23,22 @@ class AutoPytestConfig(AppConfig):
task = Thread(target=run)
task.start()
def _is_duplicate_process(self):
"""
检查是否为重复进程,防止在多进程环境下重复执行
"""
# 检查是否为重载进程
run_main = os.environ.get('RUN_MAIN', None)
if run_main != 'true':
return True
# 检查DJANGO环境变量
django_settings = os.environ.get('DJANGO_SETTINGS_MODULE')
if not django_settings:
return True
return False
# def test_case_consumption(self):
# self.case_flow = PytestCaseFlow()
# self.pytest_task = Thread(target=self.case_flow.process_tasks)
@@ -34,4 +55,4 @@ class AutoPytestConfig(AppConfig):
except Exception as e:
import traceback
log.pytest.error(f'异常提示:{e}, 首次启动项目,请启动完成之后再重启一次!')
log.pytest.info(f'如果您的项目已经配置了pytest等相关配置则关注下这个异常如果没有配置请忽略')
log.pytest.info(f'如果您的项目已经配置了pytest等相关配置则关注下这个异常如果没有配置请忽略')

View File

@@ -17,7 +17,7 @@ from mangotools.enums import CacheValueTypeEnum
from src.enums.system_enum import CacheDataKeyEnum
from src.enums.tools_enum import TaskEnum
from src.tools.decorator.retry import ensure_db_connection
from src.tools.decorator.retry import ensure_db_connection, db_connection_context
from src.tools.log_collector import log
@@ -26,6 +26,10 @@ class AutoSystemConfig(AppConfig):
name = 'src.auto_test.auto_system'
def ready(self):
# 多进程保护机制,防止在多进程环境下重复执行
if self._is_duplicate_process():
return
def run():
time.sleep(10)
self.delayed_task()
@@ -35,11 +39,53 @@ class AutoSystemConfig(AppConfig):
self.init_ass()
self.start_consumer()
if os.environ.get('RUN_MAIN', None) == 'true':
task1 = threading.Thread(target=run)
task1.start()
# 启动后台任务
task1 = threading.Thread(target=run)
task1.start()
atexit.register(self.shutdown)
def _is_duplicate_process(self):
"""
检查是否为重复进程,防止在多进程环境下重复执行
"""
# 获取当前进程ID
pid = os.getpid()
# 检查是否为重载进程
run_main = os.environ.get('RUN_MAIN', None)
if run_main != 'true':
log.system.debug(f"跳过重复进程初始化 - PID: {pid}, RUN_MAIN: {run_main}")
return True
# 检查DJANGO环境变量
django_settings = os.environ.get('DJANGO_SETTINGS_MODULE')
if not django_settings:
log.system.debug(f"跳过重复进程初始化 - PID: {pid}, DJANGO_SETTINGS_MODULE未设置")
return True
# 在Docker环境下使用文件锁机制防止重复执行
lock_file = f"/tmp/mango_system_init_{os.getppid()}.lock"
try:
# 尝试创建锁文件
fd = os.open(lock_file, os.O_CREAT | os.O_EXCL)
os.close(fd)
# 注册退出时清理锁文件
atexit.register(lambda: os.path.exists(lock_file) and os.remove(lock_file))
log.system.debug(f"主进程初始化 - PID: {pid}")
return False
except FileExistsError:
log.system.debug(f"跳过重复进程初始化 - PID: {pid}, 锁文件已存在")
return True
except Exception as e:
# 如果无法创建锁文件(如权限问题),使用备用方法
log.system.debug(f"锁文件检查异常 - PID: {pid}, 错误: {e}")
# 检查父进程ID避免在子进程中重复执行
ppid = os.getppid()
if hasattr(self, '_initialized_ppid') and self._initialized_ppid == ppid:
return True
self._initialized_ppid = ppid
return False
@staticmethod
def delayed_task():
try:
@@ -136,24 +182,25 @@ class AutoSystemConfig(AppConfig):
@ensure_db_connection(max_retries=1)
def set_case_status(self):
from src.auto_test.auto_ui.models import UiCase, UiCaseStepsDetailed, PageSteps
from src.auto_test.auto_pytest.models import PytestCase
from src.auto_test.auto_api.models import ApiInfo, ApiCase, ApiCaseDetailed
ten_minutes_ago = timezone.now() - timedelta(minutes=10)
models_to_update = [
UiCase,
UiCaseStepsDetailed,
PageSteps,
PytestCase,
ApiInfo,
ApiCase,
ApiCaseDetailed
]
for model in models_to_update:
model.objects.filter(
status=TaskEnum.PROCEED.value,
update_time__lt=ten_minutes_ago
).update(status=TaskEnum.FAIL.value)
with db_connection_context():
from src.auto_test.auto_ui.models import UiCase, UiCaseStepsDetailed, PageSteps
from src.auto_test.auto_pytest.models import PytestCase
from src.auto_test.auto_api.models import ApiInfo, ApiCase, ApiCaseDetailed
ten_minutes_ago = timezone.now() - timedelta(minutes=10)
models_to_update = [
UiCase,
UiCaseStepsDetailed,
PageSteps,
PytestCase,
ApiInfo,
ApiCase,
ApiCaseDetailed
]
for model in models_to_update:
model.objects.filter(
status=TaskEnum.PROCEED.value,
update_time__lt=ten_minutes_ago
).update(status=TaskEnum.FAIL.value)

View File

@@ -4,6 +4,7 @@
# @Time : 2023/3/24 17:33
# @Author : 毛鹏
import atexit
import os
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
@@ -19,6 +20,11 @@ class RunTasks:
@classmethod
def create_jobs(cls):
# 多进程保护机制,防止在多进程环境下重复执行
if cls._is_duplicate_process():
log.system.debug("不在主进程中,跳过定时任务初始化")
return
queryset = TimeTasks.objects.all()
for timer in queryset:
if timer.cron:
@@ -31,6 +37,23 @@ class RunTasks:
cls.scheduler.start()
atexit.register(cls.scheduler.shutdown)
@classmethod
def _is_duplicate_process(cls):
"""
检查是否为重复进程,防止在多进程环境下重复执行
"""
# 检查是否为重载进程
run_main = os.environ.get('RUN_MAIN', None)
if run_main != 'true':
return True
# 检查DJANGO环境变量
django_settings = os.environ.get('DJANGO_SETTINGS_MODULE')
if not django_settings:
return True
return False
@classmethod
@orm_retry('timing')
def timing(cls, timing_strategy_id):
@@ -63,4 +86,4 @@ class RunTasks:
elif task.type == TestCaseTypeEnum.UI.value:
add_tasks.add_test_suite_details(task.ui_case.id, TestCaseTypeEnum.UI)
else:
add_tasks.add_test_suite_details(task.pytest_case.id, TestCaseTypeEnum.PYTEST)
add_tasks.add_test_suite_details(task.pytest_case.id, TestCaseTypeEnum.PYTEST)

View File

@@ -1,5 +1,5 @@
from threading import Thread
import os
from django.apps import AppConfig
@@ -8,8 +8,28 @@ class AutoUiConfig(AppConfig):
name = 'src.auto_test.auto_ui'
def ready(self):
# 多进程保护机制,防止在多进程环境下重复执行
if self._is_duplicate_process():
return
def run():
pass
task = Thread(target=run)
task.start()
def _is_duplicate_process(self):
"""
检查是否为重复进程,防止在多进程环境下重复执行
"""
# 检查是否为重载进程
run_main = os.environ.get('RUN_MAIN', None)
if run_main != 'true':
return True
# 检查DJANGO环境变量
django_settings = os.environ.get('DJANGO_SETTINGS_MODULE')
if not django_settings:
return True
return False

View File

@@ -18,6 +18,10 @@ class AutoUserConfig(AppConfig):
name = 'src.auto_test.auto_user'
def ready(self):
# 多进程保护机制,防止在多进程环境下重复执行
if self._is_duplicate_process():
return
if os.getenv('DJANGO_ENV', 'master') == 'master':
self.check_version()
@@ -26,9 +30,24 @@ class AutoUserConfig(AppConfig):
self.new_role()
self.new_user()
if os.environ.get('RUN_MAIN', None) == 'true':
task1 = threading.Thread(target=run)
task1.start()
task1 = threading.Thread(target=run)
task1.start()
def _is_duplicate_process(self):
"""
检查是否为重复进程,防止在多进程环境下重复执行
"""
# 检查是否为重载进程
run_main = os.environ.get('RUN_MAIN', None)
if run_main != 'true':
return True
# 检查DJANGO环境变量
django_settings = os.environ.get('DJANGO_SETTINGS_MODULE')
if not django_settings:
return True
return False
def new_role(self):
try:
@@ -45,8 +64,8 @@ class AutoUserConfig(AppConfig):
Role.objects.create(name="测试开发工程师", description="测试开发工程师")
Role.objects.create(name="自动化工程师", description="自动化工程师")
Role.objects.create(name="测试工程师", description="测试工程师")
except Exception as e:
log.user.error(f'异常提示:{e}, 首次启动项目,请启动完成之后再重启一次!')
except Exception as error:
log.system.error(f'角色创建失败: {error}')
def new_user(self):
try:
@@ -61,8 +80,8 @@ class AutoUserConfig(AppConfig):
'config': {}
}
)
except Exception as e:
log.user.error(f'异常提示:{e}, 首次启动项目,请启动完成之后再重启一次!')
except Exception as error:
log.system.error(f'用户创建失败: {error}')
def check_version(self):
import re
@@ -72,4 +91,4 @@ class AutoUserConfig(AppConfig):
match = re.search(r'VERSION\s*=\s*([\d.]+)', text)
if not (match and match.group(1) == VERSION):
raise Exception(
f'当前版本与最新不一致请执行git pull 升级到最新版本!最新版本:{match.group(1)},当前版本:{VERSION}')
f'当前版本与最新不一致请执行git pull 升级到最新版本!最新版本:{match.group(1)},当前版本:{VERSION}')