mirror of
https://gitee.com/mao-peng/MangoTestingPlatform.git
synced 2025-12-06 20:04:44 +08:00
Compare commits
9 Commits
22bce7e27f
...
eb57b31100
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eb57b31100 | ||
|
|
f81991b7d5 | ||
|
|
5837140c5a | ||
|
|
2c87901e60 | ||
|
|
d9a64520f7 | ||
|
|
22991acfe5 | ||
|
|
18db8f013f | ||
|
|
ba2fdd1b1f | ||
|
|
3f80288052 |
Binary file not shown.
@@ -9,7 +9,7 @@ from queue import Queue
|
||||
import time
|
||||
|
||||
from src.models.system_model import ConsumerCaseModel
|
||||
from src.tools.decorator.retry import ensure_db_connection
|
||||
from src.tools.decorator.retry import async_task_db_connection
|
||||
|
||||
|
||||
class ApiCaseFlow:
|
||||
@@ -24,7 +24,7 @@ class ApiCaseFlow:
|
||||
cls.running = False
|
||||
|
||||
@classmethod
|
||||
@ensure_db_connection(True)
|
||||
@async_task_db_connection(max_retries=3, retry_delay=3)
|
||||
def process_tasks(cls):
|
||||
while cls.running:
|
||||
if not cls.queue.empty():
|
||||
@@ -33,7 +33,7 @@ class ApiCaseFlow:
|
||||
time.sleep(0.1)
|
||||
|
||||
@classmethod
|
||||
@ensure_db_connection()
|
||||
@async_task_db_connection(max_retries=3, retry_delay=3)
|
||||
def execute_task(cls, case_model: ConsumerCaseModel):
|
||||
from src.auto_test.auto_api.service.test_case.test_case import TestCase
|
||||
test_case = TestCase(
|
||||
|
||||
@@ -17,6 +17,7 @@ from src.enums.tools_enum import StatusEnum, TaskEnum, TestCaseTypeEnum
|
||||
from src.exceptions import *
|
||||
from src.models.api_model import RequestModel, ApiCaseResultModel, ApiCaseStepsResultModel, ResponseModel
|
||||
from src.models.system_model import TestSuiteDetailsResultModel
|
||||
from src.tools.decorator.retry import async_task_db_connection
|
||||
from ..base.case_base import CaseBase
|
||||
from ..base.case_parameter import CaseParameter
|
||||
|
||||
@@ -207,12 +208,14 @@ class TestCase:
|
||||
model.save()
|
||||
|
||||
@classmethod
|
||||
@async_task_db_connection(max_retries=3, retry_delay=3)
|
||||
def update_test_case(cls, case_id: int, status: int):
|
||||
model = ApiCase.objects.get(id=case_id)
|
||||
model.status = status
|
||||
model.save()
|
||||
|
||||
@classmethod
|
||||
@async_task_db_connection(max_retries=3, retry_delay=3)
|
||||
def update_test_case_detailed_parameter(cls, parameter_id, result_data: ApiCaseStepsResultModel):
|
||||
model = ApiCaseDetailedParameter.objects.get(id=parameter_id)
|
||||
model.status = result_data.status
|
||||
|
||||
@@ -39,15 +39,6 @@ class AutoPytestConfig(AppConfig):
|
||||
|
||||
return False
|
||||
|
||||
# def test_case_consumption(self):
|
||||
# self.case_flow = PytestCaseFlow()
|
||||
# self.pytest_task = Thread(target=self.case_flow.process_tasks)
|
||||
# self.pytest_task.daemon = True
|
||||
# self.pytest_task.start()
|
||||
# def shutdown(self):
|
||||
# self.case_flow.stop()
|
||||
# self.pytest_task.join()
|
||||
|
||||
def pull_code(self):
|
||||
from src.auto_test.auto_pytest.service.base import git_obj
|
||||
try:
|
||||
|
||||
@@ -14,7 +14,7 @@ from src.auto_test.auto_system.models import TestSuite, TestSuiteDetails
|
||||
from src.enums.tools_enum import TaskEnum, TestCaseTypeEnum
|
||||
from src.exceptions import MangoServerError
|
||||
from src.models.system_model import ConsumerCaseModel
|
||||
from src.tools.decorator.retry import ensure_db_connection
|
||||
from src.tools.decorator.retry import async_task_db_connection
|
||||
from src.tools.log_collector import log
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@ class PyCaseFlow:
|
||||
cls.running = False
|
||||
|
||||
@classmethod
|
||||
@ensure_db_connection(True)
|
||||
@async_task_db_connection(infinite_retry=True)
|
||||
def process_tasks(cls):
|
||||
while cls.running:
|
||||
if not cls.queue.empty():
|
||||
@@ -40,7 +40,7 @@ class PyCaseFlow:
|
||||
time.sleep(0.1)
|
||||
|
||||
@classmethod
|
||||
@ensure_db_connection()
|
||||
@async_task_db_connection()
|
||||
def execute_task(cls, case_model: ConsumerCaseModel):
|
||||
from src.auto_test.auto_pytest.service.test_case.test_case import TestCase
|
||||
test_case = TestCase(
|
||||
@@ -55,6 +55,7 @@ class PyCaseFlow:
|
||||
cls.queue.put(case_model)
|
||||
|
||||
@classmethod
|
||||
@async_task_db_connection(max_retries=3, retry_delay=2)
|
||||
def get_case(cls, data):
|
||||
with cls._get_case_lock:
|
||||
test_suite_details = TestSuiteDetails.objects.filter(
|
||||
|
||||
@@ -9,16 +9,15 @@ from django.db import connection
|
||||
from src.auto_test.auto_pytest.models import PytestCase
|
||||
from src.exceptions import *
|
||||
from src.models.pytest_model import PytestCaseResultModel
|
||||
from src.tools.decorator.retry import orm_retry
|
||||
from src.tools.decorator.retry import async_task_db_connection
|
||||
|
||||
|
||||
class PtestTestReportWriting:
|
||||
|
||||
@classmethod
|
||||
@orm_retry('update_pytest_test_case')
|
||||
@async_task_db_connection(max_retries=3, retry_delay=2)
|
||||
def update_pytest_test_case(cls, data: PytestCaseResultModel):
|
||||
log.ui.debug(f'开始写入Pytest用例<{data.name}>测试结果,用例的测试状态是:{data.status}')
|
||||
connection.ensure_connection()
|
||||
case = PytestCase.objects.get(id=data.id)
|
||||
case.status = data.status
|
||||
case.result_data = data.result_data
|
||||
|
||||
@@ -11,13 +11,13 @@ import atexit
|
||||
import time
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from django.apps import AppConfig
|
||||
from django.db import close_old_connections
|
||||
from django.utils import timezone
|
||||
from mangotools.decorator import func_info
|
||||
from mangotools.enums import CacheValueTypeEnum
|
||||
|
||||
from src.enums.system_enum import CacheDataKeyEnum
|
||||
from src.enums.tools_enum import TaskEnum
|
||||
from src.tools.decorator.retry import ensure_db_connection, db_connection_context
|
||||
from src.tools.log_collector import log
|
||||
|
||||
|
||||
@@ -37,7 +37,9 @@ class AutoSystemConfig(AppConfig):
|
||||
self.populate_time_tasks()
|
||||
self.run_tests()
|
||||
self.init_ass()
|
||||
self.start_consumer()
|
||||
|
||||
# 设置定时任务调度器
|
||||
self.setup_scheduler()
|
||||
|
||||
# 启动后台任务
|
||||
task1 = threading.Thread(target=run)
|
||||
@@ -151,6 +153,8 @@ class AutoSystemConfig(AppConfig):
|
||||
self.system_task.join()
|
||||
except AttributeError:
|
||||
pass
|
||||
# 停止全局调度器
|
||||
self.stop_scheduler()
|
||||
|
||||
def init_ass(self):
|
||||
try:
|
||||
@@ -180,14 +184,44 @@ class AutoSystemConfig(AppConfig):
|
||||
except Exception as e:
|
||||
log.system.error(f'异常提示:{e}, 首次启动项目,请启动完成之后再重启一次!')
|
||||
|
||||
def start_consumer(self):
|
||||
scheduler = BackgroundScheduler()
|
||||
scheduler.add_job(self.set_case_status, 'interval', minutes=5)
|
||||
scheduler.start()
|
||||
def setup_scheduler(self):
|
||||
"""设置定时任务调度器"""
|
||||
try:
|
||||
# 创建调度器实例
|
||||
self.scheduler = BackgroundScheduler()
|
||||
|
||||
# 添加定时任务
|
||||
self.scheduler.add_job(
|
||||
self.set_case_status,
|
||||
'interval',
|
||||
minutes=5,
|
||||
id='set_case_status'
|
||||
)
|
||||
|
||||
# 启动调度器
|
||||
self.scheduler.start()
|
||||
|
||||
# 注册退出时停止调度器
|
||||
atexit.register(self.stop_scheduler)
|
||||
except Exception as e:
|
||||
log.system.error(f'定时任务调度器设置异常: {e}')
|
||||
|
||||
def stop_scheduler(self):
|
||||
"""停止调度器"""
|
||||
try:
|
||||
if hasattr(self, 'scheduler') and self.scheduler:
|
||||
self.scheduler.shutdown()
|
||||
except Exception as e:
|
||||
log.system.error(f'停止调度器异常: {e}')
|
||||
|
||||
|
||||
@ensure_db_connection(max_retries=1)
|
||||
def set_case_status(self):
|
||||
with db_connection_context():
|
||||
from django.db import transaction
|
||||
|
||||
try:
|
||||
# 确保开始时连接是干净的
|
||||
close_old_connections()
|
||||
|
||||
from src.auto_test.auto_ui.models import UiCase, UiCaseStepsDetailed, PageSteps
|
||||
from src.auto_test.auto_pytest.models import PytestCase
|
||||
from src.auto_test.auto_api.models import ApiInfo, ApiCase, ApiCaseDetailed
|
||||
@@ -208,4 +242,10 @@ class AutoSystemConfig(AppConfig):
|
||||
model.objects.filter(
|
||||
status=TaskEnum.PROCEED.value,
|
||||
update_time__lt=ten_minutes_ago
|
||||
).update(status=TaskEnum.FAIL.value)
|
||||
).update(status=TaskEnum.FAIL.value)
|
||||
|
||||
# 确保事务提交
|
||||
transaction.commit()
|
||||
finally:
|
||||
# 确保结束时连接被关闭
|
||||
close_old_connections()
|
||||
@@ -18,6 +18,7 @@ from src.enums.system_enum import SocketEnum, ClientTypeEnum, ClientNameEnum
|
||||
from src.exceptions import *
|
||||
from src.models.socket_model import SocketDataModel, QueueModel
|
||||
from src.settings import IS_DEBUG_LOG
|
||||
from src.tools.decorator.retry import async_task_db_connection
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
@@ -161,6 +162,7 @@ class ChatConsumer(WebsocketConsumer):
|
||||
log.system.debug(f"发送的数据:{data_json}")
|
||||
return data_json
|
||||
|
||||
@async_task_db_connection(max_retries=1, retry_delay=1)
|
||||
def verify_user(self) -> tuple[bool, int]:
|
||||
if 'username' not in self.scope.get("query_string").decode():
|
||||
log.system.debug('您的执行器代码是旧的,请使用新的执行器再来进行连接!')
|
||||
|
||||
@@ -15,7 +15,7 @@ from src.auto_test.auto_system.models import TestSuiteDetails, TestSuite, Tasks
|
||||
from src.enums.tools_enum import TaskEnum, TestCaseTypeEnum
|
||||
from src.exceptions import MangoServerError
|
||||
from src.models.system_model import ConsumerCaseModel
|
||||
from src.tools.decorator.retry import ensure_db_connection, db_connection_context
|
||||
from src.tools.decorator.retry import db_connection_context, async_task_db_connection
|
||||
from src.tools.log_collector import log
|
||||
from django.db.utils import Error, InterfaceError, OperationalError
|
||||
|
||||
@@ -31,7 +31,7 @@ class ConsumerThread:
|
||||
def stop(self):
|
||||
self.running = False
|
||||
|
||||
@ensure_db_connection(True, max_retries=100)
|
||||
@async_task_db_connection(max_retries=3, retry_delay=3)
|
||||
def consumer(self):
|
||||
reset_tims = time.time()
|
||||
while self.running:
|
||||
@@ -92,7 +92,7 @@ class ConsumerThread:
|
||||
else:
|
||||
self.send_case(test_suite, test_suite_details, case_model, retry, max_retry)
|
||||
|
||||
@ensure_db_connection()
|
||||
@async_task_db_connection(max_retries=3, retry_delay=3)
|
||||
def clean_test_suite_status(self):
|
||||
# 使用数据库连接上下文管理器确保连接被正确释放
|
||||
with db_connection_context():
|
||||
@@ -108,7 +108,7 @@ class ConsumerThread:
|
||||
i.status = TaskEnum.SUCCESS.value
|
||||
i.save()
|
||||
|
||||
@ensure_db_connection()
|
||||
@async_task_db_connection(max_retries=3, retry_delay=3)
|
||||
def clean_proceed(self):
|
||||
"""
|
||||
把进行中的,修改为待开始,或者失败
|
||||
@@ -126,7 +126,7 @@ class ConsumerThread:
|
||||
log.system.info(
|
||||
f'推送时间超过{self.reset_time}分钟,状态重置为:待执行,用例ID:{test_suite_detail.case_id}')
|
||||
|
||||
@ensure_db_connection()
|
||||
@async_task_db_connection(max_retries=3, retry_delay=3)
|
||||
def clean_proceed_set_fail(self):
|
||||
"""
|
||||
把重试次数满的,修改为0,只有未知错误才会设置为失败
|
||||
@@ -144,7 +144,7 @@ class ConsumerThread:
|
||||
log.system.info(
|
||||
f'重试次数超过{self.retry_frequency + 1}次的任务状态重置为:失败,用例ID:{test_suite_detail.case_id}')
|
||||
|
||||
@ensure_db_connection()
|
||||
@async_task_db_connection(max_retries=3, retry_delay=3)
|
||||
def update_status_proceed(self, test_suite, test_suite_details):
|
||||
# 使用数据库连接上下文管理器确保连接被正确释放
|
||||
with db_connection_context():
|
||||
@@ -156,7 +156,7 @@ class ConsumerThread:
|
||||
test_suite_details.push_time = timezone.now()
|
||||
test_suite_details.save()
|
||||
|
||||
@ensure_db_connection()
|
||||
@async_task_db_connection(max_retries=3, retry_delay=3)
|
||||
def consumer_error(self, test_suite, test_suite_details, error):
|
||||
# 使用数据库连接上下文管理器确保连接被正确释放
|
||||
with db_connection_context():
|
||||
|
||||
@@ -17,11 +17,13 @@ from src.enums.system_enum import CacheDataKeyEnum
|
||||
from src.enums.tools_enum import StatusEnum, EnvironmentEnum, TestCaseTypeEnum
|
||||
from src.exceptions import *
|
||||
from src.tools.log_collector import log
|
||||
from src.tools.decorator.retry import async_task_db_connection
|
||||
|
||||
|
||||
class NoticeMain:
|
||||
|
||||
@classmethod
|
||||
@async_task_db_connection(max_retries=3, retry_delay=2)
|
||||
def notice_main(cls, test_env: int, project_product: int, test_suite_id: int):
|
||||
test_object = TestObject.objects.get(environment=test_env, project_product=project_product)
|
||||
notice_obj = NoticeConfig.objects.filter(test_object=test_object.id, status=StatusEnum.SUCCESS.value)
|
||||
@@ -38,6 +40,7 @@ class NoticeMain:
|
||||
raise ToolsError(error.code, error.msg)
|
||||
|
||||
@classmethod
|
||||
@async_task_db_connection(max_retries=3, retry_delay=2)
|
||||
def test_notice_send(cls, _id):
|
||||
notice_obj = NoticeConfig.objects.get(id=_id)
|
||||
test_report = TestReportModel(**{
|
||||
@@ -71,6 +74,7 @@ class NoticeMain:
|
||||
log.system.error('暂不支持钉钉打卡')
|
||||
|
||||
@classmethod
|
||||
@async_task_db_connection(max_retries=3, retry_delay=2)
|
||||
def __we_chat_send(cls, i, test_report: TestReportModel | None = None):
|
||||
try:
|
||||
wechat = WeChatSend(WeChatNoticeModel(webhook=i.config), test_report, cls.get_domain_name())
|
||||
@@ -79,6 +83,7 @@ class NoticeMain:
|
||||
raise MangoServerError(error.code, error.msg)
|
||||
|
||||
@classmethod
|
||||
@async_task_db_connection(max_retries=3, retry_delay=2)
|
||||
def __wend_mail_send(cls, i, test_report: TestReportModel | None = None):
|
||||
try:
|
||||
user_info = User.objects.filter(name__in=json.loads(i.config))
|
||||
@@ -103,6 +108,7 @@ class NoticeMain:
|
||||
email.send_main()
|
||||
|
||||
@classmethod
|
||||
@async_task_db_connection(max_retries=3, retry_delay=2)
|
||||
def test_report(cls, test_suite_id: int) -> TestReportModel:
|
||||
test_suite = TestSuite.objects.get(id=test_suite_id)
|
||||
execution_duration = test_suite.update_time - test_suite.create_time
|
||||
@@ -170,6 +176,7 @@ class NoticeMain:
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
@async_task_db_connection(max_retries=3, retry_delay=2)
|
||||
def mail_config():
|
||||
try:
|
||||
send_user = CacheData.objects.get(key=CacheDataKeyEnum.SYSTEM_SEND_USER.name).value
|
||||
@@ -183,6 +190,7 @@ class NoticeMain:
|
||||
return send_user, email_host, stamp_key
|
||||
|
||||
@classmethod
|
||||
@async_task_db_connection(max_retries=3, retry_delay=2)
|
||||
def get_domain_name(cls):
|
||||
domain_name = f'请先到系统管理->系统设置中设置:{CacheDataKeyEnum.SYSTEM_DOMAIN_NAME.value},此处才会显示跳转连接'
|
||||
try:
|
||||
|
||||
@@ -23,7 +23,7 @@ from .ui import UIConsumer
|
||||
class ServerInterfaceReflection(APIConsumer, SystemConsumer, UIConsumer, PerfConsumer, PytestConsumer):
|
||||
|
||||
def __init__(self):
|
||||
self.executor = concurrent.futures.ThreadPoolExecutor(10)
|
||||
self.executor = concurrent.futures.ThreadPoolExecutor(5)
|
||||
self.server_data_received = Signal()
|
||||
self.server_data_received.connect(self.data_received_handler)
|
||||
|
||||
@@ -39,8 +39,7 @@ class ServerInterfaceReflection(APIConsumer, SystemConsumer, UIConsumer, PerfCon
|
||||
|
||||
def handle_task_result(self, future):
|
||||
try:
|
||||
result = future.result() # 获取任务执行结果
|
||||
# 处理任务执行结果
|
||||
result = future.result()
|
||||
except Exception as e:
|
||||
traceback.print_exc() # 打印异常追踪信息
|
||||
traceback.print_exc()
|
||||
log.system.error(f"任务执行出现异常:{e}")
|
||||
|
||||
@@ -6,12 +6,12 @@
|
||||
|
||||
from src.auto_test.auto_api.service.api_import.recording import Recording
|
||||
from src.models.api_model import RecordingApiModel
|
||||
from src.tools.decorator.retry import ensure_db_connection
|
||||
from src.tools.decorator.retry import async_task_db_connection
|
||||
|
||||
|
||||
class APIConsumer:
|
||||
|
||||
@classmethod
|
||||
@ensure_db_connection(max_retries=1)
|
||||
@async_task_db_connection(max_retries=1)
|
||||
def a_recording_api(cls, data: dict):
|
||||
Recording.write(RecordingApiModel(**data))
|
||||
|
||||
@@ -8,17 +8,17 @@ from src.auto_test.auto_system.service.update_test_suite import UpdateTestSuite
|
||||
from src.models.pytest_model import PytestCaseResultModel
|
||||
|
||||
from src.models.system_model import TestSuiteDetailsResultModel
|
||||
from src.tools.decorator.retry import ensure_db_connection
|
||||
from src.tools.decorator.retry import async_task_db_connection
|
||||
|
||||
|
||||
class PytestConsumer:
|
||||
|
||||
@classmethod
|
||||
@ensure_db_connection()
|
||||
@async_task_db_connection()
|
||||
def p_test_suite_details(cls, data: dict):
|
||||
UpdateTestSuite.update_test_suite_details(TestSuiteDetailsResultModel(**data))
|
||||
|
||||
@classmethod
|
||||
@ensure_db_connection()
|
||||
@async_task_db_connection()
|
||||
def p_test_case(cls, data: dict):
|
||||
PtestTestReportWriting.update_pytest_test_case(PytestCaseResultModel(**data))
|
||||
|
||||
@@ -14,13 +14,13 @@ from src.enums.system_enum import CacheDataKey2Enum, ClientTypeEnum
|
||||
from src.enums.tools_enum import TestCaseTypeEnum
|
||||
from src.models.socket_model import SocketDataModel
|
||||
from src.models.system_model import GetTaskModel
|
||||
from src.tools.decorator.retry import ensure_db_connection
|
||||
from src.tools.decorator.retry import async_task_db_connection
|
||||
|
||||
|
||||
class SystemConsumer:
|
||||
|
||||
@classmethod
|
||||
@ensure_db_connection()
|
||||
@async_task_db_connection()
|
||||
def t_set_operation_options(cls, data: dict):
|
||||
data = {
|
||||
'describe': data.get('version'),
|
||||
@@ -41,7 +41,7 @@ class SystemConsumer:
|
||||
CacheDataCRUD.inside_post(data)
|
||||
|
||||
@classmethod
|
||||
@ensure_db_connection()
|
||||
@async_task_db_connection()
|
||||
def t_set_userinfo(cls, data):
|
||||
from src.auto_test.auto_system.service.socket_link.socket_user import SocketUser
|
||||
from src.auto_test.auto_system.consumers import ChatConsumer
|
||||
@@ -54,7 +54,7 @@ class SystemConsumer:
|
||||
))
|
||||
|
||||
@classmethod
|
||||
@ensure_db_connection()
|
||||
@async_task_db_connection()
|
||||
def t_get_task(cls, data: dict):
|
||||
data = GetTaskModel(**data)
|
||||
if data.type == TestCaseTypeEnum.UI:
|
||||
|
||||
@@ -9,22 +9,22 @@ from src.auto_test.auto_ui.service.test_report_writing import TestReportWriting
|
||||
|
||||
from src.models.system_model import TestSuiteDetailsResultModel
|
||||
from src.models.ui_model import PageStepsResultModel, UiCaseResultModel
|
||||
from src.tools.decorator.retry import ensure_db_connection
|
||||
from src.tools.decorator.retry import async_task_db_connection
|
||||
|
||||
|
||||
class UIConsumer:
|
||||
|
||||
@classmethod
|
||||
@ensure_db_connection()
|
||||
@async_task_db_connection()
|
||||
def u_page_steps(cls, data: dict):
|
||||
TestReportWriting.update_page_step_status(PageStepsResultModel(**data))
|
||||
|
||||
@classmethod
|
||||
@ensure_db_connection()
|
||||
@async_task_db_connection()
|
||||
def u_test_suite_details(cls, data: dict):
|
||||
UpdateTestSuite.update_test_suite_details(TestSuiteDetailsResultModel(**data))
|
||||
|
||||
@classmethod
|
||||
@ensure_db_connection()
|
||||
@async_task_db_connection()
|
||||
def u_test_case(cls, data: dict):
|
||||
TestReportWriting.update_test_case(UiCaseResultModel(**data))
|
||||
|
||||
@@ -11,7 +11,7 @@ from apscheduler.triggers.cron import CronTrigger
|
||||
from src.auto_test.auto_system.models import Tasks, TasksDetails, TimeTasks
|
||||
from src.auto_test.auto_system.service.tasks.add_tasks import AddTasks
|
||||
from src.enums.tools_enum import StatusEnum, TestCaseTypeEnum
|
||||
from src.tools.decorator.retry import orm_retry
|
||||
from src.tools.decorator.retry import async_task_db_connection
|
||||
from src.tools.log_collector import log
|
||||
|
||||
|
||||
@@ -55,7 +55,7 @@ class RunTasks:
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
@orm_retry('timing')
|
||||
@async_task_db_connection(max_retries=3, retry_delay=2)
|
||||
def timing(cls, timing_strategy_id):
|
||||
scheduled_tasks_obj = Tasks.objects.filter(timing_strategy=timing_strategy_id,
|
||||
status=StatusEnum.SUCCESS.value)
|
||||
@@ -64,7 +64,7 @@ class RunTasks:
|
||||
cls.distribute(scheduled_tasks)
|
||||
|
||||
@classmethod
|
||||
@orm_retry('trigger')
|
||||
@async_task_db_connection(max_retries=3, retry_delay=2)
|
||||
def trigger(cls, scheduled_tasks_id):
|
||||
scheduled_tasks = Tasks.objects.get(id=scheduled_tasks_id)
|
||||
cls.distribute(scheduled_tasks)
|
||||
|
||||
@@ -9,11 +9,13 @@ from src.auto_test.auto_system.models import TestSuiteDetails
|
||||
from src.enums.tools_enum import StatusEnum
|
||||
from src.enums.tools_enum import TestCaseTypeEnum
|
||||
from src.models.system_model import CaseCounterModel
|
||||
from src.tools.decorator.retry import async_task_db_connection
|
||||
|
||||
|
||||
class TestCounter:
|
||||
|
||||
@classmethod
|
||||
@async_task_db_connection(max_retries=3, retry_delay=3)
|
||||
def res_main(cls, _id):
|
||||
model = TestSuiteDetails.objects.get(id=_id)
|
||||
if model.type == TestCaseTypeEnum.API.value:
|
||||
|
||||
@@ -14,17 +14,20 @@ from src.enums.tools_enum import TaskEnum, StatusEnum, TestCaseTypeEnum
|
||||
from src.models.socket_model import SocketDataModel
|
||||
from src.models.system_model import TestSuiteDetailsResultModel
|
||||
from src.tools.log_collector import log
|
||||
from src.tools.decorator.retry import async_task_db_connection
|
||||
|
||||
|
||||
class UpdateTestSuite:
|
||||
|
||||
@classmethod
|
||||
@async_task_db_connection(max_retries=3, retry_delay=3)
|
||||
def update_test_suite(cls, test_suite_id: int, status: int):
|
||||
test_suite = TestSuite.objects.get(id=test_suite_id)
|
||||
test_suite.status = status
|
||||
test_suite.save()
|
||||
|
||||
@classmethod
|
||||
@async_task_db_connection(max_retries=3, retry_delay=3)
|
||||
def update_test_suite_details(cls, data: TestSuiteDetailsResultModel):
|
||||
log.system.debug(f'开始更新测试套数据:{data.model_dump_json()}')
|
||||
test_suite_detail = TestSuiteDetails.objects.get(id=data.id)
|
||||
@@ -55,6 +58,7 @@ class UpdateTestSuite:
|
||||
cls.send_test_result(data.test_suite, data.error_message)
|
||||
|
||||
@classmethod
|
||||
@async_task_db_connection(max_retries=3, retry_delay=3)
|
||||
def send_test_result(cls, test_suite_id: int, msg: str):
|
||||
test_suite = TestSuite.objects.get(id=test_suite_id)
|
||||
if test_suite.is_notice == StatusEnum.SUCCESS.value:
|
||||
|
||||
@@ -15,6 +15,7 @@ from src.enums.tools_enum import TaskEnum, TestCaseTypeEnum
|
||||
from src.exceptions import MangoServerError
|
||||
from src.models.system_model import ConsumerCaseModel, GetTaskModel
|
||||
from src.tools.log_collector import log
|
||||
from src.tools.decorator.retry import async_task_db_connection
|
||||
|
||||
|
||||
class UiCaseFlow:
|
||||
@@ -43,6 +44,7 @@ class UiCaseFlow:
|
||||
cls.execute_task(case_model)
|
||||
|
||||
@classmethod
|
||||
@async_task_db_connection(max_retries=3, retry_delay=2)
|
||||
def get_case(cls, data: GetTaskModel):
|
||||
model = User.objects.get(username=data.username)
|
||||
with cls._get_case_lock:
|
||||
|
||||
@@ -9,13 +9,13 @@ from django.db import connection
|
||||
from src.auto_test.auto_ui.models import PageSteps, UiCase, UiCaseStepsDetailed
|
||||
from src.exceptions import *
|
||||
from src.models.ui_model import UiCaseResultModel, PageStepsResultModel
|
||||
from src.tools.decorator.retry import orm_retry
|
||||
from src.tools.decorator.retry import async_task_db_connection
|
||||
|
||||
|
||||
class TestReportWriting:
|
||||
|
||||
@classmethod
|
||||
@orm_retry('update_page_step_status')
|
||||
@async_task_db_connection(max_retries=3, retry_delay=2)
|
||||
def update_page_step_status(cls, data: PageStepsResultModel) -> None:
|
||||
try:
|
||||
log.ui.debug(f'开始写入步骤测试结果,步骤数据是:{data.model_dump_json()}')
|
||||
@@ -28,10 +28,9 @@ class TestReportWriting:
|
||||
'忽略这个报错,如果是在步骤详情中没有查到,可以忽略这个错误,步骤详情中的调试不会修改整个步骤状态')
|
||||
|
||||
@classmethod
|
||||
@orm_retry('update_test_case')
|
||||
@async_task_db_connection(max_retries=3, retry_delay=2)
|
||||
def update_test_case(cls, data: UiCaseResultModel):
|
||||
log.ui.debug(f'开始写入用例测试结果,用例的测试状态是:{data.status}')
|
||||
connection.ensure_connection()
|
||||
case = UiCase.objects.get(id=data.id)
|
||||
case.status = data.status
|
||||
case.save()
|
||||
@@ -39,7 +38,7 @@ class TestReportWriting:
|
||||
cls.update_step(i)
|
||||
|
||||
@classmethod
|
||||
@orm_retry('update_step')
|
||||
@async_task_db_connection(max_retries=3, retry_delay=2)
|
||||
def update_step(cls, step_data: PageStepsResultModel):
|
||||
log.ui.debug(f'开始写入用例中步骤测试结果,步骤数据是:{step_data.model_dump_json()}')
|
||||
|
||||
|
||||
@@ -56,3 +56,4 @@ ERROR_MSG_0050 = (1050, '接口断言中预期值不允许为空')
|
||||
ERROR_MSG_0055 = (1052, '接口通用断言中的值不允许为空')
|
||||
ERROR_MSG_0056 = (1056, '一个测试环境现在不支持配置2个数据库,删除一个再进行尝试')
|
||||
ERROR_MSG_0057 = (1057, '用例已被删除,历史未执行的全部修改为失败')
|
||||
ERROR_MSG_0058 = (1058, '请求频繁,请稍后重试')
|
||||
|
||||
@@ -8,7 +8,6 @@ from django.conf import settings
|
||||
from jwt import exceptions
|
||||
from rest_framework.authentication import BaseAuthentication
|
||||
from rest_framework.exceptions import AuthenticationFailed
|
||||
from src.auto_test.auto_user.models import User
|
||||
|
||||
class JwtQueryParamsAuthentication(BaseAuthentication):
|
||||
|
||||
@@ -22,9 +21,6 @@ class JwtQueryParamsAuthentication(BaseAuthentication):
|
||||
# 3.验证第三段合法性
|
||||
try:
|
||||
payload = jwt.decode(token, salt, algorithms='HS256')
|
||||
User.objects.get(id=payload['id'])
|
||||
except User.DoesNotExist:
|
||||
raise AuthenticationFailed({'code': 300, 'msg': '没有该用户信息,请重新登录', 'data': None})
|
||||
except exceptions.ExpiredSignatureError:
|
||||
raise AuthenticationFailed({'code': 301, 'msg': '当前用户登录已过期,请重新登录', 'data': None})
|
||||
except jwt.DecodeError:
|
||||
@@ -32,8 +28,7 @@ class JwtQueryParamsAuthentication(BaseAuthentication):
|
||||
except jwt.InvalidTokenError:
|
||||
raise AuthenticationFailed({'code': 303, 'msg': 'token 非法,请使用有效的 token', 'data': None})
|
||||
except Exception as e:
|
||||
|
||||
raise AuthenticationFailed({'code': 304, 'msg': f'token 验证失败: {str(e)}', 'data': None})
|
||||
raise AuthenticationFailed({'code': 304, 'msg': f'请求频繁,请稍后重试: {str(e)}', 'data': None})
|
||||
return payload, token
|
||||
|
||||
def authenticate_header(self, request):
|
||||
|
||||
@@ -12,6 +12,7 @@ from django.core.handlers.asgi import ASGIRequest
|
||||
from rest_framework.response import Response
|
||||
from src.auto_test.auto_user.models import User
|
||||
from src.auto_test.auto_user.views.user_logs import UserLogsCRUD
|
||||
from django.db import close_old_connections
|
||||
|
||||
|
||||
class UserLogsMiddleWare(MiddlewareMixin):
|
||||
@@ -43,8 +44,12 @@ class UserLogsMiddleWare(MiddlewareMixin):
|
||||
user_name = request_data.get('username')
|
||||
if isinstance(user_name, list):
|
||||
user_name = user_name[0]
|
||||
# 确保在数据库操作前后关闭连接
|
||||
close_old_connections()
|
||||
user_id = User._default_manager.get(username=user_name).id
|
||||
close_old_connections()
|
||||
except User.DoesNotExist:
|
||||
close_old_connections()
|
||||
pass
|
||||
try:
|
||||
request_data = json.dumps(request_data, ensure_ascii=False)
|
||||
@@ -63,6 +68,9 @@ class UserLogsMiddleWare(MiddlewareMixin):
|
||||
self._save_user_logs_async(log_entry)
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
# 确保在异步处理完成后关闭所有数据库连接
|
||||
close_old_connections()
|
||||
|
||||
def _capture_request_data(self, request: ASGIRequest, response: Response) -> dict:
|
||||
if request.method == 'POST' or request.method == 'PUT':
|
||||
@@ -115,6 +123,10 @@ class UserLogsMiddleWare(MiddlewareMixin):
|
||||
def _save_user_logs_async(self, log_entry: dict) -> None:
|
||||
"""异步保存用户日志到数据库"""
|
||||
try:
|
||||
# 确保在数据库操作前后关闭连接
|
||||
close_old_connections()
|
||||
UserLogsCRUD.inside_post(log_entry)
|
||||
close_old_connections()
|
||||
except Exception as e:
|
||||
close_old_connections()
|
||||
print(e)
|
||||
|
||||
@@ -123,13 +123,15 @@ if not IS_SQLITE:
|
||||
'PORT': MYSQL_PORT,
|
||||
'OPTIONS': {
|
||||
'charset': 'utf8mb4',
|
||||
'connect_timeout': 20, # 增加到20秒
|
||||
'read_timeout': 60, # 增加读取超时到60秒
|
||||
'write_timeout': 60, # 增加写入超时到60秒
|
||||
'init_command': "SET sql_mode='STRICT_TRANS_TABLES', wait_timeout=28800",
|
||||
'connect_timeout': 20,
|
||||
'read_timeout': 60,
|
||||
'write_timeout': 60,
|
||||
'init_command': "SET sql_mode='STRICT_TRANS_TABLES', wait_timeout=300",
|
||||
'isolation_level': 'READ COMMITTED',
|
||||
'autocommit': True,
|
||||
},
|
||||
'CONN_MAX_AGE': 300,
|
||||
'CONN_HEALTH_CHECKS': True,
|
||||
}
|
||||
}
|
||||
else:
|
||||
|
||||
@@ -10,11 +10,12 @@ from rest_framework.request import Request
|
||||
from mangotools.exceptions import MangoToolsError
|
||||
from mangotools.mangos import Mango
|
||||
from src.exceptions import MangoServerError
|
||||
from src.exceptions.error_msg import ERROR_MSG_0000
|
||||
from src.exceptions.error_msg import ERROR_MSG_0000, ERROR_MSG_0058
|
||||
from src.settings import IS_SEND_MAIL
|
||||
from src.tools.log_collector import log
|
||||
from src.tools.view import RESPONSE_MSG_0107
|
||||
from src.tools.view.response_data import ResponseData
|
||||
from django.db import close_old_connections, utils
|
||||
|
||||
log_dict = {
|
||||
'ui': log.ui,
|
||||
@@ -42,7 +43,11 @@ def error_response(app: str):
|
||||
except FileNotFoundError as error:
|
||||
log_dict.get(app, log.system).error(f'错误内容:{error}-错误详情:{traceback.format_exc()}')
|
||||
return ResponseData.fail(RESPONSE_MSG_0107)
|
||||
except utils.OperationalError as error:
|
||||
close_old_connections()
|
||||
return ResponseData.fail(ERROR_MSG_0058, data=str(error))
|
||||
except Exception as error:
|
||||
close_old_connections()
|
||||
try:
|
||||
username = request.user.get('username')
|
||||
except AttributeError:
|
||||
|
||||
@@ -11,92 +11,9 @@ from functools import wraps
|
||||
import time
|
||||
from django.db import connection, close_old_connections
|
||||
from django.db.utils import Error, InterfaceError, OperationalError
|
||||
from mangotools.mangos import Mango
|
||||
from mangotools.mangos.mangos import MangoToolsError
|
||||
|
||||
from src.settings import IS_SEND_MAIL
|
||||
from src.tools.log_collector import log
|
||||
|
||||
|
||||
def orm_retry(func_name: str, max_retries=5, delay=2):
|
||||
def decorator(func):
|
||||
def wrapper(*args, **kwargs):
|
||||
try_count = 0
|
||||
error = None
|
||||
trace = None
|
||||
while try_count < max_retries:
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except Error as error:
|
||||
error = error
|
||||
trace = traceback.print_exc()
|
||||
log.system.error(f'重试失败: 函数:{func_name}, 错误提示:{error}')
|
||||
close_old_connections()
|
||||
connection.ensure_connection()
|
||||
try_count += 1
|
||||
time.sleep(delay) # 等待一段时间后重试
|
||||
else:
|
||||
if error is not None and IS_SEND_MAIL:
|
||||
from src.settings import VERSION
|
||||
kwargs['version'] = VERSION
|
||||
Mango.s(func, error, trace, args, kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def ensure_db_connection(is_while=False, max_retries=3):
|
||||
def decorator(func):
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
try_count = 0
|
||||
while try_count < max_retries:
|
||||
from src.exceptions import MangoServerError
|
||||
try:
|
||||
close_old_connections()
|
||||
result = func(*args, **kwargs)
|
||||
# 确保在函数执行完成后关闭连接
|
||||
close_old_connections()
|
||||
return result
|
||||
except (InterfaceError, OperationalError, Error) as e:
|
||||
try_count += 1
|
||||
if try_count > max_retries:
|
||||
log.system.error(
|
||||
f'重试失败-1: 函数:{func.__name__}, 数据list:{args},数据dict:{kwargs} 详情:{traceback.format_exc()}')
|
||||
if IS_SEND_MAIL:
|
||||
from src.settings import VERSION
|
||||
kwargs['version'] = VERSION
|
||||
Mango.s(func, e, traceback.format_exc(), args, kwargs)
|
||||
raise e
|
||||
time.sleep(2)
|
||||
close_old_connections()
|
||||
connection.ensure_connection()
|
||||
except (MangoServerError, MangoToolsError) as e:
|
||||
try_count += 1
|
||||
log.system.error(
|
||||
f'重试失败-2: 函数:{func.__name__}, 数据list:{args},数据dict:{kwargs} 详情:{traceback.format_exc()}')
|
||||
# 确保在异常情况下也关闭连接
|
||||
close_old_connections()
|
||||
except Exception as e:
|
||||
try_count += 1
|
||||
log.system.error(
|
||||
f'重试失败-3, 如果是首次启动项目,请启动完成之后再重启一次!: 函数:{func.__name__}, 数据list:{args},数据dict:{kwargs} 详情:{traceback.format_exc()}')
|
||||
# 确保在异常情况下也关闭连接
|
||||
close_old_connections()
|
||||
else:
|
||||
if is_while:
|
||||
# 确保在返回前关闭连接
|
||||
close_old_connections()
|
||||
return func(*args, **kwargs)
|
||||
# 确保在返回前关闭连接
|
||||
close_old_connections()
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
@contextmanager
|
||||
def db_connection_context():
|
||||
"""数据库连接上下文管理器"""
|
||||
@@ -105,3 +22,71 @@ def db_connection_context():
|
||||
yield
|
||||
finally:
|
||||
close_old_connections()
|
||||
|
||||
|
||||
def async_task_db_connection(max_retries=3, retry_delay=3, infinite_retry=False):
|
||||
"""异步任务数据库连接管理装饰器
|
||||
|
||||
专为异步任务设计,确保在任务开始和结束时正确管理数据库连接
|
||||
集成重试机制,默认重试3次,每次间隔3秒
|
||||
|
||||
Args:
|
||||
max_retries (int): 最大重试次数,默认3次
|
||||
retry_delay (int): 重试间隔秒数,默认3秒
|
||||
infinite_retry (bool): 是否永远重试,默认False
|
||||
"""
|
||||
|
||||
def decorator(func):
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
try_count = 0
|
||||
last_exception = None
|
||||
|
||||
while infinite_retry or try_count <= max_retries:
|
||||
try:
|
||||
# 确保开始时连接是干净的
|
||||
close_old_connections()
|
||||
|
||||
# 执行任务
|
||||
result = func(*args, **kwargs)
|
||||
|
||||
# 确保结束时连接被关闭
|
||||
close_old_connections()
|
||||
return result
|
||||
|
||||
except (InterfaceError, OperationalError, Error) as e:
|
||||
last_exception = e
|
||||
try_count += 1
|
||||
|
||||
# 记录重试日志
|
||||
log.system.warning(
|
||||
f'异步任务数据库连接异常,正在进行第{try_count}次重试: '
|
||||
f'函数:{func.__name__}, 错误:{str(e)}'
|
||||
)
|
||||
|
||||
if not infinite_retry and try_count > max_retries:
|
||||
log.system.error(
|
||||
f'异步任务数据库连接重试失败: 函数:{func.__name__}, '
|
||||
f'错误:{str(e)}, 详情:{traceback.format_exc()}')
|
||||
close_old_connections()
|
||||
raise e
|
||||
|
||||
# 等待后重试
|
||||
time.sleep(retry_delay)
|
||||
close_old_connections()
|
||||
connection.ensure_connection()
|
||||
|
||||
except Exception as e:
|
||||
close_old_connections()
|
||||
log.system.error(
|
||||
f'异步任务执行异常: 函数:{func.__name__}, 错误:{str(e)}, 详情:{traceback.format_exc()}')
|
||||
raise e
|
||||
finally:
|
||||
close_old_connections()
|
||||
|
||||
close_old_connections()
|
||||
return None
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
Reference in New Issue
Block a user