Compare commits

..

9 Commits

Author SHA1 Message Date
毛鹏
eb57b31100 优化请求 2025-12-05 18:26:39 +08:00
毛鹏
f81991b7d5 优化链接 2025-12-05 18:07:38 +08:00
毛鹏
5837140c5a 优化链接 2025-12-05 17:39:58 +08:00
毛鹏
2c87901e60 优化链接 2025-12-05 17:10:38 +08:00
毛鹏
d9a64520f7 优化链接 2025-12-05 17:08:00 +08:00
毛鹏
22991acfe5 优化链接 2025-12-05 16:39:54 +08:00
毛鹏
18db8f013f 优化链接 2025-12-05 16:16:30 +08:00
毛鹏
ba2fdd1b1f 优化链接 2025-12-05 16:13:11 +08:00
毛鹏
3f80288052 优化链接 2025-12-05 15:10:10 +08:00
26 changed files with 203 additions and 153 deletions

Binary file not shown.

View File

@@ -9,7 +9,7 @@ from queue import Queue
import time
from src.models.system_model import ConsumerCaseModel
from src.tools.decorator.retry import ensure_db_connection
from src.tools.decorator.retry import async_task_db_connection
class ApiCaseFlow:
@@ -24,7 +24,7 @@ class ApiCaseFlow:
cls.running = False
@classmethod
@ensure_db_connection(True)
@async_task_db_connection(max_retries=3, retry_delay=3)
def process_tasks(cls):
while cls.running:
if not cls.queue.empty():
@@ -33,7 +33,7 @@ class ApiCaseFlow:
time.sleep(0.1)
@classmethod
@ensure_db_connection()
@async_task_db_connection(max_retries=3, retry_delay=3)
def execute_task(cls, case_model: ConsumerCaseModel):
from src.auto_test.auto_api.service.test_case.test_case import TestCase
test_case = TestCase(

View File

@@ -17,6 +17,7 @@ from src.enums.tools_enum import StatusEnum, TaskEnum, TestCaseTypeEnum
from src.exceptions import *
from src.models.api_model import RequestModel, ApiCaseResultModel, ApiCaseStepsResultModel, ResponseModel
from src.models.system_model import TestSuiteDetailsResultModel
from src.tools.decorator.retry import async_task_db_connection
from ..base.case_base import CaseBase
from ..base.case_parameter import CaseParameter
@@ -207,12 +208,14 @@ class TestCase:
model.save()
@classmethod
@async_task_db_connection(max_retries=3, retry_delay=3)
def update_test_case(cls, case_id: int, status: int):
model = ApiCase.objects.get(id=case_id)
model.status = status
model.save()
@classmethod
@async_task_db_connection(max_retries=3, retry_delay=3)
def update_test_case_detailed_parameter(cls, parameter_id, result_data: ApiCaseStepsResultModel):
model = ApiCaseDetailedParameter.objects.get(id=parameter_id)
model.status = result_data.status

View File

@@ -39,15 +39,6 @@ class AutoPytestConfig(AppConfig):
return False
# def test_case_consumption(self):
# self.case_flow = PytestCaseFlow()
# self.pytest_task = Thread(target=self.case_flow.process_tasks)
# self.pytest_task.daemon = True
# self.pytest_task.start()
# def shutdown(self):
# self.case_flow.stop()
# self.pytest_task.join()
def pull_code(self):
from src.auto_test.auto_pytest.service.base import git_obj
try:

View File

@@ -14,7 +14,7 @@ from src.auto_test.auto_system.models import TestSuite, TestSuiteDetails
from src.enums.tools_enum import TaskEnum, TestCaseTypeEnum
from src.exceptions import MangoServerError
from src.models.system_model import ConsumerCaseModel
from src.tools.decorator.retry import ensure_db_connection
from src.tools.decorator.retry import async_task_db_connection
from src.tools.log_collector import log
@@ -31,7 +31,7 @@ class PyCaseFlow:
cls.running = False
@classmethod
@ensure_db_connection(True)
@async_task_db_connection(infinite_retry=True)
def process_tasks(cls):
while cls.running:
if not cls.queue.empty():
@@ -40,7 +40,7 @@ class PyCaseFlow:
time.sleep(0.1)
@classmethod
@ensure_db_connection()
@async_task_db_connection()
def execute_task(cls, case_model: ConsumerCaseModel):
from src.auto_test.auto_pytest.service.test_case.test_case import TestCase
test_case = TestCase(
@@ -55,6 +55,7 @@ class PyCaseFlow:
cls.queue.put(case_model)
@classmethod
@async_task_db_connection(max_retries=3, retry_delay=2)
def get_case(cls, data):
with cls._get_case_lock:
test_suite_details = TestSuiteDetails.objects.filter(

View File

@@ -9,16 +9,15 @@ from django.db import connection
from src.auto_test.auto_pytest.models import PytestCase
from src.exceptions import *
from src.models.pytest_model import PytestCaseResultModel
from src.tools.decorator.retry import orm_retry
from src.tools.decorator.retry import async_task_db_connection
class PtestTestReportWriting:
@classmethod
@orm_retry('update_pytest_test_case')
@async_task_db_connection(max_retries=3, retry_delay=2)
def update_pytest_test_case(cls, data: PytestCaseResultModel):
log.ui.debug(f'开始写入Pytest用例<{data.name}>测试结果,用例的测试状态是:{data.status}')
connection.ensure_connection()
case = PytestCase.objects.get(id=data.id)
case.status = data.status
case.result_data = data.result_data

View File

@@ -11,13 +11,13 @@ import atexit
import time
from apscheduler.schedulers.background import BackgroundScheduler
from django.apps import AppConfig
from django.db import close_old_connections
from django.utils import timezone
from mangotools.decorator import func_info
from mangotools.enums import CacheValueTypeEnum
from src.enums.system_enum import CacheDataKeyEnum
from src.enums.tools_enum import TaskEnum
from src.tools.decorator.retry import ensure_db_connection, db_connection_context
from src.tools.log_collector import log
@@ -37,7 +37,9 @@ class AutoSystemConfig(AppConfig):
self.populate_time_tasks()
self.run_tests()
self.init_ass()
self.start_consumer()
# 设置定时任务调度器
self.setup_scheduler()
# 启动后台任务
task1 = threading.Thread(target=run)
@@ -151,6 +153,8 @@ class AutoSystemConfig(AppConfig):
self.system_task.join()
except AttributeError:
pass
# 停止全局调度器
self.stop_scheduler()
def init_ass(self):
try:
@@ -180,14 +184,44 @@ class AutoSystemConfig(AppConfig):
except Exception as e:
log.system.error(f'异常提示:{e}, 首次启动项目,请启动完成之后再重启一次!')
def start_consumer(self):
scheduler = BackgroundScheduler()
scheduler.add_job(self.set_case_status, 'interval', minutes=5)
scheduler.start()
def setup_scheduler(self):
"""设置定时任务调度器"""
try:
# 创建调度器实例
self.scheduler = BackgroundScheduler()
# 添加定时任务
self.scheduler.add_job(
self.set_case_status,
'interval',
minutes=5,
id='set_case_status'
)
# 启动调度器
self.scheduler.start()
# 注册退出时停止调度器
atexit.register(self.stop_scheduler)
except Exception as e:
log.system.error(f'定时任务调度器设置异常: {e}')
def stop_scheduler(self):
"""停止调度器"""
try:
if hasattr(self, 'scheduler') and self.scheduler:
self.scheduler.shutdown()
except Exception as e:
log.system.error(f'停止调度器异常: {e}')
@ensure_db_connection(max_retries=1)
def set_case_status(self):
with db_connection_context():
from django.db import transaction
try:
# 确保开始时连接是干净的
close_old_connections()
from src.auto_test.auto_ui.models import UiCase, UiCaseStepsDetailed, PageSteps
from src.auto_test.auto_pytest.models import PytestCase
from src.auto_test.auto_api.models import ApiInfo, ApiCase, ApiCaseDetailed
@@ -208,4 +242,10 @@ class AutoSystemConfig(AppConfig):
model.objects.filter(
status=TaskEnum.PROCEED.value,
update_time__lt=ten_minutes_ago
).update(status=TaskEnum.FAIL.value)
).update(status=TaskEnum.FAIL.value)
# 确保事务提交
transaction.commit()
finally:
# 确保结束时连接被关闭
close_old_connections()

View File

@@ -18,6 +18,7 @@ from src.enums.system_enum import SocketEnum, ClientTypeEnum, ClientNameEnum
from src.exceptions import *
from src.models.socket_model import SocketDataModel, QueueModel
from src.settings import IS_DEBUG_LOG
from src.tools.decorator.retry import async_task_db_connection
T = TypeVar('T')
@@ -161,6 +162,7 @@ class ChatConsumer(WebsocketConsumer):
log.system.debug(f"发送的数据:{data_json}")
return data_json
@async_task_db_connection(max_retries=1, retry_delay=1)
def verify_user(self) -> tuple[bool, int]:
if 'username' not in self.scope.get("query_string").decode():
log.system.debug('您的执行器代码是旧的,请使用新的执行器再来进行连接!')

View File

@@ -15,7 +15,7 @@ from src.auto_test.auto_system.models import TestSuiteDetails, TestSuite, Tasks
from src.enums.tools_enum import TaskEnum, TestCaseTypeEnum
from src.exceptions import MangoServerError
from src.models.system_model import ConsumerCaseModel
from src.tools.decorator.retry import ensure_db_connection, db_connection_context
from src.tools.decorator.retry import db_connection_context, async_task_db_connection
from src.tools.log_collector import log
from django.db.utils import Error, InterfaceError, OperationalError
@@ -31,7 +31,7 @@ class ConsumerThread:
def stop(self):
self.running = False
@ensure_db_connection(True, max_retries=100)
@async_task_db_connection(max_retries=3, retry_delay=3)
def consumer(self):
reset_tims = time.time()
while self.running:
@@ -92,7 +92,7 @@ class ConsumerThread:
else:
self.send_case(test_suite, test_suite_details, case_model, retry, max_retry)
@ensure_db_connection()
@async_task_db_connection(max_retries=3, retry_delay=3)
def clean_test_suite_status(self):
# 使用数据库连接上下文管理器确保连接被正确释放
with db_connection_context():
@@ -108,7 +108,7 @@ class ConsumerThread:
i.status = TaskEnum.SUCCESS.value
i.save()
@ensure_db_connection()
@async_task_db_connection(max_retries=3, retry_delay=3)
def clean_proceed(self):
"""
把进行中的,修改为待开始,或者失败
@@ -126,7 +126,7 @@ class ConsumerThread:
log.system.info(
f'推送时间超过{self.reset_time}分钟状态重置为待执行用例ID{test_suite_detail.case_id}')
@ensure_db_connection()
@async_task_db_connection(max_retries=3, retry_delay=3)
def clean_proceed_set_fail(self):
"""
把重试次数满的修改为0只有未知错误才会设置为失败
@@ -144,7 +144,7 @@ class ConsumerThread:
log.system.info(
f'重试次数超过{self.retry_frequency + 1}次的任务状态重置为失败用例ID{test_suite_detail.case_id}')
@ensure_db_connection()
@async_task_db_connection(max_retries=3, retry_delay=3)
def update_status_proceed(self, test_suite, test_suite_details):
# 使用数据库连接上下文管理器确保连接被正确释放
with db_connection_context():
@@ -156,7 +156,7 @@ class ConsumerThread:
test_suite_details.push_time = timezone.now()
test_suite_details.save()
@ensure_db_connection()
@async_task_db_connection(max_retries=3, retry_delay=3)
def consumer_error(self, test_suite, test_suite_details, error):
# 使用数据库连接上下文管理器确保连接被正确释放
with db_connection_context():

View File

@@ -17,11 +17,13 @@ from src.enums.system_enum import CacheDataKeyEnum
from src.enums.tools_enum import StatusEnum, EnvironmentEnum, TestCaseTypeEnum
from src.exceptions import *
from src.tools.log_collector import log
from src.tools.decorator.retry import async_task_db_connection
class NoticeMain:
@classmethod
@async_task_db_connection(max_retries=3, retry_delay=2)
def notice_main(cls, test_env: int, project_product: int, test_suite_id: int):
test_object = TestObject.objects.get(environment=test_env, project_product=project_product)
notice_obj = NoticeConfig.objects.filter(test_object=test_object.id, status=StatusEnum.SUCCESS.value)
@@ -38,6 +40,7 @@ class NoticeMain:
raise ToolsError(error.code, error.msg)
@classmethod
@async_task_db_connection(max_retries=3, retry_delay=2)
def test_notice_send(cls, _id):
notice_obj = NoticeConfig.objects.get(id=_id)
test_report = TestReportModel(**{
@@ -71,6 +74,7 @@ class NoticeMain:
log.system.error('暂不支持钉钉打卡')
@classmethod
@async_task_db_connection(max_retries=3, retry_delay=2)
def __we_chat_send(cls, i, test_report: TestReportModel | None = None):
try:
wechat = WeChatSend(WeChatNoticeModel(webhook=i.config), test_report, cls.get_domain_name())
@@ -79,6 +83,7 @@ class NoticeMain:
raise MangoServerError(error.code, error.msg)
@classmethod
@async_task_db_connection(max_retries=3, retry_delay=2)
def __wend_mail_send(cls, i, test_report: TestReportModel | None = None):
try:
user_info = User.objects.filter(name__in=json.loads(i.config))
@@ -103,6 +108,7 @@ class NoticeMain:
email.send_main()
@classmethod
@async_task_db_connection(max_retries=3, retry_delay=2)
def test_report(cls, test_suite_id: int) -> TestReportModel:
test_suite = TestSuite.objects.get(id=test_suite_id)
execution_duration = test_suite.update_time - test_suite.create_time
@@ -170,6 +176,7 @@ class NoticeMain:
)
@staticmethod
@async_task_db_connection(max_retries=3, retry_delay=2)
def mail_config():
try:
send_user = CacheData.objects.get(key=CacheDataKeyEnum.SYSTEM_SEND_USER.name).value
@@ -183,6 +190,7 @@ class NoticeMain:
return send_user, email_host, stamp_key
@classmethod
@async_task_db_connection(max_retries=3, retry_delay=2)
def get_domain_name(cls):
domain_name = f'请先到系统管理->系统设置中设置:{CacheDataKeyEnum.SYSTEM_DOMAIN_NAME.value},此处才会显示跳转连接'
try:

View File

@@ -23,7 +23,7 @@ from .ui import UIConsumer
class ServerInterfaceReflection(APIConsumer, SystemConsumer, UIConsumer, PerfConsumer, PytestConsumer):
def __init__(self):
self.executor = concurrent.futures.ThreadPoolExecutor(10)
self.executor = concurrent.futures.ThreadPoolExecutor(5)
self.server_data_received = Signal()
self.server_data_received.connect(self.data_received_handler)
@@ -39,8 +39,7 @@ class ServerInterfaceReflection(APIConsumer, SystemConsumer, UIConsumer, PerfCon
def handle_task_result(self, future):
try:
result = future.result() # 获取任务执行结果
# 处理任务执行结果
result = future.result()
except Exception as e:
traceback.print_exc() # 打印异常追踪信息
traceback.print_exc()
log.system.error(f"任务执行出现异常:{e}")

View File

@@ -6,12 +6,12 @@
from src.auto_test.auto_api.service.api_import.recording import Recording
from src.models.api_model import RecordingApiModel
from src.tools.decorator.retry import ensure_db_connection
from src.tools.decorator.retry import async_task_db_connection
class APIConsumer:
@classmethod
@ensure_db_connection(max_retries=1)
@async_task_db_connection(max_retries=1)
def a_recording_api(cls, data: dict):
Recording.write(RecordingApiModel(**data))

View File

@@ -8,17 +8,17 @@ from src.auto_test.auto_system.service.update_test_suite import UpdateTestSuite
from src.models.pytest_model import PytestCaseResultModel
from src.models.system_model import TestSuiteDetailsResultModel
from src.tools.decorator.retry import ensure_db_connection
from src.tools.decorator.retry import async_task_db_connection
class PytestConsumer:
@classmethod
@ensure_db_connection()
@async_task_db_connection()
def p_test_suite_details(cls, data: dict):
UpdateTestSuite.update_test_suite_details(TestSuiteDetailsResultModel(**data))
@classmethod
@ensure_db_connection()
@async_task_db_connection()
def p_test_case(cls, data: dict):
PtestTestReportWriting.update_pytest_test_case(PytestCaseResultModel(**data))

View File

@@ -14,13 +14,13 @@ from src.enums.system_enum import CacheDataKey2Enum, ClientTypeEnum
from src.enums.tools_enum import TestCaseTypeEnum
from src.models.socket_model import SocketDataModel
from src.models.system_model import GetTaskModel
from src.tools.decorator.retry import ensure_db_connection
from src.tools.decorator.retry import async_task_db_connection
class SystemConsumer:
@classmethod
@ensure_db_connection()
@async_task_db_connection()
def t_set_operation_options(cls, data: dict):
data = {
'describe': data.get('version'),
@@ -41,7 +41,7 @@ class SystemConsumer:
CacheDataCRUD.inside_post(data)
@classmethod
@ensure_db_connection()
@async_task_db_connection()
def t_set_userinfo(cls, data):
from src.auto_test.auto_system.service.socket_link.socket_user import SocketUser
from src.auto_test.auto_system.consumers import ChatConsumer
@@ -54,7 +54,7 @@ class SystemConsumer:
))
@classmethod
@ensure_db_connection()
@async_task_db_connection()
def t_get_task(cls, data: dict):
data = GetTaskModel(**data)
if data.type == TestCaseTypeEnum.UI:

View File

@@ -9,22 +9,22 @@ from src.auto_test.auto_ui.service.test_report_writing import TestReportWriting
from src.models.system_model import TestSuiteDetailsResultModel
from src.models.ui_model import PageStepsResultModel, UiCaseResultModel
from src.tools.decorator.retry import ensure_db_connection
from src.tools.decorator.retry import async_task_db_connection
class UIConsumer:
@classmethod
@ensure_db_connection()
@async_task_db_connection()
def u_page_steps(cls, data: dict):
TestReportWriting.update_page_step_status(PageStepsResultModel(**data))
@classmethod
@ensure_db_connection()
@async_task_db_connection()
def u_test_suite_details(cls, data: dict):
UpdateTestSuite.update_test_suite_details(TestSuiteDetailsResultModel(**data))
@classmethod
@ensure_db_connection()
@async_task_db_connection()
def u_test_case(cls, data: dict):
TestReportWriting.update_test_case(UiCaseResultModel(**data))

View File

@@ -11,7 +11,7 @@ from apscheduler.triggers.cron import CronTrigger
from src.auto_test.auto_system.models import Tasks, TasksDetails, TimeTasks
from src.auto_test.auto_system.service.tasks.add_tasks import AddTasks
from src.enums.tools_enum import StatusEnum, TestCaseTypeEnum
from src.tools.decorator.retry import orm_retry
from src.tools.decorator.retry import async_task_db_connection
from src.tools.log_collector import log
@@ -55,7 +55,7 @@ class RunTasks:
return False
@classmethod
@orm_retry('timing')
@async_task_db_connection(max_retries=3, retry_delay=2)
def timing(cls, timing_strategy_id):
scheduled_tasks_obj = Tasks.objects.filter(timing_strategy=timing_strategy_id,
status=StatusEnum.SUCCESS.value)
@@ -64,7 +64,7 @@ class RunTasks:
cls.distribute(scheduled_tasks)
@classmethod
@orm_retry('trigger')
@async_task_db_connection(max_retries=3, retry_delay=2)
def trigger(cls, scheduled_tasks_id):
scheduled_tasks = Tasks.objects.get(id=scheduled_tasks_id)
cls.distribute(scheduled_tasks)

View File

@@ -9,11 +9,13 @@ from src.auto_test.auto_system.models import TestSuiteDetails
from src.enums.tools_enum import StatusEnum
from src.enums.tools_enum import TestCaseTypeEnum
from src.models.system_model import CaseCounterModel
from src.tools.decorator.retry import async_task_db_connection
class TestCounter:
@classmethod
@async_task_db_connection(max_retries=3, retry_delay=3)
def res_main(cls, _id):
model = TestSuiteDetails.objects.get(id=_id)
if model.type == TestCaseTypeEnum.API.value:

View File

@@ -14,17 +14,20 @@ from src.enums.tools_enum import TaskEnum, StatusEnum, TestCaseTypeEnum
from src.models.socket_model import SocketDataModel
from src.models.system_model import TestSuiteDetailsResultModel
from src.tools.log_collector import log
from src.tools.decorator.retry import async_task_db_connection
class UpdateTestSuite:
@classmethod
@async_task_db_connection(max_retries=3, retry_delay=3)
def update_test_suite(cls, test_suite_id: int, status: int):
test_suite = TestSuite.objects.get(id=test_suite_id)
test_suite.status = status
test_suite.save()
@classmethod
@async_task_db_connection(max_retries=3, retry_delay=3)
def update_test_suite_details(cls, data: TestSuiteDetailsResultModel):
log.system.debug(f'开始更新测试套数据:{data.model_dump_json()}')
test_suite_detail = TestSuiteDetails.objects.get(id=data.id)
@@ -55,6 +58,7 @@ class UpdateTestSuite:
cls.send_test_result(data.test_suite, data.error_message)
@classmethod
@async_task_db_connection(max_retries=3, retry_delay=3)
def send_test_result(cls, test_suite_id: int, msg: str):
test_suite = TestSuite.objects.get(id=test_suite_id)
if test_suite.is_notice == StatusEnum.SUCCESS.value:

View File

@@ -15,6 +15,7 @@ from src.enums.tools_enum import TaskEnum, TestCaseTypeEnum
from src.exceptions import MangoServerError
from src.models.system_model import ConsumerCaseModel, GetTaskModel
from src.tools.log_collector import log
from src.tools.decorator.retry import async_task_db_connection
class UiCaseFlow:
@@ -43,6 +44,7 @@ class UiCaseFlow:
cls.execute_task(case_model)
@classmethod
@async_task_db_connection(max_retries=3, retry_delay=2)
def get_case(cls, data: GetTaskModel):
model = User.objects.get(username=data.username)
with cls._get_case_lock:

View File

@@ -9,13 +9,13 @@ from django.db import connection
from src.auto_test.auto_ui.models import PageSteps, UiCase, UiCaseStepsDetailed
from src.exceptions import *
from src.models.ui_model import UiCaseResultModel, PageStepsResultModel
from src.tools.decorator.retry import orm_retry
from src.tools.decorator.retry import async_task_db_connection
class TestReportWriting:
@classmethod
@orm_retry('update_page_step_status')
@async_task_db_connection(max_retries=3, retry_delay=2)
def update_page_step_status(cls, data: PageStepsResultModel) -> None:
try:
log.ui.debug(f'开始写入步骤测试结果,步骤数据是:{data.model_dump_json()}')
@@ -28,10 +28,9 @@ class TestReportWriting:
'忽略这个报错,如果是在步骤详情中没有查到,可以忽略这个错误,步骤详情中的调试不会修改整个步骤状态')
@classmethod
@orm_retry('update_test_case')
@async_task_db_connection(max_retries=3, retry_delay=2)
def update_test_case(cls, data: UiCaseResultModel):
log.ui.debug(f'开始写入用例测试结果,用例的测试状态是:{data.status}')
connection.ensure_connection()
case = UiCase.objects.get(id=data.id)
case.status = data.status
case.save()
@@ -39,7 +38,7 @@ class TestReportWriting:
cls.update_step(i)
@classmethod
@orm_retry('update_step')
@async_task_db_connection(max_retries=3, retry_delay=2)
def update_step(cls, step_data: PageStepsResultModel):
log.ui.debug(f'开始写入用例中步骤测试结果,步骤数据是:{step_data.model_dump_json()}')

View File

@@ -56,3 +56,4 @@ ERROR_MSG_0050 = (1050, '接口断言中预期值不允许为空')
ERROR_MSG_0055 = (1052, '接口通用断言中的值不允许为空')
ERROR_MSG_0056 = (1056, '一个测试环境现在不支持配置2个数据库删除一个再进行尝试')
ERROR_MSG_0057 = (1057, '用例已被删除,历史未执行的全部修改为失败')
ERROR_MSG_0058 = (1058, '请求频繁,请稍后重试')

View File

@@ -8,7 +8,6 @@ from django.conf import settings
from jwt import exceptions
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from src.auto_test.auto_user.models import User
class JwtQueryParamsAuthentication(BaseAuthentication):
@@ -22,9 +21,6 @@ class JwtQueryParamsAuthentication(BaseAuthentication):
# 3.验证第三段合法性
try:
payload = jwt.decode(token, salt, algorithms='HS256')
User.objects.get(id=payload['id'])
except User.DoesNotExist:
raise AuthenticationFailed({'code': 300, 'msg': '没有该用户信息,请重新登录', 'data': None})
except exceptions.ExpiredSignatureError:
raise AuthenticationFailed({'code': 301, 'msg': '当前用户登录已过期,请重新登录', 'data': None})
except jwt.DecodeError:
@@ -32,8 +28,7 @@ class JwtQueryParamsAuthentication(BaseAuthentication):
except jwt.InvalidTokenError:
raise AuthenticationFailed({'code': 303, 'msg': 'token 非法,请使用有效的 token', 'data': None})
except Exception as e:
raise AuthenticationFailed({'code': 304, 'msg': f'token 验证失败: {str(e)}', 'data': None})
raise AuthenticationFailed({'code': 304, 'msg': f'请求频繁,请稍后重试: {str(e)}', 'data': None})
return payload, token
def authenticate_header(self, request):

View File

@@ -12,6 +12,7 @@ from django.core.handlers.asgi import ASGIRequest
from rest_framework.response import Response
from src.auto_test.auto_user.models import User
from src.auto_test.auto_user.views.user_logs import UserLogsCRUD
from django.db import close_old_connections
class UserLogsMiddleWare(MiddlewareMixin):
@@ -43,8 +44,12 @@ class UserLogsMiddleWare(MiddlewareMixin):
user_name = request_data.get('username')
if isinstance(user_name, list):
user_name = user_name[0]
# 确保在数据库操作前后关闭连接
close_old_connections()
user_id = User._default_manager.get(username=user_name).id
close_old_connections()
except User.DoesNotExist:
close_old_connections()
pass
try:
request_data = json.dumps(request_data, ensure_ascii=False)
@@ -63,6 +68,9 @@ class UserLogsMiddleWare(MiddlewareMixin):
self._save_user_logs_async(log_entry)
except Exception:
pass
finally:
# 确保在异步处理完成后关闭所有数据库连接
close_old_connections()
def _capture_request_data(self, request: ASGIRequest, response: Response) -> dict:
if request.method == 'POST' or request.method == 'PUT':
@@ -115,6 +123,10 @@ class UserLogsMiddleWare(MiddlewareMixin):
def _save_user_logs_async(self, log_entry: dict) -> None:
"""异步保存用户日志到数据库"""
try:
# 确保在数据库操作前后关闭连接
close_old_connections()
UserLogsCRUD.inside_post(log_entry)
close_old_connections()
except Exception as e:
close_old_connections()
print(e)

View File

@@ -123,13 +123,15 @@ if not IS_SQLITE:
'PORT': MYSQL_PORT,
'OPTIONS': {
'charset': 'utf8mb4',
'connect_timeout': 20, # 增加到20秒
'read_timeout': 60, # 增加读取超时到60秒
'write_timeout': 60, # 增加写入超时到60秒
'init_command': "SET sql_mode='STRICT_TRANS_TABLES', wait_timeout=28800",
'connect_timeout': 20,
'read_timeout': 60,
'write_timeout': 60,
'init_command': "SET sql_mode='STRICT_TRANS_TABLES', wait_timeout=300",
'isolation_level': 'READ COMMITTED',
'autocommit': True,
},
'CONN_MAX_AGE': 300,
'CONN_HEALTH_CHECKS': True,
}
}
else:

