优化代码

This commit is contained in:
xupingmao
2025-08-15 22:32:02 +08:00
parent 0bb9778902
commit 3348b85aa9
15 changed files with 75 additions and 68 deletions

View File

@@ -1 +1 @@
develop-2025.08.10
develop-2025.08.15

View File

@@ -22,7 +22,7 @@
{% if show_nav and xconfig.WebConfig.ui_show_footer %}
<div class="footer">
<div class="x-center pad10">
<div class="x-center padding-sm">
<div>
<!-- left -->
</div>

View File

@@ -3,13 +3,13 @@
# @since 2021/02/12 23:04:00
# @modified 2022/03/19 18:54:15
import xutils
from xutils import dbutil
from xutils import Storage
from xutils import textutil, webutil
import math
import web.db
import typing
from xutils import dbutil
from xutils import Storage
from xutils import textutil, webutil
from xnote.core import xauth, xtables, xtemplate, xconfig
from xutils.sqldb import TableProxy, TempTableProxy
from xnote.plugin import DataTable
@@ -49,9 +49,9 @@ class DbScanHandler:
@xauth.login_required("admin")
def do_delete(self):
key = xutils.get_argument("key", "")
key = xutils.get_argument_str("key", "")
dbutil.delete(key)
return dict(code="success")
return webutil.SuccessResult()
@xauth.login_required("admin")
def do_search(self):
@@ -129,11 +129,11 @@ class DbScanHandler:
@xauth.login_required("admin")
def GET(self):
action = xutils.get_argument("action", "")
db_key = xutils.get_argument("db_key", "")
action = xutils.get_argument_str("action", "")
db_key = xutils.get_argument_str("db_key", "")
q_user_name = xutils.get_argument_str("q_user_name", "")
prefix = xutils.get_argument_str("prefix", "")
reverse = xutils.get_argument("reverse", "")
reverse = xutils.get_argument_str("reverse", "")
key_from = xutils.get_argument_str("key_from", "")
p = xutils.get_argument_str("p")
@@ -157,7 +157,7 @@ class DbScanHandler:
if q_user_name != "":
real_prefix = prefix + ":" + q_user_name
def func(key, value):
def func(key: str, value: str):
# print("db_scan:", key, value)
self.scan_count += 1
if self.scan_count > max_scan:
@@ -214,7 +214,7 @@ class DbScanHandler:
else:
return not table_info.is_deleted
def handle_admin_stat_list(self, kw):
def handle_admin_stat_list(self, kw: Storage):
p = xutils.get_argument_str("p", "")
show_delete = xutils.get_argument_bool("show_delete", False)
@@ -324,8 +324,8 @@ class SqlDBDetailHandler:
db = xtables.MySqliteDB(db=dbpath)
offset = webutil.get_page_offset(page=page, page_size=page_size)
result.rows = list(db.select(name, offset=offset, limit=page_size))
result.count = db.select(name, what="count(1) AS amount").first().amount
result.rows = list(db.select(name, offset=offset, limit=page_size)) # type: ignore
result.count = db.select(name, what="count(1) AS amount").first().amount # type: ignore
return result
def get_db_rows(self, name="", page=1, page_size=20, dbpath=""):
@@ -479,7 +479,7 @@ class DatabaseDriverInfoHandler(BaseTablePlugin):
# TODO 可以一次性取出所有的变量
if not hasattr(self, "mysql_vars"):
self.mysql_vars = {}
for item in db.query("show variables"):
for item in db.query("show variables"): # type: ignore
key = item.get("Variable_name")
value = item.get("Value")
self.mysql_vars[key] = value
@@ -653,7 +653,7 @@ class SqliteStructHelper(StructHelper):
def get_create_sql(self):
vars = dict(type="table", tbl_name=self.table_name)
row = self.db.query("select name, sql from sqlite_master where type=$type AND tbl_name=$tbl_name", vars=vars).first()
row = self.db.query("select name, sql from sqlite_master where type=$type AND tbl_name=$tbl_name", vars=vars).first() # type: ignore
assert row != None
return row["sql"]

View File

