!585 多系统交叉验证性能工具:update3-错误修复

From: @westtide 
Reviewed-by: @gaoruoshu 
Signed-off-by: @gaoruoshu
This commit is contained in:
openeuler-ci-bot
2024-03-07 11:54:19 +00:00
committed by Gitee
20 changed files with 2108 additions and 1198 deletions

View File

@@ -1,76 +1,77 @@
# OSPP多系统交叉验证性能工具
# OSPP:多系统交叉验证性能工具
## 文件结构
多系统交叉验证性能工具位于A-Tun的 tools 目录下,实现了载入对比系统的系统参数与进行性能测试的功能.
```bash
├── config
│ ├── change.json
│ ├── config.json
│ ├── prefix.json
│ └── sysyctl_paraments.json
├── data
├── LICENSE
├── log
├── get_sysctl_ulimit_2023-09-20-10-01-00.log
│ ├── load_config_2023-09-20-10-01-00.log
├── OSPP data
│ ├── systemctl@centos.txt
│ └── systemctl@openeuler.txt
├── README.md
├── src
│ ├── benchmark_test.py
│ ├── clean_data.py
│ ├── clean_log.py
├── get_parameters.py
├── import subprocess.py
│ ├── load_check.py
├── main.py
├── regex_prefix.py
└── set_parameters.py
├── tests
│ ├── test_benchmark_test.py
├── test_get_parameters.py
├── test_load_config.py
└── test_regex_prefix.py
└── tools
├── byte-unixbench
│ ├── LICENSE.txt
│ ├── README.md
│ └── UnixBench
├── netperf
└── requirement.txt
---
## 介绍
### 目录结构
- `conf` 配置文件夹
- `config.json` 配置: 测试模式、主机的 `IP``User` 等信息
- `modify.conf` 配置: 需要修改的`ulimit``sysctl` 参数
- `sysctl_parameters.json` 用于查询键值的字典
- `data` 存储数据处理结果、性能测试结果
- `log` 日志文件
- `src` 主程序
- `tests` 测试文件
- `tools` 第三方性能测试工具
### 支持特性
- 2 种测试模式:
- `communication_test` 支持 3 台主机, 本机控制远程 PC1 与 PC2 进行交叉测试
- `host_test` 支持本机与远程 PC 进行交叉测试
- 3 种参数修改方式:
- `copy by all` 支持载入全部参数
- `copy by block` 支持分块导入
- `copy by line` 支持按行修改
- 参数备份功能
- 性能测试
- `CPU, Memory and Disk` 使用 `UnixBench` 进行测试``
- `Network` 使用 `netperf` 进行测试
- 可以在 `modify.conf``extra_benchmark` 配置额外的性能测试命令
### 依赖
- `Python 3.x`
- `UnixBench` 安装命令: `cd ./tools && git clone https://github.com/kdlucas/byte-unixbench.git`
- `nertpef`: 使用包管理器进行安装
```
## 依赖
Python 3.x
UnixBenchCPU、内存与文件性能测试工具安装在 /tools/ 目录下
netperf网络性能测试工具安装在 /tools/ 目录下
## 运行方法
### 配置主机IP与用户名编辑 /config/config.json
### 配置
### 运行主程序
- 编辑 `/config/config.json`
-`modify.conf` 配置需要修改的`ulimit``sysctl` 参数
### 在`multisysttem_performance`下运行主程序
```python
python ./src/main.py
```
### 清理日志文件
### 清理日志文件
```python
python ./src/clean_log.py
```
### 清理数据文件
### 清理数据文件
```
python ./src/clean_data.py
```
### 测试
```
pytest -p no:logging ./tests/test_***.py
```
### 查看性能测试结果
- 位于 `data` 下的 `UnixBench_res_output.txt` 文件

View File

@@ -15,8 +15,7 @@
"user":"west2"
}
},
"communication_test": {
"communication_test": {
"mode": "communication_test",
"local": {
"ip": "172.22.70.189",

View File

@@ -1,40 +1,9 @@
# 这是一个示例的.conf配置文件,你可以在这里配置要修改的syscyl/ulimit系统参数提供分块修改/全局修改/单项修改的功能
[ulimit]
# 以下是ulimit的配置项,ulimit的参数较少所以没有分块直接在这里配置即可
# 配置项的格式为:参数名 = 参数值,程序会自动识别参数名并修改,去掉#即可生效
# 以下是示例配置,你可以根据自己的需求修改
[ulimit_auto]
# ulimit_auto = 1 表示启用ulimit自动修改将载入对应系统的ulimit配置, 否则不启用自动修改
ulimit_auto = 1
-t = unlimited
# 表示 cpu time (seconds) 指定CPU使用时间的上限单位为秒
-f = unlimited
# 表示 file size (blocks) 最大文件大小(以 512 字节块为单位)
-d = unlimited
# 表示 data seg size (kbytes) 限制数据段数据区的大小以千字节KB为单位
-s = 4096
# 表示 stack size (kbytes) 最大栈段大小(以千字节为单位)
-c = 0
# 表示 core file size (blocks)最大核心文件大小(以 512 字节块为单位
-m = unlimited
# 表示 resident set size (kbytes)指定可使用内存的上限单位为KB
-u = 61501
# 表示 processes 用户最多可开启的程序数目
# -n = 1024
# 表示 file descriptors 最大文件描述符加 1
#-l = 64
# 表示 locked-in-memory size (kbytes) 用于限制锁定在内存中的数据的大小,以 KB 为单位
#-v = unlimited
# 表示 address space (kbytes) 指定可使用的虚拟内存上限单位为KB
#-x = unlimited
# 表示 file locks 控制文件锁的数量,通常设置为 unlimited 表示没有限制
#-i = 61501
# 表示 pending signals 用于限制等待处理的信号的数量
-q = 819200
# 表示 bytes in POSIX msg queues 控制 POSIX 消息队列的大小,以字节为单位
-e = 0
# 表示 max nice 限制进程的 "nice" 值,即进程的优先级
-r = 0
# 表示 max rt priority 用于限制实时进程的优先级
[sysctl_block]
# sysctl分块配置分块配置与全局配置冲突优先于单项修改
@@ -56,7 +25,6 @@ File_Access_Control = 1
Quota_Management = 1
XFS_File_System = 1
Pipes_and_Message_Queues_mqueue = 1
Pipes_and_Message_Queues_pipe = 1
Kernel_Locks = 1
Kernel_Module = 1
Kernel_Memory = 1
@@ -129,4 +97,12 @@ sysctl_copy_all = 0
# sysctl单项修改
# sysctl_single_line = 1 表示启用单项修改,将载入下方列出的系统参数与对应的值
# sysctl_single_line = 0 表示禁用单项修改,不会载入下方列出的系统参数与对应的值
sysctl_single_line = 1
sysctl_single_line = 1
[extra_benchmark]
# extra_benchmark = 1 表示启用额外的性能测试,将在 dir 中查找对应工具,并执行 cmd
# cmd 例子: cmd = 'yum list installed | grep netperf'
# 执行方式: process = subprocess.run(cmd, shell=False, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
extra_benchmark = 1
cmd = 'yum list installed | grep netperf'

File diff suppressed because it is too large Load Diff

View File

@@ -1,17 +0,0 @@
real-time non-blocking time (microseconds, -R) unlimited
core file size (blocks, -c) unlimited
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 60555
max locked memory (kbytes, -l) 65536
max memory size (kbytes, -m) unlimited
open files (-n) 524288
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 8192
cpu time (seconds, -t) unlimited
max user processes (-u) 60555
virtual memory (kbytes, -v) unlimited
file locks (-x) unlimited

View File

@@ -0,0 +1,19 @@
#!/usr/bin/bash
# Copyright (c) 2022 Huawei Technologies Co., Ltd.
# A-Tune is licensed under the Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
# PURPOSE.
# See the Mulan PSL v2 for more details.
# #############################################
# @Author : westtide
# @Contact : tocokeo@outlook.com
# @Date : 2023/9/22
# @License : Mulan PSL v2
# @Desc : __init__.py
# #############################################

View File

@@ -34,7 +34,7 @@ def clean_files_with_prefix(directory, prefix):
data_directory = "./data/"
prefix_to_clean = ["differ-2023", "statistical-2023"]
prefix_to_clean = ["differ-2023", "statistical-2023", "backup_", "sysctl", "ulimit"]
for prefix in prefix_to_clean:
clean_files_with_prefix(data_directory, prefix)

View File

@@ -18,136 +18,228 @@
# @Desc : get parameters from system
# #############################################
import os
import difflib
from load_check import *
import getpass
import logging
import subprocess
import paramiko
from load_check import timestamp
from global_var import set_value, get_value
# 获取 linux 版本CentOS Steam 或者 OpenEuler
def get_os_version():
linux_version = "Unknown"
"""
获取 linux 版本CentOS Steam 或者 OpenEuler
"""
logging.info(f'开始: get_os_version')
linux_version = "Others"
try:
with open("/etc/os-release", "r") as file:
for line in file:
if line.startswith("NAME="):
linux_version = line.strip().split("=")[1].strip('"').replace(" ", "")
logging.info('获取当前系统版本为 %s', linux_version, extra={'logfile': log_file})
break
output = subprocess.run(['/bin/cat', '/etc/os-release'], shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output_text = output.stdout.decode('utf-8').strip('\n')
logging.info(f'output = {output} output_text = {output_text}')
if 'openEuler' in output_text:
linux_version = 'openEuler'
logging.info(f'获取当前系统版本为{linux_version}')
elif 'CentOS' in output_text:
linux_version = 'CentOS'
logging.info(f'获取当前系统版本为{linux_version}')
else:
linux_version = 'Others'
except FileNotFoundError:
logging.error('failed: \'cat /etc/os-release\' ', extra={'logfile': log_file})
logging.error(f'failed: \'cat /etc/os-release\',linux_version = Unknown ')
print("未找到 /etc/os-release 文件,请检查文件是否存在。")
return linux_version
# bash: "syscyl -a" 存储在sysctl@$linux-version$中
def run_sysctl_command_and_save_result(file_name):
bash_command = "sysctl -a"
with open(file_name, "w") as output_file:
process = subprocess.Popen(bash_command, shell=True, stdout=output_file, stderr=subprocess.PIPE)
_, stderr = process.communicate()
logging.info('运行命令 \'%s\'', bash_command, extra={'logfile': log_file})
if process.returncode != 0:
print(f"命令执行出错:{stderr.decode()}")
logging.error('失败: 运行命令 \'%s\' ', bash_command, extra={'logfile': log_file})
def run_command_and_save_result(cmd, file_dir):
"""
在本地运行 cmd 命令并保存结果到 file_dir
cod: 命令
file_dir: 保存结果的路径
"""
logging.info(f'开始: run_command_and_save_result')
try:
with open(file_dir, "w") as output_file:
process = subprocess.run(cmd, shell=False, stdout=output_file, stderr=subprocess.PIPE)
logging.info(f'成功:本地运行命令{cmd},保存结果到文件{file_dir}')
if process.returncode != 0:
print(f"失败:本地命令执行{cmd}出错,查阅日志获得更多信息")
logging.error(f'失败: 本地命令执行出错{cmd}出错,错误信息:{process.stderr.decode()} ')
os.chmod(file_dir, 0o644)
except Exception as e:
print(f"发生异常: {e}, 查阅日志获得更多信息")
logging.error(f'发生异常: {e}, cmd = {cmd}, file_dir = {file_dir}')
# bash: "ulimit -a" 存储在ulimit@$linux-version$中
def run_ulimit_command_and_save_result(file_name):
bash_command = "ulimit -a"
with open(file_name, "w") as output_file:
process = subprocess.Popen(bash_command, shell=True, stdout=output_file, stderr=subprocess.PIPE)
_, stderr = process.communicate()
logging.info('运行命令 \'%s\'', bash_command, extra={'logfile': log_file})
if process.returncode != 0:
print(f"命令执行出错:{stderr.decode()}")
logging.error('失败: 运行命令 \'%s\' ', bash_command, extra={'logfile': log_file})
def local_run_sys_ulimit_save(sysctl_res_file_name, ulimit_res_file_name):
"""
执行sysctl -a 与 ulimit -a命令并保存结果, 封装了 run_command_and_save_result
sysctl_res_file_name: 保存sysctl -a命令结果的文件名
ulimit_res_file_name: 保存ulimit -a命令结果的文件名
"""
logging.info(f'开始: local_run_sys_ulimit_save, sysctl_res_file_name = {sysctl_res_file_name}, '
f'ulimit_res_file_name = {ulimit_res_file_name}')
local_sys_command = ['sysctl', '-a']
local_ulimit_command = ['ulimit', '-a']
if sysctl_res_file_name and ulimit_res_file_name:
run_command_and_save_result(local_sys_command, sysctl_res_file_name)
run_command_and_save_result(local_ulimit_command, ulimit_res_file_name)
# 执行sysctl -a 与 ulimit -a命令并保存结果
def run_command_save(sysctl_res_file_name, ulimit_res_file_name):
# 执行sysctl -a命令并保存结果到sysctl@xxx.txt文件中
run_sysctl_command_and_save_result("./data/" + sysctl_res_file_name)
logging.info('成功run_sysctl_command_and_save_result, 保存:\'%s\'', "./data/" + sysctl_res_file_name,
extra={'logfile': log_file})
# 执行ulimit -a命令并保存结果到ulimit@xxx.txt文件中
run_ulimit_command_and_save_result("./data/" + ulimit_res_file_name)
logging.info('成功run_ulimit_command_and_save_result, 保存在:\'%s\'', "./data/" + ulimit_res_file_name,
extra={'logfile': log_file})
def remote_run_command_and_copy(remote):
"""
在 remote 主机上执行 sysctl/ulimit 命令,并将结果保存到本机 ./data/{cmd}_{os_version}.txt
remote: {"user":"xxx","ip":"xxx"}
"""
logging.info(f'开始: remote_run_command_and_copy')
if remote is None:
print("错误: 空参数")
logging.error(f'失败: remote is None')
return False
remote_user = remote["user"]
remote_ip = remote["ip"]
os_version = ""
if not remote_user or not remote_ip:
print("错误: 空参数")
logging.error(f'失败: remote_user and remote_ip is None')
return False
try:
# 连接远程主机
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
getpassword = getpass.getpass(f"正在连接: 请输入 {remote_user}@{remote_ip} 的密码: ")
logging.info(f'密码输入成功,正在连接 {remote_user}@{remote_ip}')
ssh_client.connect(hostname=remote_ip, username=remote_user, password=getpassword)
logging.info(f'成功: 连接 {remote_user}@{remote_ip}')
# 获取remote系统版本
get_os_cmd = 'cat /etc/os-release'
stdin, stdout, stderr = ssh_client.exec_command(get_os_cmd)
output_text = stdout.read().decode('utf-8').strip('\n')
os_version = 'openEuler' if 'openEuler' in output_text else 'CentOS'
logging.info(f'成功: 获取 {remote_user}@{remote_ip} 的系统版本为{os_version}')
cmds = ['sysctl', 'ulimit']
for cmd in cmds:
remote_cmd = f'{cmd} -a'
ssh_client.exec_command('cd ~/A-Tune/tools/multisystem_performance')
local_file = f'./data/{cmd}@{os_version}-{timestamp}.txt'
stdin, stdout, stderr = ssh_client.exec_command(remote_cmd)
logging.info(f'成功: 在 {remote_user}@{remote_ip} 上执行命令{remote_cmd}')
res = stdout.read().decode('utf-8')
with open(local_file, 'w') as file:
file.write(res)
logging.info('成功: 保存命令结果到本地文件{local_file}')
print(f'成功:在 {remote_ip} 上获取 {cmd} 参数,保存到本地文件{local_file}')
ssh_client.close()
except paramiko.AuthenticationException as auth_exception:
print(f"身份验证失败: {auth_exception}")
logging.error(f"身份验证失败: {auth_exception}")
except paramiko.SSHException as ssh_exception:
print(f"SSH 连接错误: {ssh_exception}")
logging.error(f"SSH 连接错误: {ssh_exception}")
logging.error(f'')
except Exception as e:
print(f"发生异常: {e}")
logging.error(f"发生异常: {e}")
return os_version
# 使用 difflib 的比较,保存比较结果 diff_result 到文件 save_difflib_res
def use_differ_res(os_version, sysctl_res_file_name, save_difflib_res):
# 根据当前操作系统匹配对方的sysctl@xxx文件使用scp获取对方配置
file1 = sysctl_res_file_name
if os_version == "openEuler":
# 后期这里会替换为适应两种模式的命令
command = "scp west1@172.22.60.188:~/my_ospp/data/sysctl@CentOSStream.txt ./data/ "
try:
subprocess.run(command, shell=True, check=True)
except subprocess.CalledProcessError as e:
print(f"执行命令出错:{e}")
file2 = "sysctl@CentOSStream.txt"
else:
# 后期这里会替换为适应两种模式的命令
command = "scp west2@172.22.60.189:~/my_ospp/data/sysctl@openEuler.txt ./data/ "
try:
subprocess.run(command, shell=True, check=True)
except subprocess.CalledProcessError as e:
print(f"执行命令出错:{e}")
file2 = "sysctl@openEuler.txt"
def run_command_and_save_by_mode(mode, data):
"""
根据 mode 选择模式,在远程执行命令并保存结果到本地,封装了 remote_run_command_and_copy 和 local_run_sys_ulimit_save
mode: 1-host_test 模式, 2-communication_test 模式
data: json 配置文件,结构为{"user":"xxx","ip":"xxx"}
local: 本地保存结果的目录
"""
# 使用difflib的比较file1与file2的结果
with open("./data/" + file1) as f1, open("./data/" + file2) as f2:
logging.info('compare file1 = \'%s\', file2 = \'%s\'', "./data/" + file1, "./data/" + file2,
extra={'logfile': log_file})
logging.info(f'开始: run_command_and_save_by_mode')
if mode == '1':
# host_test 模式时在local上远程控制pc1与pc2, 并将结果保存到./data/{cmd}_{os_version}.txt,返回值为 os_version
PC1_VERSION = remote_run_command_and_copy(data["host_test"]["pc1"])
PC2_VERSION = remote_run_command_and_copy(data["host_test"]["pc2"])
logging.info(f'pc1_version = {PC1_VERSION}, pc2_version = {PC2_VERSION}')
set_value('PC1_VERSION', PC1_VERSION)
set_value('PC2_VERSION', PC2_VERSION)
if mode == '2':
# Communication_test 模式: 在 local 上远程控制 remote, 并将结果保存到./data/{cmd}_{os_version}.txt,返回值为 os_version
LOCAL_VERSION = get_os_version()
set_value('LOCAL_VERSION', LOCAL_VERSION)
local_sys = f"./data/sysctl@{LOCAL_VERSION}-{timestamp}.txt"
local_ulimit = f"./data/ulimit@{LOCAL_VERSION}-{timestamp}.txt"
logging.info(f'local_version = {LOCAL_VERSION},local_sys = {local_sys},local_ulimit = {local_ulimit}')
if LOCAL_VERSION == 'Others':
print('本工具暂不支持该系统版本,请使用 CentOS 或 openEuler,继续使用可能出现错误,查阅日志获得更多信息')
logging.error(f'本工具暂不支持该系统版本,请使用 CentOS 或 openEuler')
# 保存本地sysctl -a 与 ulimit -a 命令结果
local_run_sys_ulimit_save(local_sys, local_ulimit)
# 在local上远程控制remote,并将结果保存到./data/{cmd}_{os_version}.txt
REMOTE_VERSION = remote_run_command_and_copy(data["communication_test"]["remote"])
set_value('REMOTE_VERSION', REMOTE_VERSION)
logging.info(f'remote_version = {REMOTE_VERSION},remote = {data["communication_test"]["remote"]}')
def differ_sysctl_res(mode, save_differ_sysctl_res):
"""
使用 difflib 比较,保存比较结果 diff_result 到文件 save_differ_sysctl_res
mode: 1-host_test 模式下读取pc1与pc2的文件
2-communication_test 模式下读取local与remote的文件
save_differ_sysctl_res: 指定保存differ结果的文件名
"""
file1 = ''
file2 = ""
logging.info(f'开始: differ_sysctl_res')
PC1_VERSION = get_value('PC1_VERSION')
PC2_VERSION = get_value('PC2_VERSION')
LOCAL_VERSION = get_value('LOCAL_VERSION')
REMOTE_VERSION = get_value('REMOTE_VERSION')
if mode == '1':
# host_test 模式时比较pc1与pc2的结果已经存在local上对比结果仍存在 local 上
# 只对比 sysctl -a 命令结果
file1 = f'./data/sysctl@{PC1_VERSION}-{timestamp}.txt'
file2 = f'./data/sysctl@{PC2_VERSION}-{timestamp}.txt'
set_value('file1', file1)
set_value('file2', file2)
logging.info(f'file1 = {file1}, file2 = {file2}')
if mode == '2':
# Communication_test 模式时比较local与remote的结果已经存在local上对比结果仍存在 local 上
# 只对比 sysctl -a 命令结果
file1 = f'./data/sysctl@{LOCAL_VERSION}-{timestamp}.txt'
file2 = f'./data/sysctl@{REMOTE_VERSION}-{timestamp}.txt'
set_value('file1', file1)
set_value('file2', file2)
logging.info(f'file1 = {file1}, file2 = {file2}')
if not file1 or not file2:
print("错误: file1 or file2 is None")
logging.error(f'错误: file1 or file2 is None')
# 使用 difflib 的比较file1与file2的结果
with open(file1) as f1, open(file2) as f2:
logging.info(f'开始: compare file1 = {file1}, file2 = {file2}')
text1 = f1.readlines()
text2 = f2.readlines()
logging.info('text1 in \'%s\', text2 in \'%s\'', "./data/" + file1, "./data/" + file2,
extra={'logfile': log_file})
logging.info(f'text1 in {file1}, text2 in {file2}')
diff = difflib.Differ()
diff_result = diff.compare(text1, text2)
logging.info('compare file1 and file2 by difflib successfully', extra={'logfile': log_file})
logging.info(f'成功: compare file1 and file2 by difflib')
# 保存比较结果diff_result到文件save_difflib_res
# 保存比较结果 diff_result 到文件 save_differ_sysctl_res
try:
with open("./data/" + save_difflib_res, 'w') as file:
with open(f'./data/{save_differ_sysctl_res}', 'w') as file:
file.write("".join(diff_result))
logging.info('save diff_result to file \'%s\' successfully', "./data/" + save_difflib_res,
extra={'logfile': log_file})
print(f"比较结果已成功保存到文件: ./data/{save_difflib_res}")
os.chmod(f'./data/{save_differ_sysctl_res}', 0o644)
print(f'成功: 比较结果已保存到文件: ./data/{save_differ_sysctl_res}')
logging.info(f'成功: save diff_result to file ./data/{save_differ_sysctl_res}')
except IOError:
logging.error('failed: save diff_result to file \'%s\' ', "./data/" + save_difflib_res,
extra={'logfile': log_file})
print("保存文件时出现错误,请检查路径和文件权限。")
# 显式分析保存比较结果到文件
def analyse_result_to_file(file2lack_dict, file2add_dict, file2modify_dict, file1, file2, file2lack, file2add,
file2modify, save_statistical_res):
result_str = ""
result_str += "file1: " + file1 + ", file2: " + file2 + " 统计结果为:\n"
result_str += "file2 缺失的行有: " + str(len(file2lack)) + "\n"
result_str += "file2 增加的行有: " + str(len(file2add)) + "\n"
result_str += "file2 修改的行有: " + str(len(file2modify)) + "\n"
result_str += "====================================================================\n"
result_str += "file2 缺失的行有:\n"
for k, v in file2lack_dict.items():
result_str += k + " " + v + "\n"
result_str += "====================================================================\n"
result_str += "file2 增加的行有:\n"
for k, v in file2add_dict.items():
result_str += k + " " + v + "\n"
result_str += "====================================================================\n"
result_str += "file2 修改的行有:\n"
for k, v in file2modify_dict.items():
result_str += k + " " + v + "\n"
try:
with open("./data/" + save_statistical_res, "w") as file:
file.write(result_str)
logging.info(f'成功:统计结果已成功保存到文件:./data/{save_statistical_res}')
print("统计结果已成功保存到文件: " + save_statistical_res)
except IOError:
logging.error(f'Error:保存文件时出现错误,请检查路径和文件权限')
print("保存文件时出现错误,请检查路径和文件权限。")
print('保存文件时出现错误,请检查路径和文件权限,查阅日志获得更多信息')
logging.error(f'失败: save diff_result to file ./data/{save_differ_sysctl_res}')

View File

@@ -0,0 +1,37 @@
#!/usr/bin/bash
# Copyright (c) 2022 Huawei Technologies Co., Ltd.
# A-Tune is licensed under the Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
# PURPOSE.
# See the Mulan PSL v2 for more details.
# #############################################
# @Author : westtide
# @Contact : tocokeo@outlook.com
# @Date : 2023/10/22
# @License : Mulan PSL v2
# @Desc : global variables
# #############################################
_global_dict = {}
def _init():
global _global_dict
_global_dict = {}
def set_value(key, value):
_global_dict[key] = value
def get_value(key):
try:
return _global_dict[key]
except KeyError:
return None

View File

@@ -18,189 +18,208 @@
# @Desc : load config file and check dependence tools
# #############################################
from datetime import datetime
import os
import paramiko
import time
import getpass
import logging
import subprocess
import paramiko
from global_var import set_value
# 生成时间戳,用于文件命名
def generate_timestamp_string():
now = datetime.now()
timestamp_string = now.strftime("%Y-%m-%d-%H-%M-%S")
return timestamp_string
timestamp = generate_timestamp_string()
timestamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
log_file = f'./log/load_config_{timestamp}.log'
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logging.getLogger().addHandler(file_handler)
logging.getLogger().setLevel(logging.INFO)
logging.info('loac_check.py start')
logging.info('开始: load_check.py')
# 测试pc1与pc2的连通性
def connect_test(pc1, pc2):
"""
测试pc1与pc2的连通性,使用 ssh 连接
pc1: {'ip': '', 'user': ''}
pc2: {'ip': '', 'user': ''}
"""
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# getpass.getpass() 用于隐藏输入的密码
getpassword = getpass.getpass(f"请输入 {pc2['ip']} 的密码: ")
logging.info('输入密码:********', extra={'logfile': log_file})
logging.info(f'输入密码:********')
ssh.connect(hostname=pc2['ip'], port=22, username=pc2['user'], password=getpassword)
logging.info('%s%s 连通性测试成功', {pc1['ip']}, {pc2['ip']}, extra={'logfile': log_file})
logging.info(f'{pc1["ip"]}{pc2["ip"]} 连通性测试成功')
print(f"{pc1['ip']}{pc2['ip']} 连通性测试成功")
ssh.close()
except paramiko.AuthenticationException:
logging.error('%s%s 连通性测试失败:认证失败', pc1['ip'], pc2['ip'], extra={'logfile': log_file})
logging.error(f'{pc1["ip"]}{pc2["ip"]} 连通性测试失败:认证失败')
print(f"{pc1['ip']}{pc2['ip']} 连通性测试失败0 认证失败")
except paramiko.SSHException as e:
logging.error('%s%s 连通性测试失败:%s', pc1['ip'], pc2['ip'], str(e), extra={'logfile': log_file})
logging.error(f'{pc1["ip"]}{pc2["ip"]} 连通性测试失败:{str(e)}')
print(f"{pc1['ip']}{pc2['ip']} 连通性测试失败1 {str(e)}")
except Exception as e:
logging.error('%s%s 连通性测试失败:%s', pc1['ip'], pc2['ip'], str(e), extra={'logfile': log_file})
logging.error(f'{pc1["ip"]}{pc2["ip"]} 连通性测试失败:{str(e)}')
print(f"{pc1['ip']}{pc2['ip']} 连通性测试失败2 {str(e)}")
finally:
ssh.close()
return 1
# host_test模式: 3台主机
def host_test_body(data):
"""
host_test模式: 适用于3台主机的测试模式, 在local上进行pc1与pc2的连通性测试
data: config/config.json 配置文件
"""
local = {'ip': '', 'user': ''} # 本机
pc1 = {'ip': '', 'user': ''} # pc1
pc2 = {'ip': '', 'user': ''} # pc2
print("host_test模式: 在 local 上进行 PC1 与 PC2 的交叉验证测试, 请确保 config 文件配置正确, "
"将在 PC1 上进行性能测试, 请确保 PC1 的项目目录下工具安装正确")
if not data["host_test"]["local"]["ip"]:
logging.error('文件读取本机 IP 失败,需要用户输入', extra={'logfile': log_file})
logging.error(f'文件读取本机 IP 失败,需要用户输入')
local['ip'] = input("请输入本机 IP: ")
if not data["host_test"]["local"]["user"]:
logging.error('文件读取本机 User 失败,需要用户输入', extra={'logfile': log_file})
logging.error(f'文件读取本机 User 失败,需要用户输入')
local['user'] = input("请输入本机用户名: ")
set_value('HOST_LOCAL_USER', local['user'])
set_value('HOST_LOCAL_IP', local['ip'])
if not data["host_test"]["pc1"]["ip"]:
logging.error('文件读取 PC1 IP 失败,需要用户输入', extra={'logfile': log_file})
logging.error(f'文件读取 PC1 IP 失败,需要用户输入')
pc1['ip'] = input("请输入PC1 IP: ")
if not data["host_test"]["pc1"]["user"]:
logging.error('文件读取 PC1 User 失败,需要用户输入', extra={'logfile': log_file})
logging.error(f'文件读取 PC1 User 失败,需要用户输入')
pc1['user'] = input("请输入 PC1 用户名: ")
set_value('HOST_PC1_USER', pc1['user'])
set_value('HOST_PC1_IP', pc1['ip'])
if not data["host_test"]["pc2"]["ip"]:
logging.error('文件读取 PC2 IP 失败,需要用户输入', extra={'logfile': log_file})
logging.error(f'文件读取 PC2 IP 失败,需要用户输入')
pc2['ip'] = input("请输入 PC2 IP: ")
if not data["host_test"]["pc2"]["user"]:
logging.error('文件读取 PC2 User 失败,需要用户输入', extra={'logfile': log_file})
logging.error(f'文件读取 PC2 User 失败,需要用户输入')
pc2['user'] = input("请输入 PC2 用户名: ")
set_value('HOST_PC2_USER', pc2['user'])
set_value('HOST_PC2_IP', pc2['ip'])
try:
connect_test(local, pc1)
logging.info('本机与 PC1 连通性测试成功', extra={'logfile': log_file})
except Exception:
logging.error('本机与 PC1连通性测试失败', extra={'logfile': log_file})
logging.info(f'本机与 PC1 连通性测试成功')
except ConnectionError:
logging.error(f'本机与 PC1连通性测试失败')
print("本机与 PC1 连通性测试失败,请检查网络连接或 config 文件配置")
try:
connect_test(local, pc2)
logging.info('本机与 PC2 连通性测试成功', extra={'logfile': log_file})
except Exception:
logging.error('本机与 PC2 连通性测试失败', extra={'logfile': log_file})
logging.info(f'本机与 PC2 连通性测试成功')
except ConnectionError:
logging.error(f'本机与 PC2 连通性测试失败')
print("本机与 PC2 连通性测试失败,请检查网络连接或 config 文件配置")
try:
connect_test(pc1, pc2)
logging.info(' PC1 与 PC2 连通性测试成功', extra={'logfile': log_file})
except Exception:
logging.error('PC1 与 PC2连通性测试失败', extra={'logfile': log_file})
logging.info(f' PC1 与 PC2 连通性测试成功')
except ConnectionError:
logging.error(f'PC1 与 PC2连通性测试失败')
print("PC1 与 PC2 连通性测试失败,请检查网络连接或 config 文件配置")
else:
logging.error('无效的模式选择', extra={'logfile': log_file})
logging.error(f'无效的模式选择')
print("无效的模式选择,请重新选择")
# communication_test模式: 2台主机
def communication_test_body(data):
local = data['communication_test']['local']
remote = data['communication_test']['remote']
"""
communication_test模式: 2台主机的测试模式, local与remote的连通性测试
data: config/config.json 配置文件
"""
if not data["communication_test"]:
logging.error(f'文件读取 communication_test key 失败,需要用户输入')
print("文件读取 communication_test key 失败,需要用户输入")
try:
local = data['communication_test']['local']
remote = data['communication_test']['remote']
except KeyError:
logging.error(f'文件读取 local or remote 失败,需要用户输入')
print("文件读取失败,需要用户输入")
if not data["communication_test"]["local"]["ip"]:
logging.error('文件读取本机 IP 失败,需要用户输入', extra={'logfile': log_file})
logging.error(f' 读取本机 ip 失败,需要用户输入')
local['ip'] = input("请输入本机IP: ")
if not data["communication_test"]["local"]["user"]:
logging.error('文件读取本机 User 失败,需要用户输入', extra={'logfile': log_file})
logging.error(f'文件读取本机 user 失败,需要用户输入')
local['user'] = input("请输入本机用户名: ")
set_value('LOCAL_USER', local['user'])
set_value('LOCAL_IP', local['ip'])
if not data["communication_test"]["remote"]["ip"]:
logging.error('文件读取远程主机 IP 失败,需要用户输入', extra={'logfile': log_file})
logging.error(f'文件读取远程主机 ip 失败,需要用户输入')
remote['ip'] = input("请输入远程主机IP:")
if not data["communication_test"]["remote"]["user"]:
logging.error('文件读取远程主机 User 失败,需要用户输入', extra={'logfile': log_file})
logging.error(f'文件读取远程主机 user 失败,需要用户输入')
remote['user'] = input("请输入远程主机用户名:")
set_value('REMOTE_USER', remote['user'])
set_value('REMOTE_IP', remote['ip'])
try:
connect_test(local, remote)
except Exception:
logging.error('本机与远程主机连通性测试失败', extra={'logfile': log_file})
except ConnectionError:
logging.error(f'本机与远程主机连通性测试失败')
print("本机与远程主机连通性测试失败,请检查网络连接或 config 文件配置")
def dependence_check():
"""
检查依赖工具是否存在UnixBench, netperf
"""
path = "./tools"
print("检查: UnixBench 工具是否存在")
logging.info('检查: UnixBench 工具是否存在', extra={'logfile': log_file})
logging.info(f'检查: UnixBench 工具是否存在')
if os.path.exists(path + "/byte-unixbench//UnixBench"):
print("检查: UnixBench存在,满足 CPU 与内存测试依赖")
logging.info('检查: UnixBench 已存在,满足 CPU 与内存测试依赖', extra={'logfile': log_file})
print("检查UnixBench:存在,满足 CPU 与内存测试依赖")
logging.info(f'检查: UnixBench 已存在,满足 CPU 与内存测试依赖')
else:
print("tools 目录下不存在 UnixBench,不满足 CPU 与内存测试依赖")
logging.error('tools 目录下不存在 UnixBench', extra={'logfile': log_file})
if input("是否下载 UnixBench? (y/n)") == "y":
print("检查UnixBench: tools 目录下不存在 UnixBench,不满足 CPU 与内存测试依赖")
logging.error(f'tools 目录下不存在 UnixBench')
if input("是否下载 UnixBench? (y/n): ") == "y":
try:
print("尝试执行命令: cd ./tools && git clone https://github.com/kdlucas/byte-unixbench.git ")
logging.info('尝试下载 UnixBench', extra={'logfile': log_file})
logging.info(f'尝试下载 UnixBench')
subprocess.run("cd ./tools && git clone https://github.com/kdlucas/byte-unixbench.git ", check=True)
print("下载UnixBench成功")
logging.info('下载UnixBench成功', extra={'logfile': log_file})
logging.info(f'下载UnixBench成功')
except:
logging.error('下载UnixBench失败', extra={'logfile': log_file})
logging.error(f'下载UnixBench失败')
print(
"下载UnixBench失败,请检查网络连接,或访问 https://github.com/kdlucas/byte-unixbench 手动克隆项目至 tools 目录")
logging.error('UnixBench 缺失', extra={'logfile': log_file})
logging.error(f'UnixBench 缺失')
else:
print("请手动克隆 UnixBench 至 tools 目录")
logging.error('UnixBench 缺失', extra={'logfile': log_file})
logging.error(f'UnixBench 缺失')
exit(1)
print("检查: netperf 工具是否存在")
if os.path.exists(path + "/netperf"):
print("检查: netperf 已存在,满足网络测试依赖")
logging.info('netperf 已存在git版本', extra={'logfile': log_file})
logging.info(f'netperf 已存在git版本')
else:
if subprocess.run("yum list installed | grep netperf", shell=True).returncode == 0:
cmd = ['yum', 'list', 'installed', '|', 'grep', 'netperf']
if subprocess.run(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE).returncode == 0:
print("检查: netperf 已安装,满足网络测试依赖")
logging.info('netperf 已存在,命令行版本', extra={'logfile': log_file})
logging.info(f'netperf 已存在,命令行版本')
else:
print("tools 目录下不存在 netperf,不满足网络测试依赖")
if input("是否下载 netperf? (y/n)") == "y":
try:
print("尝试执行命令: git clone https://github.com/HewlettPackard/netperf.git ./tools/")
logging.info('尝试clone netperf', extra={'logfile': log_file})
logging.info(f'尝试clone netperf')
subprocess.run("/usr/bin/git clone https://github.com/HewlettPackard/netperf.git ./tools/",
check=True)
print("下载 netperf 成功")
logging.info('clone netperf 成功', extra={'logfile': log_file})
except:
logging.error('clone netperf 失败', extra={'logfile': log_file})
logging.info(f'clone netperf 成功')
except ConnectionError:
logging.error(f'clone netperf 失败')
print(
"下载 netperf 失败,请检查网络连接,或访问 https://github.com/HewlettPackard/netperf 手动克隆项目至 tools 目录")
else:
print("请手动克隆 netperf 至 tools 目录")
logging.error('netperf 缺失', extra={'logfile': log_file})
logging.error(f'netperf 缺失')
exit(1)
def change_sysctl_parameters():
if True:
input("请按任意键继续...")
def change_ulimit_parameters():
if True:
input("请按任意键继续...")

View File

@@ -18,13 +18,29 @@
# @Desc : main function of multisystem performance analysis
# #############################################
from get_parameters import *
from load_check import *
from process_parameters import *
from global_var import _init
from run_benchmark import benchmark
from load_check import dependence_check
from modify_parameters import modify_parameters
from process_parameters import process_parameters
# 检查依赖
dependence_check( )
def main():
# 获取、对比、存储系统参数
process_parameters()
_init()
# 检查+-依赖
dependence_check()
# 获取、对比、存储参数参数
process_parameters()
# 修改 sysctl 参数 ulimit 参数
modify_parameters()
# 性能测试
benchmark()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,448 @@
#!/usr/bin/bash
# Copyright (c) 2022 Huawei Technologies Co., Ltd.
# A-Tune is licensed under the Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
# PURPOSE.
# See the Mulan PSL v2 for more details.
# #############################################
# @Author : westtide
# @Contact : tocokeo@outlook.com
# @Date : 2023/9/22
# @License : Mulan PSL v2
# @Desc : modify sysctl parameters and ulimit parameters
# #############################################
import re
import json
import logging
import configparser
import paramiko
import subprocess
from getpass import getpass
from load_check import timestamp
from global_var import get_value
from get_parameters import local_run_sys_ulimit_save
def find_key(sysctl_parameters, target):
"""
find_key 函数用于递归查找 sysctl_parameters 中是否存在 target 键(块),如果存在则返回对应的值
sysctl_parameters: 用于查找的字典
target: 目标键
返回: result/value or None, 是键/块
"""
if isinstance(sysctl_parameters, dict):
for key, value in sysctl_parameters.items():
if key == target:
logging.info(f'成功:{target} in sysctl parameters 0')
return value
elif isinstance(value, (dict, list)):
result = find_key(value, target)
if result is not None:
logging.info(f'成功:{target} in sysctl parameters 1')
return result
elif isinstance(sysctl_parameters, list):
for item in sysctl_parameters:
result = find_key(item, target)
if result is not None:
logging.info(f'成功:{target} in sysctl parameters 2')
return result
return None
def modify_ulimit():
"""
modify_ulimit 函数用于修改 ulimit 参数,读入配置文件modify.conf中的 ulimit 行
ulimit_section: 配置文件中 ulimit 部分的行conf 格式
"""
SELECTED_MODE = get_value('SELECTED_MODE')
HOST_PC1_USER = get_value('HOST_PC1_USER')
HOST_PC1_IP = get_value('HOST_PC1_IP')
HOST_PC2_USER = get_value('HOST_PC2_USER')
REMOTE_USER = get_value('REMOTE_USER')
REMOTE_IP = get_value('REMOTE_IP')
PC2_VERSION = get_value('PC2_VERSION')
REMOTE_VERSION = get_value('REMOTE_VERSION')
success = fail = 0
ulimit_parameters = ['-t', '-f', '-d', '-s', '-c', '-m', '-u', '-n', '-l', '-v', '-x', '-i', '-q', '-e', '-r', '-N']
logging.info(f'开始: modify ulimit parameters, SELECTED_MODE = {SELECTED_MODE}')
if SELECTED_MODE == '1':
ulimit_file_pc2 = f"./data/ulimit@{PC2_VERSION}-{timestamp}.txt"
logging.info(f'模式={SELECTED_MODE}, 将在 PC1 = {HOST_PC2_USER}@{HOST_PC1_IP} 上修改 ulimit 参数')
try:
with open(ulimit_file_pc2, 'r') as f:
ulimit_section = f.readlines()
logging.info(f'成功: 读取 ulimit_file_PC2 = {ulimit_file_pc2}')
except FileNotFoundError as e:
logging.error(f'e = {e}')
print(f'错误: 未找到 ulimit_file_PC2 = {ulimit_file_pc2}, 请检查日志')
commands = []
for par in ulimit_parameters:
pattern = r'(^|\s){}'.format(re.escape(par))
for line in ulimit_section:
matches = re.findall(pattern, line)
if matches:
value = line.split()[-1]
line = line.strip("\n")
logging.info(f'{par} is found in {line}, value is {value}')
cmd = ['sudo', 'ulimit', f'{par}', f'{value}']
commands.append(cmd)
logging.info(f'将 cmd = {cmd} 写入 commands')
break # 如果找到了第一个匹配就退出循环
try:
# 连接远程主机
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
getpassword = getpass.getpass(f"正在连接: 请输入 {HOST_PC1_USER}@{HOST_PC1_IP} 的密码: ")
logging.info(f'密码输入成功,正在连接 {HOST_PC1_USER}@{HOST_PC1_IP}')
ssh_client.connect(hostname=HOST_PC1_IP, username=HOST_PC1_USER, password=getpassword)
logging.info(f'成功: 连接{HOST_PC1_USER}@{HOST_PC1_IP}')
for cmd in commands:
try:
process = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout = process.stdout
stderr = process.stderr
success += 1
logging.info(f'成功 {success}: 载入参数{cmd}, stdout = {stdout},stderr:{stderr}')
except subprocess.CalledProcessError as e:
fail += 1
logging.error(f'失败 {fail}: 载入参数 {cmd}失败, 错误:{e},请查看日志获取具体信息')
logging.info(f'成功 {success}, 失败 {fail}')
ssh_client.close()
except paramiko.AuthenticationException as e:
print(f"身份验证失败: {e}")
logging.error(f"身份验证失败: {e}")
except paramiko.SSHException as e:
print(f"SSH 连接错误: {e}")
logging.error(f"SSH 连接错误: {e}")
if SELECTED_MODE == '2':
ulimit_file_remote = f"./data/ulimit@{REMOTE_VERSION}-{timestamp}.txt"
logging.info(f'模式={SELECTED_MODE}, 将在 PC1 = {REMOTE_USER}@{REMOTE_IP} 上修改 ulimit 参数')
try:
with open(ulimit_file_remote, 'r') as f:
ulimit_data = f.readlines()
except FileNotFoundError as e:
logging.error(f'ulimit_file_remote = {ulimit_file_remote}, e = {e}')
print(f'错误: 未找到文件, 请检查日志')
for par in ulimit_parameters:
pattern = r'(^|\s){}'.format(re.escape(par))
for line in ulimit_data:
matches = re.findall(pattern, line)
if matches:
value = line.split()[-1]
line = line.strip("\n")
logging.info(f'{par} is found in {line}, value is {value}')
cmd = ['sudo', 'ulimit', f'{par}', f'{value}']
try:
process = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
success += 1
logging.info(f'成功 {success}: 载入参数{cmd}')
except subprocess.CalledProcessError as e:
fail += 1
logging.error(f'失败 {fail}: 载入参数 {cmd}失败, 错误:{e},请查看日志获取具体信息')
break # 如果找到了第一个匹配就退出循环
logging.info(f'成功 {success}, 失败 {fail}')
logging.info(f'结束: modify ulimit parameters, SELECTED_MODE = {SELECTED_MODE}')
def modify_sysctl_by_line(sysctl_single_line_sections):
"""
sysctl 参数的行修改模式
"""
logging.info('开始: 使用行模式修改 sysctl 参数 ')
success = fail = 0
for key, value in sysctl_single_line_sections.items():
if key == "sysctl_single_line":
logging.info(f'跳过: {key} = {value}')
continue
try:
logging.info(f'尝试载入 {key} = {value} 的参数')
subprocess.run(f'sysctl {key} = {value}', check=True)
success += 1
print(f'成功:载入 {key} = {value} 的参数')
logging.info(f'{success}成功:载入 sysctl {key} = {value} 的参数')
except subprocess.CalledProcessError as e:
fail += 1
logging.error(f'{fail}失败{e}:载入 {key} = {value} 的参数')
print(f'使用行模式修改 sysctl 参数, 成功:{success} 个, 失败:{fail} 个,请查看日志获取具体信息')
logging.info(f'载入 sysctl 参数成功 {success} 个, 失败 {fail} 个 请查看日志获取具体信息')
def modify_sysctl_by_block(sysctl_block_sections, sysctl_parameters):
"""
sysctl 参数的块修改模式,封装run_command函数
"""
logging.info('开始: Modify sysctl by block')
HOST_PC1_USER = get_value('HOST_PC1_USER')
HOST_PC1_IP = get_value('HOST_PC1_IP')
HOST_PC2_USER = get_value('HOST_PC2_USER')
REMOTE_USER = get_value('REMOTE_USER')
REMOTE_IP = get_value('REMOTE_IP')
PC2_VERSION = get_value('PC2_VERSION')
REMOTE_VERSION = get_value('REMOTE_VERSION')
SELECTED_MODE = get_value('SELECTED_MODE')
if SELECTED_MODE == '1':
success = fail = 0
sysctl_file_pc2 = f"./data/sysctl@{PC2_VERSION}-{timestamp}.txt"
logging.info(f'模式={SELECTED_MODE}, 将在 PC1 = {HOST_PC2_USER}@{HOST_PC1_IP} 上修改 sysctl 参数')
try:
with open(sysctl_file_pc2, 'r') as f:
data = f.readlines()
logging.info(f'成功: 读取 ulimit_file_PC2 = {sysctl_file_pc2}')
except FileNotFoundError as e:
logging.error(f'e = {e}')
print(f'错误: 未找到 ulimit_file_PC2 = {sysctl_file_pc2}, 请检查日志')
commands = []
for category, params in sysctl_block_sections.items(): # 同样需要查找块
if category == "sysctl_block": # 跳过标记块
logging.info(f'跳过: {category} = {params}')
continue
try:
result = find_key(sysctl_parameters, category) # 命中的块, 在块内找子参数
logging.info(f'开始: category = {category}, result = find_key = {result} ')
if result is not None:
for key, _ in result.items(): # 对块的每个参数进行查找
flag = 0
for line in data:
if key in line:
tmp = line.strip('\n')
cmd = ['sudo', 'sysctl', f'{tmp}']
commands.append(cmd)
logging.info(f'成功: key = {key}, 将 {cmd} 写入 commands')
flag += 1
break
if flag == 0:
logging.error(f'失败: {key} not in {data}')
else:
logging.error(f'错误: category = {category} not in sysctl_parameters.json')
except KeyError:
logging.error(f'无效: parameter configuration for {category}')
logging.info(f'查找sysctl_block_sections块结束')
try:
# 连接远程主机
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
getpassword = getpass.getpass(f"正在连接: 请输入 {HOST_PC1_USER}@{HOST_PC1_IP} 的密码: ")
logging.info(f'密码输入成功,正在连接 {HOST_PC1_USER}@{HOST_PC1_IP}')
ssh_client.connect(hostname=HOST_PC1_IP, username=HOST_PC1_USER, password=getpassword)
logging.info(f'成功: 连接{HOST_PC1_USER}@{HOST_PC1_IP}')
for cmd in commands:
try:
process = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout = process.stdout
stderr = process.stderr
success += 1
logging.info(f'成功 {success}: 载入参数{cmd}, stdout = {stdout},stderr:{stderr}')
except subprocess.CalledProcessError as e:
fail += 1
logging.error(f'失败 {fail}: 载入参数 {cmd}失败, 错误:{e},请查看日志获取具体信息')
except paramiko.AuthenticationException as e:
print(f"身份验证失败: {e}")
logging.error(f"身份验证失败: {e}")
except paramiko.SSHException as e:
print(f"SSH 连接错误: {e}")
logging.error(f"SSH 连接错误: {e}")
if SELECTED_MODE == '2':
logging.info(f'开始: SELECTED_MODE = {SELECTED_MODE}')
REMOTE_VERSION = get_value('REMOTE_VERSION')
sysctl_file_remote = f'./data/sysctl@{REMOTE_VERSION}-{timestamp}.txt'
commands = []
with open(sysctl_file_remote, 'r') as f:
data = f.readlines()
for category, params in sysctl_block_sections.items(): # 查找块
if category == "sysctl_block":
logging.info(f'跳过: {category} = {params}')
continue
try:
result = find_key(sysctl_parameters, category) # 命中的块
logging.info(f'开始: category = {category}, result = find_key = {result} ')
if result is not None:
for key, _ in result.items(): # 对块的每个参数进行查找
flag = 0
for line in data:
if key in line:
tmp = line.strip('\n')
cmd = ['sudo', 'sysctl', f'{tmp}']
commands.append(cmd)
logging.info(f'成功: key = {key}, 将 {cmd} 写入 commands')
flag += 1
break
if flag == 0:
logging.error(f'失败: {key} not in {sysctl_file_remote}')
else:
logging.error(f'错误: category = {category} not in sysctl_parameters.json')
except KeyError:
logging.error(f'无效: parameter configuration for {category}')
logging.info(f'查找sysctl_block_sections块结束')
run_command(commands, 'sysctl block')
def modify_sysctl_by_copyall():
"""
sysctl 参数的全量修改模式,封装run_command函数
"""
logging.info('modify sysctl parameters by copy all')
copy_all = get_value('file2')
run_command(copy_all, 'sysctl copy all')
pass
def backup_new_parameters():
local_run_sys_ulimit_save('backup_new_sysctl', 'backup_new_ulimit')
logging.info('成功:备份新的参数')
def backup_old_parameters():
local_run_sys_ulimit_save('backup_old_sysctl', 'backup_old_ulimit')
logging.info('成功:备份旧的参数')
def run_command(commands, modify_mode):
"""
run_command 函数用于载入 sysctl 参数
file_name: 用于载入的文件名
modify_mode: 修改模式, 用于日志记录
"""
print(f'开始: 以{modify_mode}模式修改参数')
logging.info(f'开始: 以{modify_mode}模式修改参数')
success = fail = index = 0
for cmd in commands:
try:
process = subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
success += 1
stdout = process.stdout
stderr = process.stderr
logging.info(f'成功 {success}: 载入参数{cmd}, stdout = {stdout},stderr:{stderr}')
except subprocess.CalledProcessError as e:
logging.error(f'失败 {fail}: 载入参数 {cmd}失败, 错误:{e},请查看日志获取具体信息')
fail += 1
index += 1
print(f'使用{modify_mode}修改参数: 成功{success}个, 失败{fail}个, 请查看日志获取具体信息')
logging.info(f'使用{modify_mode}修改参数: 成功{success}个, 失败{fail}个, 请查看日志获取具体信息')
def modify_parameters():
"""
modify_parameters 函数用于修改参数, 读取配置文件 modify.conf
"""
logging.info('开始: modify parameters')
HOST_LOCAL_USER = get_value('HOST_LOCAL_USER')
HOST_LOCAL_IP = get_value('HOST_LOCAL_IP')
HOST_PC1_USER = get_value('HOST_PC1_USER')
HOST_PC1_IP = get_value('HOST_PC1_IP')
HOST_PC2_USER = get_value('HOST_PC2_USER')
HOST_PC2_IP = get_value('HOST_PC2_IP')
LOCAL_USER = get_value('LOCAL_USER')
LOCAL_IP = get_value('LOCAL_IP')
REMOTE_USER = get_value('REMOTE_USER')
REMOTE_IP = get_value('REMOTE_IP')
PC1_VERSION = get_value('PC1_VERSION')
PC2_VERSION = get_value('PC2_VERSION')
LOCAL_VERSION = get_value('LOCAL_VERSION')
REMOTE_VERSION = get_value('REMOTE_VERSION')
SELECTED_MODE = get_value('SELECTED_MODE')
FILE2_MODIFY_DICT = get_value('FILE2_MODIFY_DICT')
logging.info(
f'HOST_LOCAL_USER = {HOST_LOCAL_USER}, HOST_LOCAL_IP = {HOST_LOCAL_IP}, HOST_PC1_USER = {HOST_PC1_USER}, HOST_PC1_IP = {HOST_PC1_IP}, HOST_PC2_USER = {HOST_PC2_USER}, HOST_PC2_IP = {HOST_PC2_IP}, LOCAL_USER = {LOCAL_USER}, LOCAL_IP = {LOCAL_IP}, REMOTE_USER = {REMOTE_USER}, REMOTE_IP = {REMOTE_IP}')
logging.info(
f'PC1_VERSION = {PC1_VERSION}, PC2_VERSION = {PC2_VERSION}, LOCAL_VERSION = {LOCAL_VERSION}, REMOTE_VERSION = {REMOTE_VERSION}')
logging.info(f'SELECTED_MODE = {SELECTED_MODE}, FILE2_MODIFY_DICT = {FILE2_MODIFY_DICT}')
# 用于匹配、查找的 sysctl 参数集合,已分类、标注
json_file = './config/sysctl_parameters.json'
with open(json_file, 'r') as f:
sysctl_parameters = json.load(f)
logging.info(f'成功导入 json_file: {json_file}')
# 备份旧参数
backup_old_parameters()
logging.info('备份成功')
# 读取配置文件
def preserve_case(option):
return option
config = configparser.ConfigParser()
config.optionxform = preserve_case # 使用常规函数来禁用键名小写转换
config.read('./config/modify.conf')
logging.info(f'读取配置文件: {config}')
# 获取 sysctl 部分的配置项
sysctl_block_section = config['sysctl_block']
logging.info(f'sysctl_block_section = {sysctl_block_section}')
for key, value in sysctl_block_section.items():
logging.debug("%s: %s", key, value)
sysctl_copy_all_section = config['sysctl_copy_all']
logging.info(f'sysctl_copy_all_section = {sysctl_copy_all_section}')
sysctl_single_line_section = config['sysctl_single_line']
logging.info(f'sysctl_single_line_section = {sysctl_single_line_section}')
sys_block, sys_all, sys_single = True, True, True
# 获取修改 ulimit 的配置项
if config['ulimit_auto']['ulimit_auto'] == '0':
logging.info('ulimit 部分为空, 不进行修改')
elif config['ulimit_auto']['ulimit_auto'] == '1':
logging.info('开始载入配置项, 修改ul')
modify_ulimit()
logging.info('修改 ulimit parameters 结束')
# 检测冲突、是否启用各种模式
if sysctl_block_section['sysctl_block'] == '0':
logging.info('sysctl_block部分为空, 不使用块修改模式')
sys_block = False
if sysctl_copy_all_section['sysctl_copy_all'] == '0':
logging.info('sysctl_copy_all部分为空, 不使用全量修改模式')
sys_all = False
if sysctl_single_line_section['sysctl_single_line'] == '0':
logging.info('sysctl_single_line部分为空, 不使用单项修改模式')
sys_single = False
if sys_block and sys_all:
print('sysctl_block部分和sysctl_copy_all部分同时存在, 请检查/config/modify.conf配置文件, 重启程序')
logging.error('sysctl_block部分和sysctl_copy_all部分同时存在, 冲突')
# sysctl 分块模式修改
if sys_block:
modify_sysctl_by_block(sysctl_block_section, sysctl_parameters)
if sys_single:
# 如果单项修改模式也启用, 则再次执行单项修改
modify_sysctl_by_line(sysctl_single_line_section)
# sysctl 全量修改模式
if sys_all:
modify_sysctl_by_copyall()
if sys_single:
# 如果单项修改模式也启用, 则再次执行单项修改
modify_sysctl_by_line(sysctl_single_line_section)
# 仅使用 sysctl 单项修改模式
if not sys_block and not sys_all and sys_single:
modify_sysctl_by_line(sysctl_single_line_section)
# 备份新参数
backup_new_parameters()

View File

@@ -22,11 +22,115 @@ import os
import json
import logging
from get_parameters import run_command_and_save_by_mode, differ_sysctl_res, \
PC1_VERSION, PC2_VERSION
from global_var import set_value, get_value
from get_parameters import run_command_and_save_by_mode, differ_sysctl_res
from load_check import host_test_body, communication_test_body, timestamp
SELECTED_MODE = None
def process_differ(data):
"""
读取比较结果文件,统计比较结果,并保存到文件
封装了 run_command_and_save_by_mode 和 differ_sysctl_res 两个函数
data: 读取的配置文件config.json
"""
PC1_VERSION = get_value('PC1_VERSION')
PC2_VERSION = get_value('PC2_VERSION')
LOCAL_VERSION = get_value('LOCAL_VERSION')
REMOTE_VERSION = get_value('REMOTE_VERSION')
file2_lack = []
file2_modify = []
file2_new = [] # 统计比较结果:列表格式
save_differ_sysctl_res = f'differ-{timestamp}.txt' # 保存比较结果的文件名, 参数会被传递到differ_sysctl_res 函数
save_statistical_res = f'statistical-{timestamp}.txt' # 保存统计结果的文件名, 参数会被传递到statistical_res 函数
'''
根据测试模式选择运行命令
host 模式: PC1 与 PC2 的 4 个文件
communication 模式: local 与 remote 的 4 个文件
格式: local_file = f'./data/{cmd}@{os_version}-{timestamp}.txt' 应该不会冲突
'''
run_command_and_save_by_mode(get_value('SELECTED_MODE'), data)
'''
根据测试模式进行 differ 比较
host 模式: 比较 PC1 与 PC2 的 sysctl
communication 模式: 比较 local 与 remote 的 sysctl
根据 timestamp 避免文件冲突
'''
differ_sysctl_res(get_value('SELECTED_MODE'), save_differ_sysctl_res)
# 读取比较结果文件 save_differ_sysctl_res
try:
with open(f'./data/{save_differ_sysctl_res}') as f:
logging.info(f'./data/{save_differ_sysctl_res} as f ')
lines = f.readlines()
except FileNotFoundError as e:
logging.info(f'{e}file not found')
for i, line in enumerate(lines):
if line.startswith('-'):
if lines[i + 2].startswith('-') or lines[i + 2].startswith(' '):
file2_lack.append(line.strip())
elif lines[i + 2].startswith('+'):
file2_modify.append(line.strip())
elif lines[i + 2].startswith('?'):
if lines[i + 4].startswith('+') and lines[i + 6].startswith('?'):
file2_modify.append(line.strip())
elif line.startswith('+'):
file2_new.append(line.strip())
logging.info(f'file2_lack: {file2_lack}')
logging.info(f'file2_new: {file2_new}')
logging.info(f'file2_modify: {file2_modify}')
# 统计比较结果:字典格式
file2lack_dict = {line.strip('-+ \n').split('=', 1)[0]: line.strip('-+ \n').split('=', 1)[1].strip() for line in
file2_lack}
file2add_dict = {line.strip('-+ \n').split('=', 1)[0]: line.strip('-+ \n').split('=', 1)[1].strip() for line in
file2_new}
FILE2_MODIFY_DICT = {line.strip('-+ \n').split('=', 1)[0]: line.strip('-+ \n').split('=', 1)[1].strip() for line in
file2_modify}
set_value('FILE2_MODIFY_DICT', FILE2_MODIFY_DICT)
# 打印统计结果
print(f'file1:{get_value("file1")}", file2:{get_value("file2")},'
f'统计结果为: ')
print("file2 缺失的行有: " + str(len(file2lack_dict)) + "")
logging.info(f'file2 缺失的行有: {str(len(file2lack_dict))}')
print("file2 增加的行有: " + str(len(file2add_dict)) + "")
logging.info(f'file2 增加的行有: {str(len(file2add_dict))}')
print("file2 修改的行有: " + str(len(FILE2_MODIFY_DICT)) + "")
logging.info(f'file2 修改的行有: {str(len(FILE2_MODIFY_DICT))}')
result_str = ""
result_str += "file1 与 file2 的比较统计结果为:\n"
result_str += "file2 缺失的行有: " + str(len(file2_lack)) + "\n"
result_str += "file2 增加的行有: " + str(len(file2_new)) + "\n"
result_str += "file2 修改的行有: " + str(len(file2_modify)) + "\n"
result_str += "====================================================================\n"
result_str += "file2 缺失的行有:\n"
for k, v in file2lack_dict.items():
result_str += k + " " + v + "\n"
result_str += "====================================================================\n"
result_str += "file2 增加的行有:\n"
for k, v in file2add_dict.items():
result_str += k + " " + v + "\n"
result_str += "====================================================================\n"
result_str += "file2 修改的行有:\n"
for k, v in FILE2_MODIFY_DICT.items():
result_str += k + " " + v + "\n"
# 保存统计结果到文件
try:
with open(f'./data/{save_statistical_res}', "w") as file:
file.write(result_str)
os.chmod(f'./data/{save_statistical_res}', 0o644)
print(f'成功:统计结果已成功保存到文件:{save_statistical_res}')
logging.info(f'成功:统计结果已成功保存到文件:./data/{save_statistical_res}')
except IOError:
print("失败:保存文件时出现错误,请检查路径和文件权限,查阅日志获得更多信息")
logging.error(f'IOError:保存文件时出现错误,请检查路径和文件权限,查阅日志获得更多信息')
def process_parameters():
@@ -39,9 +143,10 @@ def process_parameters():
data = json.load(f)
# 选择测试模式
global SELECTED_MODE
SELECTED_MODE = input("(1)host_test模式: 支持3台PC, (2)communication_test模式:支持2台PC. 请选择测试模式(1/2):")
logging.info(f'选择的测试模式为: {SELECTED_MODE}')
set_value('SELECTED_MODE', SELECTED_MODE)
if SELECTED_MODE == '1':
host_test_body(data)
@@ -66,7 +171,9 @@ def process_differ(data):
file2_new = [] # 统计比较结果:列表格式
save_differ_sysctl_res = f'differ-{timestamp}.txt' # 保存比较结果的文件名, 参数会被传递到differ_sysctl_res 函数
save_statistical_res = f'statistical-{timestamp}.txt' # 保存统计结果的文件名, 参数会被传递到statistical_res 函数
SELECTED_MODE = get_value('SELECTED_MODE')
PC1_VERSION = get_value('PC1_VERSION')
PC2_VERSION = get_value('PC2_VERSION')
'''
根据测试模式选择运行命令
host 模式: PC1 与 PC2 的 4 个文件
@@ -154,5 +261,3 @@ def process_differ(data):
logging.error(f'IOError:保存文件时出现错误,请检查路径和文件权限,查阅日志获得更多信息')
def get_mode():
return SELECTED_MODE

View File

@@ -24,13 +24,13 @@ import logging
import subprocess
import paramiko
import getpass
import configparser
from process_parameters import get_mode
from global_var import get_value
from load_check import timestamp
def all_test():
command = "cd ./tools/byte-unixbench/UnixBench && "
cmd = ['cd', './tools/byte-unixbench/UnixBench', '&&', './Run']
print(f'开始执行memory,cpu,disk测试, 请稍等...')
logging.info(f'开始:memory,cpu and disk 测试')
@@ -64,7 +64,7 @@ def tcp_test(ip):
def net_test():
if get_mode() == '1':
if get_value('SELECT_MODE') == '1':
print(
"host_test模式: 在 local 上进行 PC1 与 PC2 的交叉验证测试, 将在 PC1 上进行性能测试, 请确保各自项目目录下工具安装正确")
logging.info(f'开始:net 测试')
@@ -99,7 +99,7 @@ def net_test():
print(f"发生未知错误: {str(ex)}")
logging.info(f'发生未知错误: {str(ex)}')
if get_mode() == '2':
if get_value('SELECT_MODE') == '2':
logging.info(f'开始:net 测试')
file_path = f'./config/config.json'
with open(file_path, 'r') as f:
@@ -113,9 +113,53 @@ def net_test():
logging.info(f'结束:net 测试')
def extra_test():
logging.info(f'开始: extra_test')
def preserve_case(option):
return option
# 读取配置文件
config = configparser.ConfigParser()
config.optionxform = preserve_case # 使用常规函数来禁用键名小写转换
config.read('./config/modify.conf')
logging.info(f'读取配置文件: {config}')
# 获取 extra_benchmark 部分的配置项
if config['extra_benchmark']['extra_benchmark'] == '0':
logging.info('extra_benchmark 不启用, 不使用额外性能测试')
if config['extra_benchmark']['extra_benchmark'] == '1':
logging.info('extra_benchmark启用, 使用额外性能测试')
for key, value in config['extra_benchmark'].items():
if not key or not value:
logging.info(f'key = {key}为空或者 key = {key}为空')
elif key == 'extra_benchmark':
continue
elif key and value:
try:
command = value.split()
logging.info(f'{key} = {value}')
process = subprocess.Popen(command, shell=False, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
stdout = process.stdout.read()
stderr = process.stderr.read()
if process.stderr != 0:
logging.error(
f'失败执行{command}: 错误:{stderr},stdout = {stdout}')
logging.info(f'成功执行{command}, stdout = {stdout}')
print(f'成功执行{command}, 输出: {stdout}')
except ValueError as e:
logging.error(f'e = {e}, value={value}')
except subprocess.CalledProcessError as e:
logging.error(f'e={e}')
print(f'执行{value}失败,请查看日志获得更多信息')
except FileNotFoundError as e:
logging.error(f'e={e}, value.split() = {value.split()}, command={command}')
def benchmark():
logging.info(f'开始:benchmark 测试')
print("请选择测试类型:(1):memory,cpu and disk (2):net (3):all")
print("请选择测试类型:(1):memory,cpu and disk (2):net (3):all (4):extra test")
test_type = input()
logging.info(f'输入: test_type={test_type}')
if test_type == '1':
@@ -134,6 +178,10 @@ def benchmark():
net_test()
logging.info(f'结束:{test_type} test ')
elif test_type == '4':
logging.info(f'开始:test_type {test_type}: all')
extra_test()
logging.info(f'结束:{test_type} test ')
else:
print(f'输入错误, 请重新输入')
logging.error(f'输入错误, 请重新输入')

View File

@@ -0,0 +1,19 @@
#!/usr/bin/bash
# Copyright (c) 2022 Huawei Technologies Co., Ltd.
# A-Tune is licensed under the Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
# PURPOSE.
# See the Mulan PSL v2 for more details.
# #############################################
# @Author : westtide
# @Contact : tocokeo@outlook.com
# @Date : 2023/9/22
# @License : Mulan PSL v2
# @Desc : __init__.py
# #############################################

View File

@@ -0,0 +1,152 @@
#!/usr/bin/bash
# Copyright (c) 2022 Huawei Technologies Co., Ltd.
# A-Tune is licensed under the Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
# PURPOSE.
# See the Mulan PSL v2 for more details.
# #############################################
# @Author : westtide
# @Contact : tocokeo@outlook.com
# @Date : 2023/10/7
# @License : Mulan PSL v2
# @Desc : test for load_check
# #############################################
# run 'pytest -p no:logging ./tests/test_load_check.py' under 'multisystem_performance'
import sys
import logging
from unittest import mock
import paramiko
import pytest
from tools.multisystem_performance.src.load_check import connect_test, host_test_body, communication_test_body
sys.path.append("..")
logging.info('开始: test_load_check.py')
class FakeSSHClient:
"""
创建一个虚拟的 paramiko.SSHClient 类
"""
def set_missing_host_key_policy(self, policy):
pass
def connect(self, hostname, port, username, password):
pass
def close(self):
pass
@pytest.fixture
def fake_ssh_client():
return FakeSSHClient()
def test_connect_test_successful(fake_ssh_client):
"""
测试 connect_test 函数的成功情况
fake_ssh_client: 虚拟的 paramiko.SSHClient 类
"""
logging.info(f'开始:test_connect_test_successful')
pc1 = {'ip': '192.168.1.1', 'user': 'user1'}
pc2 = {'ip': '192.168.1.2', 'user': 'user2'}
# 使用 unittest.mock.patch 来模拟 getpass.getpass 函数返回密码
with mock.patch('getpass.getpass', return_value='password'):
with mock.patch('paramiko.SSHClient', return_value=fake_ssh_client):
with mock.patch.object(fake_ssh_client, 'connect'):
result = connect_test(pc1, pc2)
logging.info(f'result = {result}')
assert result == 1
def test_connect_test_failure(fake_ssh_client):
"""
测试 connect_test 函数的认证失败情况
fake_ssh_client: 虚拟的 paramiko.SSHClient 类
"""
logging.info(f'开始:test_connect_test_failure')
pc1 = {'ip': '192.168.1.1', 'user': 'user1'}
pc2 = {'ip': '192.168.1.2', 'user': 'user2'}
# 使用 unittest.mock.patch 来模拟 getpass.getpass 函数返回密码
with mock.patch('getpass.getpass', return_value='wrong_password'):
with mock.patch('paramiko.SSHClient', return_value=fake_ssh_client):
with mock.patch.object(fake_ssh_client, 'connect', side_effect=paramiko.AuthenticationException):
result = connect_test(pc1, pc2)
logging.info(f'result = {result}')
assert result == 1
@pytest.fixture
def mock_input(monkeypatch):
"""
使用 monkeypatch 来模拟 input 函数
"""
input_values = []
def mock_input_generator(prompt):
if input_values:
return input_values.pop(0)
else:
raise ValueError("Not enough input values provided")
monkeypatch.setattr('builtins.input', mock_input_generator)
def test_host_test_body_missing_config():
"""
针对 host_test 读取配置文件时缺少配置的情况进行测试
"""
# 空配置的情况
data = {}
# 使用 pytest.raises 检查是否引发了预期的异常
with pytest.raises(KeyError, match="host_test"):
host_test_body(data)
# 包含 host_test 但没有 local的配置
data = {"host_test": {"pc1": {}, "pc2": {}}}
with pytest.raises(KeyError, match="local"):
host_test_body(data)
# 包含 host_test 和 local 但没有 ip 的配置
data = {"host_test": {"local": {"user": "testuser"}}}
with pytest.raises(KeyError, match="ip"):
host_test_body(data)
# 包含 host_test 和 local 但没有 pc1 的配置
data = {"host_test": {"local": {"ip": "127.0.0.1", "user": "testuser"}}}
with pytest.raises(KeyError, match="pc1"):
host_test_body(data)
def test_communication_test_body_missing_config():
"""
针对 communication_test 读取配置文件时缺少配置的情况进行测试
"""
# 空配置的情况
data = {}
with pytest.raises(KeyError, match='communication_test'):
communication_test_body(data)
# 包含 communication_test 但没有local的配置
data = {"communication_test": {"remote": {"ip": "", "user": "remoteuser"}}}
with pytest.raises(KeyError, match='local'):
communication_test_body(data)
# 包含 communication_test 和 local 但没有 ip 的配置
data = {"communication_test": {"local": {"user": "localuser"}, "remote": {"ip": "", "user": "remoteuser"}}}
with pytest.raises(KeyError, match='ip'):
communication_test_body(data)
# 包含 communication_test 和 local 但没有 remote 的配置
data = {"communication_test": {"local": {"ip": "192.168.1.1", "user": "localuser"}, }}
with pytest.raises(KeyError, match='remote'):
communication_test_body(data)

View File

@@ -23,9 +23,7 @@ import sys
import logging
import builtins
import pytest
sys.path.append("..")
from tools.multisystem_performance.src.process_parameters import process_parameters, process_differ
@@ -61,4 +59,4 @@ def test_process_differ_file_processing(sample_differ_file_content, monkeypatch)
# 检查结果
assert file2_lack == ["Line 1", "Line 2", "Line 8"]
assert file2_modify == ["Line 6", "Line 9"]
assert file2_new == ["Line 4", "Line 7"]
assert file2_new == ["Line 4", "Line 7"]

View File

@@ -1,4 +1,3 @@
# Install dependencies
# Install dependence
UnixBench: https://github.com/kdlucas/byte-unixbench
Stream: https://github.com/jeffhammond/STREAM?tab=readme-ov-file
pktgen:
netperf: install by 'yum install netperf' or git