View File

@@ -10,11 +10,12 @@ from rest_framework.request import Request
from mangotools.exceptions import MangoToolsError
from mangotools.mangos import Mango
from src.exceptions import MangoServerError
from src.exceptions.error_msg import ERROR_MSG_0000
from src.exceptions.error_msg import ERROR_MSG_0000, ERROR_MSG_0058
from src.settings import IS_SEND_MAIL
from src.tools.log_collector import log
from src.tools.view import RESPONSE_MSG_0107
from src.tools.view.response_data import ResponseData
from django.db import close_old_connections, utils
log_dict = {
'ui': log.ui,
@@ -42,7 +43,11 @@ def error_response(app: str):
except FileNotFoundError as error:
log_dict.get(app, log.system).error(f'错误内容:{error}-错误详情:{traceback.format_exc()}')
return ResponseData.fail(RESPONSE_MSG_0107)
except utils.OperationalError as error:
close_old_connections()
return ResponseData.fail(ERROR_MSG_0058, data=str(error))
except Exception as error:
close_old_connections()
try:
username = request.user.get('username')
except AttributeError:

View File

@@ -11,92 +11,9 @@ from functools import wraps
import time
from django.db import connection, close_old_connections
from django.db.utils import Error, InterfaceError, OperationalError
from mangotools.mangos import Mango
from mangotools.mangos.mangos import MangoToolsError
from src.settings import IS_SEND_MAIL
from src.tools.log_collector import log
def orm_retry(func_name: str, max_retries=5, delay=2):
def decorator(func):
def wrapper(*args, **kwargs):
try_count = 0
error = None
trace = None
while try_count < max_retries:
try:
return func(*args, **kwargs)
except Error as error:
error = error
trace = traceback.print_exc()
log.system.error(f'重试失败: 函数:{func_name}, 错误提示:{error}')
close_old_connections()
connection.ensure_connection()
try_count += 1
time.sleep(delay) # 等待一段时间后重试
else:
if error is not None and IS_SEND_MAIL:
from src.settings import VERSION
kwargs['version'] = VERSION
Mango.s(func, error, trace, args, kwargs)
return wrapper
return decorator
def ensure_db_connection(is_while=False, max_retries=3):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
try_count = 0
while try_count < max_retries:
from src.exceptions import MangoServerError
try:
close_old_connections()
result = func(*args, **kwargs)
# 确保在函数执行完成后关闭连接
close_old_connections()
return result
except (InterfaceError, OperationalError, Error) as e:
try_count += 1
if try_count > max_retries:
log.system.error(
f'重试失败-1: 函数:{func.__name__}, 数据list{args},数据dict{kwargs} 详情:{traceback.format_exc()}')
if IS_SEND_MAIL:
from src.settings import VERSION
kwargs['version'] = VERSION
Mango.s(func, e, traceback.format_exc(), args, kwargs)
raise e
time.sleep(2)
close_old_connections()
connection.ensure_connection()
except (MangoServerError, MangoToolsError) as e:
try_count += 1
log.system.error(
f'重试失败-2: 函数:{func.__name__}, 数据list{args},数据dict{kwargs} 详情:{traceback.format_exc()}')
# 确保在异常情况下也关闭连接
close_old_connections()
except Exception as e:
try_count += 1
log.system.error(
f'重试失败-3, 如果是首次启动项目,请启动完成之后再重启一次!: 函数:{func.__name__}, 数据list{args},数据dict{kwargs} 详情:{traceback.format_exc()}')
# 确保在异常情况下也关闭连接
close_old_connections()
else:
if is_while:
# 确保在返回前关闭连接
close_old_connections()
return func(*args, **kwargs)
# 确保在返回前关闭连接
close_old_connections()
return wrapper
return decorator
@contextmanager
def db_connection_context():
"""数据库连接上下文管理器"""
@@ -105,3 +22,71 @@ def db_connection_context():
yield
finally:
close_old_connections()
def async_task_db_connection(max_retries=3, retry_delay=3, infinite_retry=False):
"""异步任务数据库连接管理装饰器
专为异步任务设计,确保在任务开始和结束时正确管理数据库连接
集成重试机制默认重试3次每次间隔3秒
Args:
max_retries (int): 最大重试次数默认3次
retry_delay (int): 重试间隔秒数默认3秒
infinite_retry (bool): 是否永远重试默认False
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
try_count = 0
last_exception = None
while infinite_retry or try_count <= max_retries:
try:
# 确保开始时连接是干净的
close_old_connections()
# 执行任务
result = func(*args, **kwargs)
# 确保结束时连接被关闭
close_old_connections()
return result
except (InterfaceError, OperationalError, Error) as e:
last_exception = e
try_count += 1
# 记录重试日志
log.system.warning(
f'异步任务数据库连接异常,正在进行第{try_count}次重试: '
f'函数:{func.__name__}, 错误:{str(e)}'
)
if not infinite_retry and try_count > max_retries:
log.system.error(
f'异步任务数据库连接重试失败: 函数:{func.__name__}, '
f'错误:{str(e)}, 详情:{traceback.format_exc()}')
close_old_connections()
raise e
# 等待后重试
time.sleep(retry_delay)
close_old_connections()
connection.ensure_connection()
except Exception as e:
close_old_connections()
log.system.error(
f'异步任务执行异常: 函数:{func.__name__}, 错误:{str(e)}, 详情:{traceback.format_exc()}')
raise e
finally:
close_old_connections()
close_old_connections()
return None
return wrapper
return decorator