@@ -18,8 +18,8 @@
<button class="btn btn-default">按钮2</button>
<button class="btn btn-danger">按钮3</button>
<button class="btn danger">按钮4</button>
<button class="btn circle">按钮5</button>
<button class="btn link-style">按钮6</button>
<button class="btn pill">胶囊样式按钮</button>
<button class="btn link-style">链接样式按钮</button>
""" %}
<span class="card-title">效果</span>

View File

@@ -170,7 +170,8 @@
height: 30px;
}
.btn.circle {
/* 胶囊样式 */
.btn.pill {
border-radius: 14px;
}

View File

@@ -17,7 +17,7 @@
}
.dialog-body {
div.dialog-body {
position: absolute;
padding: 10px;
top: 0px;
@@ -25,6 +25,7 @@
overflow-y: auto;
right: 0px;
left: 0px;
margin-bottom: 0px;
}
.dialog-footer {

View File

@@ -7,6 +7,8 @@
import os
import shutil
import threading
import typing
from . import xconfig
try:
@@ -43,11 +45,11 @@ class FileBuilder:
def __enter__(self):
return self
def append_file_to(self, fpath, target_fp):
def append_file_to(self, fpath, target_fp: typing.BinaryIO):
with open(fpath, "rb") as read_fp:
for line in read_fp.readlines():
line = line.strip()
if line == "":
if len(line) == 0:
continue
if line.startswith(b"//"):
# 快速判断,不准确

View File

@@ -77,6 +77,7 @@ def add_failed_log(table_name="", record=None, reason=""):
def migrate_sqlite_table(new_table: xtables.TableProxy, old_dbname="", check_exist_func=None):
"""把sqlite的表从旧的数据库迁移到新的数据库"""
dbpath = xconfig.FileConfig.get_db_path(old_dbname)
if not os.path.exists(dbpath):
return

View File

@@ -8,7 +8,7 @@ def do_upgrade():
def migrate_user_20230616():
new_table = xtables.get_user_table()
def check_user_exists(record):
def check_user_exists(record: dict):
id = record.get("id")
found = new_table.select_first(where=dict(id=id))
if found != None:

View File

@@ -114,7 +114,7 @@ def to_py_datetime(datetime_info: "str|datetime.date"):
def is_str(s):
return isinstance(s, str)
def before(days=None, month=None, format=False):
def before(days:typing.Optional[int]=None, month:typing.Optional[int]=None, format=False):
if days is not None:
fasttime = time.time() - days * SECONDS_PER_DAY
if format:
@@ -122,7 +122,7 @@ def before(days=None, month=None, format=False):
return fasttime
return None
def days_before(days, format=False):
def days_before(days:int, format=False):
"""获取N天前的日期"""
seconds = time.time()
seconds -= days * 3600 * 24
@@ -131,7 +131,8 @@ def days_before(days, format=False):
return time.localtime(seconds)
def format_datetime(value=None, format='%Y-%m-%d %H:%M:%S'):
def format_datetime(value: typing.Union[None, datetime.datetime, float] = None,
format='%Y-%m-%d %H:%M:%S'):
"""格式化日期时间
>>> format_datetime(0)
'1970-01-01 08:00:00'
@@ -144,10 +145,9 @@ def format_datetime(value=None, format='%Y-%m-%d %H:%M:%S'):
st = time.localtime(value)
return time.strftime(format, st)
def format_time(seconds = None):
return format_datetime(seconds)
format_time = format_datetime
def format_time_only(seconds=None):
def format_time_only(seconds: typing.Optional[float]=None):
"""只格式化时间 TODO 时区问题
>>> format_time_only(0)
'08:00:00'
@@ -158,7 +158,7 @@ def format_time_only(seconds=None):
st = time.localtime(seconds)
return time.strftime('%H:%M:%S', st)
def format_weekday(date_str, fmt = "") -> str:
def format_weekday(date_str:str, fmt = "") -> str:
if fmt == "":
fmt = "%Y-%m-%d"
@@ -167,8 +167,9 @@ def format_weekday(date_str, fmt = "") -> str:
return WDAY_DICT.get(wday) or ""
format_wday = format_weekday
convert_date_to_wday = format_weekday
def datetime_to_weekday(datetime_obj):
def datetime_to_weekday(datetime_obj: typing.Union[None, datetime.datetime, str]):
"""把datetime转换成星期"""
if datetime_obj is None:
return ""
@@ -212,7 +213,7 @@ def format_date(value: typing.Union[None, datetime.datetime, str, float]=None, f
raise Exception(f"invalid type: {type(value)}")
def format_mmdd(seconds=None):
def format_mmdd(seconds:typing.Union[str, float, None]=None):
"""格式化月/日
>>> format_mmdd(0)
'01/01'
@@ -229,17 +230,17 @@ def format_mmdd(seconds=None):
else:
return format_date(seconds, "%m/%d")
def format_millis(mills):
def format_millis(mills: typing.Union[int, float]):
return format_time(mills / 1000)
def parse_date_to_timestamp(date_str):
def parse_date_to_timestamp(date_str: str):
st = time.strptime(date_str, DATE_FORMAT)
return time.mktime(st)
def parse_date_to_struct(date_str=""):
return time.strptime(date_str, DATE_FORMAT)
def parse_date_to_object(date_str):
def parse_date_to_object(date_str: str):
"""解析日期结构
@param {string} date_str 日期的格式
@@ -286,7 +287,7 @@ def parse_date_to_object(date_str):
return date_object
def parse_datetime(date = "", fmt = DEFAULT_FORMAT) -> float:
def parse_datetime(date: typing.Union[str, datetime.datetime] = "", fmt = DEFAULT_FORMAT) -> float:
"""解析时间字符串为unix时间戳
:param {string} date: 时间
:param {string} fmt: 时间的格式
@@ -324,15 +325,12 @@ def current_wday() -> str:
wday = str(tm.tm_wday + 1)
return WDAY_DICT.get(wday) or ""
def convert_date_to_wday(date_str):
return format_wday(date_str)
def date_str_add(date_str="1970-01-01", years=0, months=0, days=0):
tm = parse_date_to_struct(date_str)
year, month, day = date_add(tm, years=years, months=months, days=days)
return datetime.datetime(year, month, day).strftime(DATE_FORMAT)
def date_add(tm, years = 0, months = 0, days = 0):
def date_add(tm: typing.Optional[time.struct_time], years = 0, months = 0, days = 0):
"""date计算"""
if tm is None:
tm = time.localtime()
@@ -371,7 +369,7 @@ def get_last_day_of_year(date: datetime.date):
"""获取一年的最后一天"""
return datetime.date(date.year, 12, 31)
def is_leap_year(year):
def is_leap_year(year: int):
"""判断是否是闰年"""
return ((year % 4 == 0) and (year % 100 != 0)) or (year % 400 == 0)
@@ -396,7 +394,9 @@ def get_days_of_month(year=2020, month=1):
d = days[month-1]
return d
def match_time(year = None, month = None, day = None, wday = None, tm = None):
def match_time(year: typing.Optional[int] = None, month: typing.Optional[int] = None,
day: typing.Optional[int] = None, wday: typing.Optional[int] = None,
tm: typing.Optional[time.struct_time] = None):
if tm is None:
tm = time.localtime()
if year is not None and year != tm.tm_year:
@@ -412,7 +412,7 @@ def match_time(year = None, month = None, day = None, wday = None, tm = None):
def get_today():
return format_date()
def is_empty_datetime(value):
def is_empty_datetime(value: typing.Union[str, datetime.datetime]):
if value == DEFAULT_DATETIME or value == "":
return True
if isinstance(value, datetime.datetime):

View File

@@ -272,7 +272,7 @@ def check_table_name(table_name):
TableInfo.check_table_name(table_name)
def get_table_info(table_name):
def get_table_info(table_name: str):
return TableInfo.get_by_name(table_name)
@@ -552,7 +552,7 @@ def db_get(key: str, default_value=None):
except KeyError:
return default_value
def db_get_object(key, default_value=None):
def db_get_object(key: str, default_value=None):
value = db_get(key)
if value is None:
return value
@@ -560,7 +560,7 @@ def db_get_object(key, default_value=None):
return value
def db_get_str(key, default_value=None, encoding="utf-8"):
def db_get_str(key: str, default_value=None, encoding="utf-8"):
check_leveldb()
try:
if key == "" or key == None:
@@ -570,8 +570,8 @@ def db_get_str(key, default_value=None, encoding="utf-8"):
# print("key=%r", key)
raise TypeError("expect str but see %r" % type(key))
key = key.encode(encoding)
value = _leveldb.Get(key)
key_bytes = key.encode(encoding)
value = _leveldb.Get(key_bytes)
if value != None:
return value.decode(encoding)
return None
@@ -610,22 +610,22 @@ def db_put(key: str, obj_value, sync=False, check_table=True):
value = convert_object_to_json(obj_value)
_leveldb.Put(bkey, value.encode("utf-8"), sync=sync)
def put_bytes(key, value, sync=False):
def put_bytes(key: bytes, value: bytes, sync=False):
check_before_write(key.decode("utf-8"))
_leveldb.Put(key, value, sync=sync)
def db_delete(key, sync=False):
def db_delete(key: str, sync=False):
check_leveldb()
check_write_state()
# 删除日志
logging.info("Delete key: %s", key)
key = key.encode("utf-8")
_leveldb.Delete(key, sync=sync)
key_bytes = key.encode("utf-8")
_leveldb.Delete(key_bytes, sync=sync)
def db_batch_delete(keys=[]):
def db_batch_delete(keys: typing.Sequence[str]=[]):
check_leveldb()
key_bytes_list = []
@@ -640,7 +640,7 @@ def create_write_batch(db_instance=None):
def scan(key_from=None,
key_to=None,
func=None, # type: typing.Callable[[str, object], bool]|None
func=None, # type: typing.Callable[[str, typing.Any], bool]|None
reverse=False,
parse_json=True):
"""扫描数据库

View File

@@ -8,6 +8,7 @@
@FilePath : /xnote/xutils/db/dbutil_id_gen.py
@Description : id生成
"""
import typing
import time
import struct
import xutils
@@ -37,13 +38,13 @@ class IdGenerator:
return 0
return int(value)
def create_new_id(self, id_type="uuid", id_value=None):
def create_new_id(self, id_type="uuid", id_value: typing.Union[str, float, None] = None):
if id_type == "uuid":
base.validate_none(id_value, "invalid id_value")
return xutils.create_uuid()
if id_type == "timeseq":
base.validate_none(id_value, "invalid id_value")
assert isinstance(id_value, (float, type(None)))
return TimeSeqId.create(id_value)
if id_type == "auto_increment":
@@ -51,10 +52,11 @@ class IdGenerator:
return self.create_increment_id()
if id_type == "value":
assert id_value != None
if not isinstance(id_value, str):
raise Exception(f"expect str value but see {type(id_value)}")
return id_value
raise Exception("unknown id_type:%s" % id_type)
raise Exception(f"unknown id_type:{id_type}")
@@ -66,7 +68,7 @@ class TimeSeqId:
max_retry = 100
@classmethod
def create(cls, value):
def create(cls, value: typing.Optional[float]):
"""生成一个时间序列, python目前支持的最大时间是 9999 年
@param {float|None} value 时间序列,单位是秒,可选
@return {string} 16位字符串
@@ -74,7 +76,7 @@ class TimeSeqId:
return cls.create_v1(value)
@classmethod
def get_valid_ms(cls, value):
def get_valid_ms(cls, value: typing.Optional[float]):
if value != None:
error_msg = "expect <class 'float'> but see %r" % type(value)
assert isinstance(value, float), error_msg
@@ -98,7 +100,7 @@ class TimeSeqId:
raise Exception("too many retry")
@classmethod
def create_v1(cls, value):
def create_v1(cls, value: typing.Optional[float]):
"""v1版本是使用16进制编码的16位字符"""
ms = cls.get_valid_ms(value)
return struct.pack(">Q", ms).hex()

View File

@@ -51,7 +51,7 @@ class LdbTable:
比较麻烦,要重新构建主键
"""
def __init__(self, table_name, user_name=None):
def __init__(self, table_name:str, user_name=None):
# 参数检查
check_table_name(table_name)
table_info = get_table_info(table_name)
@@ -127,7 +127,7 @@ class LdbTable:
assert len(parts) == 3, parts
return parts[1]
def _format_value(self, key, value):
def _format_value(self, key:str, value):
if not isinstance(value, dict):
value = Storage(_raw=value)
@@ -148,7 +148,7 @@ class LdbTable:
def _create_new_id(self, id_type="uuid", id_value=None):
return self.id_gen.create_new_id(id_type, id_value)
def _check_before_delete(self, key):
def _check_before_delete(self, key: str):
if not key.startswith(self.prefix):
raise Exception("invalid key:%s" % key)

View File

@@ -78,7 +78,7 @@ class KvTableV2:
clean_value_before_update(obj_copy)
return obj_copy
def _check_before_delete(self, key):
def _check_before_delete(self, key: str):
if not key.startswith(self.prefix):
raise Exception("invalid key:%s" % key)
@@ -273,12 +273,12 @@ class KvTableV2:
db_batch_delete(keys)
self.index_db.delete(where="id in $ids", vars=dict(ids=ids))
def delete_by_id(self, id):
def delete_by_id(self, id: typing.Union[int, str]):
id = str(id)
key = self._build_key(id)
self.delete_by_key(key)
def delete_by_key(self, key):
def delete_by_key(self, key: str):
"""通过key删除记录: 这里先删除记录,然后删除索引,如果删除记录成功,删除索引失败,还可以在列表发起重试"""
validate_str(key, "delete_by_key: invalid key")
self._check_before_delete(key)

View File

@@ -273,7 +273,7 @@ class SQLDBInterface:
def select_first(self, vars=None, what='*', where=None, order=None, group=None,
limit=None, offset=None, _test=False):
return None
pass
def update(self, where, vars=None, _test=False, **values):
pass