add callback test cases

zyw_hw
2025-12-02 10:00:55 +08:00
parent 656b143f71
commit 05f92de6f9
9 changed files with 5071 additions and 7 deletions


@@ -0,0 +1,303 @@
# Copyright 2025 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Test callback.py using pytest framework."""
from unittest.mock import Mock, patch
import numpy as np
import pytest
from mindformers.core.callback.callback import ColdHotExpertMonitor
# pylint: disable=unused-argument # for mock logic
class TestColdHotExpertMonitorExtended:
"""Extended tests for ColdHotExpertMonitor"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('os.getenv')
def test_get_attribute_by_path(self, mock_getenv):
"""Test get_attribute_by_path method"""
def getenv_side_effect(x, default=None):
if x == "RANK_ID":
return "0"
if x == "RANK_SIZE":
return "8"
return default
mock_getenv.side_effect = getenv_side_effect
moe_config = Mock()
moe_config.update_step = 10
moe_config.expert_num = 8
moe_config.hot_expert_num = 1
moe_config.moe_module_name = "model.layers"
monitor = ColdHotExpertMonitor(
moe_config=moe_config,
hidden_size=128,
ffn_hidden_size=512,
expert_parallel=1,
model_parallel=1,
save_checkpoint_steps=100
)
# Create mock object with nested attributes
obj = Mock()
obj.model.layers = [Mock(), Mock()]
result = monitor.get_attribute_by_path(obj, "model.layers")
assert len(result) == 2
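# The RANK_ID/RANK_SIZE getenv side effect above is repeated verbatim across
# this file; a shared fixture could centralize it. A minimal sketch, assuming
# plain pytest fixtures are acceptable in this suite:
@pytest.fixture
def mock_rank_env():
    """Patch os.getenv so RANK_ID=0 and RANK_SIZE=8, as the tests assume."""
    def getenv_side_effect(key, default=None):
        return {"RANK_ID": "0", "RANK_SIZE": "8"}.get(key, default)
    with patch('os.getenv', side_effect=getenv_side_effect) as mocked:
        yield mocked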
class TestColdHotExpertMonitorBasic:
"""Test ColdHotExpertMonitor basic functionality"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('os.getenv')
def test_init_basic(self, mock_getenv):
"""Test ColdHotExpertMonitor initialization"""
def getenv_side_effect(x, default=None):
if x == "RANK_ID":
return "0"
if x == "RANK_SIZE":
return "8"
return default
mock_getenv.side_effect = getenv_side_effect
moe_config = Mock()
moe_config.update_step = 10
moe_config.expert_num = 8
moe_config.hot_expert_num = 2
moe_config.moe_module_name = "model.layers"
monitor = ColdHotExpertMonitor(
moe_config=moe_config,
hidden_size=128,
ffn_hidden_size=512,
expert_parallel=2,
model_parallel=2,
save_checkpoint_steps=100
)
assert monitor.update_step == 10
assert monitor.expert_num == 8
assert monitor.hot_expert_num == 2
assert monitor.local_expert_num == 4 # 8 / 2
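# The local_expert_num relation (expert_num / expert_parallel) could be pinned
# across several layouts with a parametrized variant; a sketch reusing the
# hypothetical mock_rank_env fixture above:
@pytest.mark.parametrize("expert_num, expert_parallel, expected", [
    (8, 1, 8),   # no expert parallelism: all experts are local
    (8, 2, 4),   # experts split across two parallel groups
    (16, 4, 4),
])
def test_local_expert_num_param(mock_rank_env, expert_num, expert_parallel, expected):
    moe_config = Mock()
    moe_config.update_step = 10
    moe_config.expert_num = expert_num
    moe_config.hot_expert_num = 1
    moe_config.moe_module_name = "model.layers"
    monitor = ColdHotExpertMonitor(
        moe_config=moe_config, hidden_size=128, ffn_hidden_size=512,
        expert_parallel=expert_parallel, model_parallel=1,
        save_checkpoint_steps=100)
    assert monitor.local_expert_num == expected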
class TestColdHotExpertMonitorStepEnd:
"""Test ColdHotExpertMonitor.on_train_step_end and expert switching"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
@patch('mindformers.core.callback.callback.os.getenv')
@patch('mindformers.core.callback.callback.time.time')
def test_on_train_step_end_switch_experts(self, mock_time, mock_getenv, mock_get_rank):
"""Test on_train_step_end triggers expert switching"""
def getenv_side_effect(key, default=None):
if key == "RANK_ID":
return "0"
if key == "RANK_SIZE":
return "8"
return default
mock_getenv.side_effect = getenv_side_effect
# Mock time.time() to return incrementing values
time_counter = [100.0]
def time_side_effect():
result = time_counter[0]
time_counter[0] += 1.0
return result
mock_time.side_effect = time_side_effect
# Use Mock object instead of dict to support attribute access
moe_config = Mock()
moe_config.expert_num = 8
moe_config.hot_expert_num = 1
moe_config.moe_module_name = 'network.blocks'
moe_config.update_step = 10
monitor = ColdHotExpertMonitor(
moe_config=moe_config,
hidden_size=4096,
ffn_hidden_size=16384,
expert_parallel=1,
model_parallel=1,
save_checkpoint_steps=10
)
# Mock the train_network and blocks
run_context = Mock()
cb_params = Mock()
cb_params.cur_step_num = 10
cb_params.train_network = Mock()
# Mock blocks structure
mock_block = Mock()
mock_block.output.hot_expert_index.value.return_value = [np.array([0])]
monitor.get_attribute_by_path = Mock(return_value=[mock_block])
monitor.return_back_hot_expert = Mock()
monitor.switch_hot_expert = Mock()
run_context.original_args.return_value = cb_params
monitor.on_train_step_end(run_context)
monitor.switch_hot_expert.assert_called()
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
@patch('mindformers.core.callback.callback.os.getenv')
def test_return_back_hot_expert_single(self, mock_getenv, mock_get_rank):
"""Test return_back_hot_expert with single hot expert"""
def getenv_side_effect(key, default=None):
if key == "RANK_ID":
return "0"
if key == "RANK_SIZE":
return "8"
return default
mock_getenv.side_effect = getenv_side_effect
# Use Mock object instead of dict to support attribute access
moe_config = Mock()
moe_config.expert_num = 8
moe_config.hot_expert_num = 1
moe_config.moe_module_name = 'network.blocks'
moe_config.update_step = 10
monitor = ColdHotExpertMonitor(
moe_config=moe_config,
hidden_size=4096,
ffn_hidden_size=16384,
expert_parallel=1,
model_parallel=1,
save_checkpoint_steps=10
)
# Mock block with hot expert - need to support subscript access
mock_block = Mock()
# old_hot_expert_index[0] needs to be subscriptable
# value()[0] should return an array-like object that supports indexing
mock_hot_expert_index = np.array([0]) # Use numpy array for proper indexing support
mock_block.output.hot_expert_index.value.return_value = [mock_hot_expert_index]
# Create mock arrays that support subscript assignment
# For weight arrays - simple list
mock_weight_array = [Mock() for _ in range(8)]
# For bias arrays - need nested structure that supports bias[0][ffn_index][0] = value
# Create a list of lists where each inner list contains Mock objects
mock_bias_inner = [[Mock()] for _ in range(8)]
mock_bias_array = [mock_bias_inner]
mock_block.output.ffn.mapping.weight = mock_weight_array
mock_block.output.ffn.mapping.bias = mock_bias_array
mock_block.output.ffn.projection.weight = [Mock() for _ in range(8)]
mock_block.output.ffn.projection.bias = [[[Mock()] for _ in range(8)]]
mock_block.output.mlp.mapping.weight = Mock()
mock_block.output.mlp.mapping.bias = Mock()
mock_block.output.mlp.projection.weight = Mock()
mock_block.output.mlp.projection.bias = Mock()
# Should not raise error
monitor.return_back_hot_expert(mock_block)
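# The nested lists above mirror an assignment pattern like
# bias[0][ffn_index][0] = value; a standalone illustration of the shape being
# emulated (illustrative only; the real shapes live in the MoE FFN code):
def _bias_layout_example():
    bias = [[[None] for _ in range(8)]]
    bias[0][3][0] = "expert-3 bias slice"
    assert bias[0][3][0] == "expert-3 bias slice"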
class TestColdHotExpertMonitorSwitchExpert:
"""Test ColdHotExpertMonitor.switch_hot_expert method"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
@patch('mindformers.core.callback.callback.os.getenv')
def test_switch_hot_expert_single_no_change(self, mock_getenv, mock_get_rank):
"""Test switch_hot_expert when expert doesn't change"""
def getenv_side_effect(key, default=None):
if key == "RANK_ID":
return "0"
if key == "RANK_SIZE":
return "8"
return default
mock_getenv.side_effect = getenv_side_effect
moe_config = Mock()
moe_config.expert_num = 8
moe_config.hot_expert_num = 1
moe_config.moe_module_name = 'network.blocks'
moe_config.update_step = 10
monitor = ColdHotExpertMonitor(
moe_config=moe_config,
hidden_size=4096,
ffn_hidden_size=16384,
expert_parallel=1,
model_parallel=1,
save_checkpoint_steps=10
)
# Mock block where old and new expert are the same
mock_block = Mock()
# Old expert index - should be array-like supporting indexing
# value()[0] should return an array, and then [0] accesses first element
old_expert_array = np.array([0]) # Array that supports [0] indexing
mock_block.output.hot_expert_index.value.return_value = [old_expert_array]
# New expert index (same as old)
# cumsum_value.value() returns a tensor
mock_cumsum = Mock()
# Create a mock tensor that supports slicing and indexing
# new_expert_index[0:1] should return np.array([0])
# new_expert_index[1:8] should return an array
def mock_getitem(self, key):
# Need to accept self parameter since this is bound as a method
if isinstance(key, slice):
# Handle slicing
if key.start == 0 and key.stop == 1: # hot_expert_num = 1
return np.array([0])
return np.array(list(range(key.start or 0, key.stop or 8)))
# Handle single index
return 0
mock_expert_indices = Mock()
mock_expert_indices.__getitem__ = mock_getitem
mock_cumsum.topk.return_value = (Mock(), mock_expert_indices)
mock_block.output.router.router.cumsum_value.value.return_value = mock_cumsum
# Should return early without switching (since old and new expert are the same)
monitor.switch_hot_expert(mock_block, cur_step_num=2)
if __name__ == '__main__':
pytest.main([__file__, '-v'])


@@ -0,0 +1,96 @@
# Copyright 2025 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Test callback.py using pytest framework."""
from unittest.mock import Mock, patch
import numpy as np
import pytest
from mindformers.core.callback.callback import ExpertMigrateCallback
# pylint: disable=unused-argument # for mock logic
class TestExpertMigrateCallback:
"""Test ExpertMigrateCallback"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
def test_init(self, mock_rank):
"""Test ExpertMigrateCallback initialization and on_train_step_end."""
config = Mock()
config.pipeline_model_parallel_size = 1
config.data_parallel_size = 1
config.tensor_model_parallel_size = 1
config.expert_model_parallel_size = 1
config.num_layers = 2
config.mtp_num_layers = 0
config.num_moe_experts = 8
callback = ExpertMigrateCallback(config=config, print_expert_load=True)
run_context = Mock()
cb_params = Mock()
real_network = Mock()
# Need to ensure loop terminates: while hasattr(network, 'network')
# We can just not give it a 'network' attribute
del real_network.network
layer = Mock()
layer.pipeline_stage = 0
layer.mlp.experts.num_tokens_per_expert = Mock()
layer.mlp.num_local_experts = 8
layer.mlp.expert_load_history.asnumpy.return_value = np.zeros(8)
real_network.model.decoder.layers = [layer, layer]
cb_params.train_network = real_network
cb_params.optimizer = Mock()
run_context.original_args.return_value = cb_params
ctx_patch = 'mindformers.core.callback.callback.get_auto_parallel_context'
with patch(ctx_patch, return_value="stand_alone"):
callback.on_train_step_end(run_context)
layer.mlp.update_expert_load_history.assert_called()
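# The `del real_network.network` above makes hasattr(network, 'network')
# return False on the Mock, so an unwrap loop presumably shaped like the
# sketch below terminates (an assumption about the callback's internals):
def _unwrap_network_sketch(network):
    while hasattr(network, 'network'):
        network = network.network
    return network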
class TestExpertMigrateCallbackExtended:
"""Extended tests for ExpertMigrateCallback"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
def test_expert_migrate_with_mtp_layers(self, mock_rank):
"""Test ExpertMigrateCallback with MTP layers"""
config = Mock()
config.pipeline_model_parallel_size = 1
config.data_parallel_size = 1
config.tensor_model_parallel_size = 1
config.expert_model_parallel_size = 1
config.num_layers = 2
config.mtp_num_layers = 1
config.num_moe_experts = 4
callback = ExpertMigrateCallback(config=config, print_expert_load=False)
assert callback.mtp_num_layers == 1
assert callback.num_layers == 2
if __name__ == '__main__':
pytest.main([__file__, '-v'])


@@ -0,0 +1,422 @@
# Copyright 2025 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Test callback.py using pytest framework."""
import inspect
from unittest.mock import Mock, patch
import numpy as np
import pytest
from mindformers.core.callback.callback import (
AllReduceNet,
_check_mspti_is_on,
_get_loss_output,
_get_max_eigenvalue,
_get_optimizer_state,
_get_separate_loss,
_get_stable_rank,
_get_weight_norm,
_log_grouped_lr_info,
get_embedding_info
)
# pylint: disable=unused-argument # for mock logic
class TestHelperFunctions:
"""Test helper functions in callback.py"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
def test_get_loss_output(self):
"""Test _get_loss_output function."""
# Test case 1: Simple scalar output (not Tensor)
output = 0.5
loss, overflow, scaling_sens, _, _ = _get_loss_output(output)
assert loss == 0.5
assert not overflow
assert not scaling_sens
# Test case 2: Tuple with 3 elements
output = (0.5, False, 1024.0)
loss, overflow, scaling_sens, _, _ = _get_loss_output(output)
assert loss == 0.5
assert not overflow
assert scaling_sens == 1024.0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.F')
def test_get_weight_norm(self, mock_f):
"""Test _get_weight_norm function."""
network = Mock()
param = Mock()
param.to.return_value.norm.return_value = 1.0
network.trainable_params.return_value = [param, param]
# Mock F.stack
mock_f.stack.return_value.norm.return_value.item.return_value = 1.414
norm = _get_weight_norm(network)
assert norm == pytest.approx(1.414)
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
def test_get_optimizer_state(self):
"""Test _get_optimizer_state function"""
param1 = Mock()
param1.name = "p1"
param1.to.return_value.norm.return_value.item.return_value = 0.1
param2 = Mock()
param2.name = "p2"
param2.to.return_value.norm.return_value.item.return_value = 0.2
optim_params = [param1, param2]
norms = _get_optimizer_state(optim_params)
assert norms['p1'] == 0.1
assert norms['p2'] == 0.2
class TestAllReduceNet:
"""Test AllReduceNet class"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
def test_init_and_construct(self):
"""Test AllReduceNet initialization"""
# Mock P.AllReduce which is used in AllReduceNet.__init__
mock_allreduce_class = Mock()
mock_allreduce_instance = Mock()
mock_allreduce_class.return_value = mock_allreduce_instance
with patch('mindformers.core.callback.callback.P.AllReduce', mock_allreduce_class):
net = AllReduceNet('test_group')
mock_allreduce_class.assert_called_once()
# Test construct method
mock_tensor = Mock()
mock_allreduce_instance.return_value = mock_tensor
result = net.construct(mock_tensor)
assert result == mock_tensor
class TestCheckMsptiIsOn:
"""Test _check_mspti_is_on function"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('os.getenv')
def test_mspti_enabled(self, mock_getenv):
"""Test when libmspti.so is in LD_PRELOAD"""
mock_getenv.return_value = "/path/to/libmspti.so"
result = _check_mspti_is_on()
assert result
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('os.getenv')
def test_mspti_disabled(self, mock_getenv):
"""Test when libmspti.so is not in LD_PRELOAD"""
mock_getenv.return_value = "/path/to/other.so"
result = _check_mspti_is_on()
assert not result
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('os.getenv')
def test_mspti_no_ld_preload(self, mock_getenv):
"""Test when LD_PRELOAD is not set"""
mock_getenv.return_value = None
result = _check_mspti_is_on()
assert not result
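# The three cases above fully pin the observable behaviour; a minimal
# reference implementation consistent with them (an assumption, not the
# actual mindformers code):
def _check_mspti_is_on_sketch():
    import os  # local import; this test module does not import os at top level
    ld_preload = os.getenv("LD_PRELOAD")
    return ld_preload is not None and "libmspti.so" in ld_preload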
class TestGetSeparateLoss:
"""Test _get_separate_loss function"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.parameter_register')
def test_get_separate_loss(self, mock_param_register):
"""Test _get_separate_loss retrieves and clears losses"""
# Mock parameter values
mock_aux_loss = Mock()
mock_aux_loss.asnumpy.return_value = np.array([0.1])
mock_mtp_loss = Mock()
mock_mtp_loss.asnumpy.return_value = np.array([0.2])
mock_lm_loss = Mock()
mock_lm_loss.asnumpy.return_value = np.array([0.3])
mock_param_register.get.side_effect = lambda x, default=None: {
'aux_loss': mock_aux_loss,
'mtp_loss': mock_mtp_loss,
'lm_loss': mock_lm_loss
}.get(x, default)
lm_loss, aux_loss, mtp_loss = _get_separate_loss()
assert lm_loss[0] == 0.3
assert aux_loss[0] == 0.1
assert mtp_loss[0] == 0.2
# Verify clear was called
assert mock_param_register.clear.call_count == 3
class TestLogGroupedLrInfo:
"""Test _log_grouped_lr_info function"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
def test_log_grouped_lr_info_basic(self):
"""Test _log_grouped_lr_info basic functionality"""
# This test verifies the function can be called without errors
# when GROUPED_PARAMS is empty (default state in our mocks)
# Should return early without error when GROUPED_PARAMS is empty
# If this raises an exception, pytest will fail the test
_log_grouped_lr_info()
class TestGetLossOutputExtended:
"""Extended tests for _get_loss_output function"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
def test_get_loss_output_tuple_4(self):
"""Test _get_loss_output with 4-element tuple"""
output = (0.5, False, 1024.0, 0.001)
loss, overflow, scaling_sens, learning_rate, global_norm = _get_loss_output(output)
assert loss == 0.5
assert not overflow
assert scaling_sens == 1024.0
assert learning_rate == 0.001
assert global_norm is None
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
def test_get_loss_output_tuple_7(self):
"""Test _get_loss_output with 7-element tuple"""
output = (0.5, False, 1024.0, 0.001, 2.5, np.array([1.0, 2.0]), 2)
loss, overflow, scaling_sens, learning_rate, global_norm = _get_loss_output(output)
assert loss == 0.5
assert not overflow
assert scaling_sens == 1024.0
assert learning_rate == 0.001
assert global_norm == 2.5
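# Across the scalar, 3-, 4- and 7-element cases above, the unpacking rule is
# consistent; a compact reference matching these tests (an assumption about
# _get_loss_output, covering only the arities exercised here):
def _get_loss_output_sketch(output):
    if not isinstance(output, tuple):
        return output, False, False, None, None
    loss, overflow, scaling_sens = output[0], output[1], output[2]
    learning_rate = output[3] if len(output) >= 4 else None
    global_norm = output[4] if len(output) >= 7 else None
    return loss, overflow, scaling_sens, learning_rate, global_norm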
class TestGetMaxEigenvalue:
"""Test _get_max_eigenvalue function"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
def test_get_max_eigenvalue_basic(self):
"""Test _get_max_eigenvalue function - simplified test"""
# This function is complex and involves many MindSpore operations
# We'll just verify it exists and has the correct signature
# Verify the function exists
assert callable(_get_max_eigenvalue)
# Verify the function signature
sig = inspect.signature(_get_max_eigenvalue)
params = list(sig.parameters.keys())
assert 'input_tensor' in params
assert 'num_iter' in params
# Note: Full functional testing of this method would require actual MindSpore tensors
# which is beyond the scope of unit testing with mocks
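# For reference, a NumPy power-iteration sketch of the dominant-eigenvalue
# estimate the mocks in TestGetMaxEigenvalueComprehensive below imitate
# (an illustration under the assumption that _get_max_eigenvalue iterates
# on input^T @ input; it is not the MindSpore implementation):
def _power_iteration_sketch(matrix, num_iter=50):
    rng = np.random.default_rng(0)
    u = rng.standard_normal(matrix.shape[1])
    norm = np.linalg.norm(u)
    if norm == 0:
        return 0.0  # mirrors the zero-norm early exit tested below
    u = u / norm
    gram = matrix.T @ matrix  # one up-front matmul, like the input_seq step
    eigenvalue = 0.0
    for _ in range(num_iter):
        v = gram @ u
        v_norm = np.linalg.norm(v)
        if v_norm == 0:
            return 0.0
        u = v / v_norm
        eigenvalue = float(u @ gram @ u)  # Rayleigh quotient
    return eigenvalue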
class TestGetStableRankExtended:
"""Extended tests for _get_stable_rank function"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.ms.ops.square')
@patch('mindformers.core.callback.callback.ms.ops.norm')
@patch('mindformers.core.callback.callback._get_max_eigenvalue')
def test_get_stable_rank_zero_eigenvalue(self, mock_eigenvalue, mock_norm, mock_square):
"""Test _get_stable_rank when eigenvalue is zero"""
# Create a more complete mock weight object
weight = Mock()
weight.name = "test_weight"
weight.ndim = 2 # give the mock an ndim attribute so the -ndim operation works
weight.shape = [3, 3] # give the mock a shape attribute
mock_eigenvalue.return_value = np.array(0.0)
stable_rank, eig = _get_stable_rank(weight, num_iter=5)
assert stable_rank == 0.0
assert eig == 0.0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.ms.ops.square')
@patch('mindformers.core.callback.callback.ms.ops.norm')
@patch('mindformers.core.callback.callback._get_max_eigenvalue')
def test_get_stable_rank_normal(self, mock_eigenvalue, mock_norm, mock_square):
"""Test _get_stable_rank with normal values"""
# Create a more complete mock weight object
weight = Mock()
weight.name = "test_weight"
weight.ndim = 2 # give the mock an ndim attribute so the -ndim operation works
weight.shape = [3, 3] # give the mock a shape attribute
mock_eigenvalue.return_value = np.array(2.0)
# Mock norm to return a Mock that can be squared
mock_norm_tensor = Mock()
mock_norm_tensor.ndim = 0 # scalar
mock_norm.return_value = mock_norm_tensor
# Mock square to return a Mock that has asnumpy method returning 16.0
mock_square_result = Mock()
mock_square_result.asnumpy.return_value = 16.0
mock_square.return_value = mock_square_result
stable_rank, eig = _get_stable_rank(weight, num_iter=5)
# stable_rank = f_norm^2 / eig = 16.0 / 2.0 = 8.0
assert stable_rank == 8.0
assert eig == 2.0
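# The expected value above follows the stable-rank definition; a NumPy
# restatement consistent with both tests (assuming eig is the dominant
# eigenvalue returned by _get_max_eigenvalue):
def _stable_rank_sketch(weight, eig):
    fro_sq = float(np.linalg.norm(weight) ** 2)  # squared Frobenius norm
    return 0.0 if eig == 0 else fro_sq / eig
# e.g. fro_sq=16.0 and eig=2.0 give 8.0, matching test_get_stable_rank_normal.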
class TestGetEmbeddingInfo:
"""Test get_embedding_info function"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_group_size', return_value=8)
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
@patch('mindspore.context.get_auto_parallel_context', return_value=2)
def test_get_embedding_info(self, *mocks):
"""Test get_embedding_info extracts embedding local norm"""
cb_params = Mock()
cb_params.net_outputs = [0.5, False, 1024.0, 0.001, 2.5,
[1.0, 2.0, 3.0], [128, 256, 128]]
embedding_size = 128
result = get_embedding_info(cb_params, embedding_size)
# Should return the first local_norm with matching size
assert result == 1.0
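# A sketch of the selection rule this test exercises: pick the first local
# norm whose recorded size matches the embedding size (an assumption drawn
# from the fixture values above, not the actual implementation):
def _embedding_info_sketch(local_norms, local_sizes, embedding_size):
    for norm, size in zip(local_norms, local_sizes):
        if size == embedding_size:
            return norm
    return None
# _embedding_info_sketch([1.0, 2.0, 3.0], [128, 256, 128], 128) -> 1.0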
class TestGetMaxEigenvalueComprehensive:
"""Comprehensive tests for _get_max_eigenvalue function"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.ms.ops.matmul')
@patch('mindformers.core.callback.callback.ms.ops.unsqueeze')
@patch('mindformers.core.callback.callback.ms.ops.randn')
@patch('mindformers.core.callback.callback.logger')
def test_get_max_eigenvalue_2d_tensor(self, mock_logger, mock_randn,
mock_unsqueeze, mock_matmul):
"""Test _get_max_eigenvalue with 2D tensor"""
# Create mock input tensor
input_tensor = Mock()
input_tensor.ndim = 2
input_tensor.shape = [3, 3]
input_tensor.astype.return_value = input_tensor
input_tensor.transpose.return_value = input_tensor
# Mock randn to return a tensor with positive norm
mock_u_tensor = Mock()
mock_u_norm = Mock()
mock_u_norm.asnumpy.return_value = 1.0
mock_u_tensor.norm.return_value = mock_u_norm
mock_u_tensor.__truediv__ = Mock(return_value=mock_u_tensor)
mock_randn.return_value = mock_u_tensor
# Mock unsqueeze
mock_unsqueeze.return_value = mock_u_tensor
# Mock matmul operations
mock_input_seq = Mock()
mock_unsqueeze.return_value = mock_input_seq
mock_v_tensor = Mock()
mock_v_norm = Mock()
mock_v_norm.asnumpy.return_value = 1.0
# Mock (v_norm != 0).all() - need to return a tensor-like object with .all() method
mock_comparison_result = Mock()
mock_comparison_result.all.return_value = True
mock_v_norm.__ne__ = Mock(return_value=mock_comparison_result)
mock_v_tensor.norm.return_value = mock_v_norm
mock_v_tensor.transpose.return_value = mock_v_tensor
mock_v_tensor.__truediv__ = Mock(return_value=mock_v_tensor)
mock_eigenvalue = Mock()
mock_eigenvalue.asnumpy.return_value = 2.5
mock_eigenvalue.squeeze.return_value = mock_eigenvalue
# matmul is called:
# 1. Once for input_seq (line 211)
# 2. num_iter times for v_tensor (line 216)
# 3. num_iter times for eigenvalue (line 217)
# Total: 1 + 2 + 2 = 5 times for num_iter=2
mock_matmul.side_effect = [
mock_input_seq, # Line 211: input_seq calculation
mock_v_tensor, # Line 216: iteration 1, v_tensor
mock_eigenvalue, # Line 217: iteration 1, eigenvalue
mock_v_tensor, # Line 216: iteration 2, v_tensor
mock_eigenvalue # Line 217: iteration 2, eigenvalue
]
result = _get_max_eigenvalue(input_tensor, num_iter=2)
assert result == 2.5
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.ms.ops.randn')
@patch('mindformers.core.callback.callback.logger')
def test_get_max_eigenvalue_zero_norm(self, mock_logger, mock_randn):
"""Test _get_max_eigenvalue when random vector has zero norm"""
input_tensor = Mock()
input_tensor.ndim = 2
input_tensor.shape = [3, 3]
input_tensor.astype.return_value = input_tensor
# Mock randn to always return zero norm
mock_u_tensor = Mock()
mock_u_norm = Mock()
mock_u_norm.asnumpy.return_value = 0.0
mock_u_tensor.norm.return_value = mock_u_norm
mock_randn.return_value = mock_u_tensor
result = _get_max_eigenvalue(input_tensor, num_iter=2)
assert result == 0.0
mock_logger.warning.assert_called()
if __name__ == '__main__':
pytest.main([__file__, '-v'])


@@ -13,6 +13,7 @@
# limitations under the License.
# ============================================================================
"""Test test_mfloss_monitor.py"""
import builtins
from unittest.mock import Mock, patch
import unittest
import numpy as np
@@ -21,6 +22,9 @@ import pytest
from mindspore import Tensor
from mindformers.core.callback.callback import MFLossMonitor
# pylint: disable=protected-access
# pylint: disable=unused-argument # for mock logic
class TestMFLossMonitor(unittest.TestCase):
"""Test MFLossMonitor class"""
@@ -99,7 +103,6 @@ class TestMFLossMonitor(unittest.TestCase):
mock_get_context.return_value = 1 # pipeline_stages = 1
original_loss = 1.0
# pylint: disable=W0212
fixed_loss = self.monitor._fix_loss_for_parallel(original_loss)
self.assertEqual(fixed_loss, original_loss)
@@ -116,7 +119,6 @@ class TestMFLossMonitor(unittest.TestCase):
mock_get_context.return_value = 2 # pipeline_stages = 2
original_loss = 2.0
# pylint: disable=W0212
fixed_loss = self.monitor._fix_loss_for_parallel(original_loss)
# Should divide by micro_size
@@ -135,7 +137,6 @@ class TestMFLossMonitor(unittest.TestCase):
with patch('mindspore.get_context') as mock_get_context:
mock_get_context.return_value = ms.GRAPH_MODE
# pylint: disable=W0212
result = self.monitor._can_calculate_model_flops(mock_cb_params)
self.assertTrue(result)
@@ -149,7 +150,6 @@ class TestMFLossMonitor(unittest.TestCase):
mock_cb_params = Mock()
mock_cb_params.mode = 'invalid'
# pylint: disable=W0212
result = self.monitor._can_calculate_model_flops(mock_cb_params)
self.assertFalse(result)
@@ -203,5 +203,829 @@ class TestMFLossMonitorIntegration(unittest.TestCase):
self.assertEqual(monitor.loss_list[0], 0.5)
class TestMFLossMonitorBasic:
"""Test MFLossMonitor basic functionality"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
def test_init_defaults(self, *mocks):
"""Test MFLossMonitor default initialization"""
monitor = MFLossMonitor(per_print_times=1, global_batch_size=32, dataset_size=100)
assert monitor.per_print_times == 1
assert monitor.global_batch_size == 32
assert monitor.steps_per_epoch == 100
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
def test_init_custom_values(self, *mocks):
"""Test MFLossMonitor custom initialization"""
monitor = MFLossMonitor(
learning_rate=0.01,
per_print_times=10,
micro_batch_num=2,
micro_batch_interleave_num=2,
gradient_accumulation_steps=4
)
assert monitor.per_print_times == 10
assert monitor.mirco_size == 2
assert monitor.micro_batch_interleave_num == 2
assert monitor.gradient_accumulation_steps == 4
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('time.time')
def test_on_train_epoch_begin(self, mock_time, *mocks):
"""Test on_train_epoch_begin callback"""
mock_time.return_value = 1000.0
monitor = MFLossMonitor()
mock_run_context = Mock()
monitor.on_train_epoch_begin(mock_run_context)
assert monitor.loss_list == []
assert monitor.epoch_time == 1000.0
assert monitor.run_context == mock_run_context
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('time.time')
def test_on_train_step_begin(self, mock_time, *mocks):
"""Test on_train_step_begin callback"""
mock_time.return_value = 1000.0
monitor = MFLossMonitor()
mock_run_context = Mock()
monitor.on_train_step_begin(mock_run_context)
assert monitor.step_time == 1000.0
assert monitor.run_context == mock_run_context
class TestMFLossMonitorExtended:
"""Extended tests for MFLossMonitor"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
def test_fix_loss_for_parallel_pipeline(self, *mocks):
"""Test _fix_loss_for_parallel with pipeline stages"""
monitor = MFLossMonitor(
micro_batch_num=2,
gradient_accumulation_steps=2,
calculate_per_token_loss=False
)
# Mock both context.get_auto_parallel_context and get_auto_parallel_context
with patch('mindspore.context.get_auto_parallel_context', return_value=2), \
patch('mindspore.get_auto_parallel_context', return_value='not_zero_bubble_v'):
loss = 8.0
fixed_loss = monitor._fix_loss_for_parallel(loss, print_warning=False)
# When pipeline_stages=2: loss = 8.0 / mirco_size(2) = 4.0
# When gradient_accumulation_steps=2: loss = 4.0 / 2 = 2.0
assert fixed_loss == 2.0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
def test_fix_loss_for_parallel_gradient_accumulation(self, *mocks):
"""Test _fix_loss_for_parallel with gradient accumulation only"""
monitor = MFLossMonitor(
micro_batch_num=1, # No pipeline division
gradient_accumulation_steps=2,
calculate_per_token_loss=False
)
# Mock pipeline_stages=1 (no pipeline)
with patch('mindspore.context.get_auto_parallel_context', return_value=1), \
patch('mindspore.get_auto_parallel_context', return_value='not_zero_bubble_v'):
loss = 8.0
fixed_loss = monitor._fix_loss_for_parallel(loss, print_warning=False)
# When pipeline_stages=1: no division by mirco_size
# When gradient_accumulation_steps=2: loss = 8.0 / 2 = 4.0
assert fixed_loss == 4.0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
def test_fix_loss_for_parallel_no_pipeline(self, *mocks):
"""Test _fix_loss_for_parallel without pipeline stages"""
monitor = MFLossMonitor(
micro_batch_num=2,
gradient_accumulation_steps=2,
calculate_per_token_loss=False
)
with patch('mindspore.context.get_auto_parallel_context', return_value=1), \
patch('mindspore.get_auto_parallel_context', return_value='data_parallel'):
loss = 8.0
fixed_loss = monitor._fix_loss_for_parallel(loss)
# When pipeline_stages=1: no division by mirco_size
# When gradient_accumulation_steps=2: loss = 8.0 / 2 = 4.0
assert fixed_loss == 4.0
@patch('time.time', return_value=1000.0)
@patch('mindformers.core.callback.callback.get_tensorboard_args',
return_value={'log_loss_scale_to_tensorboard': True})
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=Mock())
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
def test_print_output_info_with_tensorboard(self, *mocks):
"""Test print_output_info with tensorboard enabled"""
monitor = MFLossMonitor(learning_rate=0.001, global_batch_size=32)
monitor.tensor_writer = Mock()
cb_params = Mock()
cb_params.dataset_sink_mode = False
cb_params.optimizer = Mock()
cb_params.optimizer.global_step = 10
cb_params.train_network = Mock()
cb_params.train_network.phase = 'train'
cb_params.train_network.set_train = Mock()
monitor.print_output_info(
cb_params, 1, 10, 100.0, 1, 100, 0.5, 100.0,
False, 1024.0, 3600, 10.0, 2.5, None, None, None
)
# Verify tensorboard writer was called
assert monitor.tensor_writer.add_scalar.call_count > 0
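# The three _fix_loss_for_parallel tests above jointly pin the scaling rule;
# a compact restatement consistent with them (ignoring per-token-loss and
# zero-bubble-v handling, which these tests do not exercise):
def _fix_loss_sketch(loss, pipeline_stages, micro_size, grad_accum_steps):
    if pipeline_stages > 1:
        loss = loss / micro_size
    return loss / grad_accum_steps
# _fix_loss_sketch(8.0, 2, 2, 2) == 2.0; _fix_loss_sketch(8.0, 1, 2, 2) == 4.0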
class TestMFLossMonitorOnTrainStepEnd:
"""Test MFLossMonitor.on_train_step_end comprehensive coverage"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback.get_auto_parallel_context')
@patch('mindformers.core.callback.callback.set_auto_parallel_context')
@patch('time.time')
def test_on_train_step_end_with_separate_loss(
self, mock_time, mock_set_context, mock_get_context, *mocks):
"""Test on_train_step_end with print_separate_loss enabled"""
mock_time.return_value = 1000.0
def get_context_side_effect(x, *args):
return {
'parallel_mode': 'stand_alone',
'full_batch': False
}.get(x, None)
mock_get_context.side_effect = get_context_side_effect
monitor = MFLossMonitor(
origin_epochs=10,
dataset_size=100,
global_batch_size=32,
print_separate_loss=True,
is_moe_model=True
)
monitor.step_time = 999.0
run_context = Mock()
cb_params = Mock()
cb_params.cur_step_num = 1
cb_params.batch_num = 100
cb_params.cur_epoch_num = 1
cb_params.dataset_sink_mode = False
cb_params.net_outputs = (0.5, False, 1024.0, 0.001, 2.5)
cb_params.get.return_value = None
run_context.original_args.return_value = cb_params
separate_loss_mock = (np.array([0.3]), np.array([0.1]), np.array([0.1]))
loss_patch = 'mindformers.core.callback.callback._get_separate_loss'
with patch(loss_patch, return_value=separate_loss_mock):
monitor.on_train_step_end(run_context)
assert len(monitor.loss_list) == 1
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=8)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback.get_auto_parallel_context')
@patch('mindformers.core.callback.callback.set_auto_parallel_context')
@patch('time.time')
@patch('mindformers.core.callback.callback.check_arf_status', return_value=True)
def test_on_train_step_end_with_arf_status(
self, mock_arf, mock_time, mock_set_context, mock_get_context, *mocks):
"""Test on_train_step_end with ARF status check"""
mock_time.return_value = 1000.0
def get_context_side_effect(x, *args):
return {
'parallel_mode': 'stand_alone',
'full_batch': False
}.get(x, None)
mock_get_context.side_effect = get_context_side_effect
monitor = MFLossMonitor(
origin_epochs=10,
dataset_size=100,
global_batch_size=32
)
monitor.step_time = 999.0
monitor.mf_support = True
monitor.mf_calculated = False
run_context = Mock()
cb_params = Mock()
cb_params.cur_step_num = 1
cb_params.batch_num = 100
cb_params.cur_epoch_num = 1
cb_params.dataset_sink_mode = False
cb_params.net_outputs = 0.5
cb_params.get.return_value = None
cb_params.mode = 'train'
cb_params.train_network = Mock()
cb_params.train_network.current_phase = 'train_phase'
run_context.original_args.return_value = cb_params
with patch.object(monitor, '_calculate_model_flops'):
monitor.on_train_step_end(run_context)
class TestMFLossMonitorCalculateFlops:
"""Test MFLossMonitor._calculate_model_flops"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=8)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback.flops_collection')
@patch('mindformers.core.callback.callback.auto_parallel_context')
@patch('mindformers.core.callback.callback.get_group_size', return_value=8)
def test_calculate_model_flops_standalone(
self, mock_group_size, mock_auto_context, mock_flops, *mocks):
"""Test _calculate_model_flops in standalone mode"""
monitor = MFLossMonitor()
monitor.current_phase = 'train_phase'
mock_flops.return_value = (1000000.0, 0, 500000.0, 0, False)
mock_auto_context.return_value.get_pipeline_stages.return_value = 1
mock_auto_context.return_value.get_parallel_mode.return_value = 'stand_alone'
monitor._calculate_model_flops()
assert monitor.mf_calculated
assert monitor.full_model_flops == 1000000.0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=8)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback.flops_collection')
def test_calculate_model_flops_runtime_error(self, mock_flops, *mocks):
"""Test _calculate_model_flops with RuntimeError"""
monitor = MFLossMonitor()
monitor.current_phase = 'train_phase'
monitor.mf_support = True
mock_flops.side_effect = RuntimeError("Flops calculation failed")
monitor._calculate_model_flops()
assert not monitor.mf_support
class TestMFLossMonitorPrintOutputInfo:
"""Test MFLossMonitor.print_output_info comprehensive coverage"""
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=Mock())
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={
'log_loss_scale_to_tensorboard': True,
'log_timers_to_tensorboard': True
})
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_group_size', return_value=8)
def test_print_output_info_with_all_tensorboard_options(self, mock_group_size, *mocks):
"""Test print_output_info with all tensorboard options enabled"""
monitor = MFLossMonitor(learning_rate=0.001, global_batch_size=32)
monitor.tensor_writer = Mock()
monitor.mf_calculated = True
monitor.full_model_flops = 1000000.0
cb_params = Mock()
cb_params.dataset_sink_mode = False
cb_params.optimizer = Mock()
cb_params.optimizer.global_step = 10
cb_params.train_network = Mock()
cb_params.train_network.phase = 'train'
cb_params.train_network.set_train = Mock()
monitor.print_output_info(
cb_params, 1, 10, 100.0, 1, 100, 0.5, 100.0,
False, 1024.0, 3600, 10.0, 2.5, None, None, None
)
# Verify tensorboard writer was called for various metrics
assert monitor.tensor_writer.add_scalar.call_count > 5
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=Mock())
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={
'log_timers_to_tensorboard': True
})
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_group_size', return_value=8)
@patch('mindformers.core.callback.callback.is_legacy_model', return_value=False)
def test_print_output_info_with_separate_loss(self, mock_legacy, mock_group_size, *mocks):
"""Test print_output_info with separate loss (MoE and MTP)"""
monitor = MFLossMonitor(
learning_rate=0.001,
global_batch_size=32,
print_separate_loss=True,
is_moe_model=True,
is_mtp_model=True
)
# Explicitly set print_separate_loss to True (in case it was reset during init)
monitor.print_separate_loss = True
# Ensure tensor_writer is a Mock and tensorboard config is set
monitor.tensor_writer = Mock()
monitor.tensorboard = {'log_timers_to_tensorboard': True}
cb_params = Mock()
cb_params.dataset_sink_mode = True
cb_params.optimizer = Mock()
cb_params.optimizer.global_step = 10
cb_params.train_network = Mock()
cb_params.train_network.phase = 'train'
# Test with separate losses
lm_loss = np.array([0.3])
aux_loss = np.array([0.1])
mtp_loss = np.array([0.05])
monitor.print_output_info(
cb_params, 1, 10, 100.0, 1, 100, 0.5, 100.0,
False, 1024.0, 3600, 10.0, 2.5, lm_loss, aux_loss, mtp_loss
)
# Verify separate loss was logged to tensorboard
# Check if add_scalar was called with the expected tags
call_args = monitor.tensor_writer.add_scalar.call_args_list
tags_called = [call[0][0] for call in call_args] # Extract first positional argument (tag)
assert 'lm-loss' in tags_called, f"'lm-loss' not found in {tags_called}"
assert 'mtp-loss' in tags_called, f"'mtp-loss' not found in {tags_called}"
assert 'load-balancing-loss' in tags_called, \
f"'load-balancing-loss' not found in {tags_called}"
class TestMFLossMonitorGetPipelineGroup:
"""Test MFLossMonitor._get_pipeline_group"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=2)
@patch('mindformers.core.callback.callback.auto_parallel_context')
@patch('mindformers.core.callback.callback.get_group_size', return_value=8)
def test_get_pipeline_group(self, mock_group_size, mock_auto_context, mock_get_rank):
"""Test _get_pipeline_group calculation"""
mock_auto_context.return_value.get_pipeline_stages.return_value = 2
rank_list, rank_list_str = MFLossMonitor._get_pipeline_group()
# With rank=2, stage_nums=2, device_nums=8
# per_stage_device_nums = 8 // 2 = 4
# local_stage_rank_id = 2 % 4 = 2
# rank_list = [2 + 0*4, 2 + 1*4] = [2, 6]
assert rank_list == [2, 6]
assert rank_list_str == "2-6"
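# The commented arithmetic above generalizes as follows (a restatement of the
# rank-list construction these assertions encode, not the library source):
def _pipeline_group_sketch(rank, stage_nums, device_nums):
    per_stage = device_nums // stage_nums
    local = rank % per_stage
    rank_list = [local + stage * per_stage for stage in range(stage_nums)]
    return rank_list, "-".join(str(r) for r in rank_list)
# _pipeline_group_sketch(2, 2, 8) -> ([2, 6], "2-6")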
class TestMFLossMonitorCanCalculateFlops:
"""Test MFLossMonitor._can_calculate_model_flops"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback.ms.get_context', return_value=0) # GRAPH_MODE
@patch('mindformers.core.callback.callback.is_legacy_model', return_value=True)
def test_can_calculate_flops_train_mode(self, mock_legacy, mock_get_context, *mocks):
"""Test _can_calculate_model_flops in train mode"""
monitor = MFLossMonitor()
monitor.is_moe_model = False
cb_params = Mock()
cb_params.mode = 'train'
cb_params.train_network = Mock()
cb_params.train_network.current_phase = 'train_phase'
result = monitor._can_calculate_model_flops(cb_params)
assert result
assert monitor.current_phase == 'train_phase'
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback.ms.get_context', return_value=1) # PYNATIVE_MODE
@patch('mindformers.core.callback.callback.logger')
def test_can_calculate_flops_pynative_mode(self, mock_logger, mock_get_context, *mocks):
"""Test _can_calculate_model_flops in pynative mode (should fail)"""
monitor = MFLossMonitor()
cb_params = Mock()
cb_params.mode = 'train'
cb_params.train_network = Mock()
result = monitor._can_calculate_model_flops(cb_params)
assert not result
mock_logger.warning.assert_called()
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback.logger')
def test_can_calculate_flops_invalid_mode(self, mock_logger, *mocks):
"""Test _can_calculate_model_flops with invalid mode"""
monitor = MFLossMonitor()
cb_params = Mock()
cb_params.mode = 'predict' # Invalid mode
result = monitor._can_calculate_model_flops(cb_params)
assert not result
mock_logger.warning.assert_called()
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback.ms.get_context', return_value=0)
@patch('mindformers.core.callback.callback.logger')
def test_can_calculate_flops_no_current_phase(self, mock_logger, mock_get_context, *mocks):
"""Test _can_calculate_model_flops when network has no current_phase"""
monitor = MFLossMonitor()
cb_params = Mock()
cb_params.mode = 'train'
cb_params.train_network = Mock(spec=[]) # No current_phase attribute
result = monitor._can_calculate_model_flops(cb_params)
assert not result
mock_logger.warning.assert_called()
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback.ms.get_context', return_value=0)
@patch('mindformers.core.callback.callback.is_legacy_model', return_value=False)
@patch('mindformers.core.callback.callback.logger')
def test_can_calculate_flops_moe_model_non_legacy(
self, mock_logger, mock_legacy, mock_get_context, *mocks):
"""Test _can_calculate_model_flops with MoE model in non-legacy mode"""
monitor = MFLossMonitor()
monitor.is_moe_model = True
cb_params = Mock()
cb_params.mode = 'train'
cb_params.train_network = Mock()
cb_params.train_network.current_phase = 'train_phase'
result = monitor._can_calculate_model_flops(cb_params)
assert not result
mock_logger.warning.assert_called()
class TestMFLossMonitorPrintOutputInfoLearningRate:
"""Test MFLossMonitor.print_output_info learning rate scenarios"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback.ms.context.get_context', return_value='CPU')
@patch('mindformers.core.callback.callback.logger')
def test_print_output_info_lr_schedule_cpu(self, mock_logger, mock_get_context, *mocks):
"""Test print_output_info with LearningRateSchedule on CPU"""
lr_schedule = Mock(spec=['__call__'])
monitor = MFLossMonitor(learning_rate=lr_schedule, global_batch_size=32)
monitor.print_warning_flag = True
cb_params = Mock()
cb_params.dataset_sink_mode = False
cb_params.optimizer = Mock()
cb_params.optimizer.global_step = 10
monitor.print_output_info(
cb_params, 1, 10, 100.0, 1, 100, 0.5, 100.0,
False, 1024.0, 3600, 10.0, 2.5, None, None, None
)
# Should log warning about CPU not supported
mock_logger.warning.assert_called()
assert not monitor.print_warning_flag
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback.ms.context.get_context', return_value='Ascend')
def test_print_output_info_lr_schedule_ascend(self, mock_get_context, *mocks):
"""Test print_output_info with LearningRateSchedule on Ascend"""
# Create a simple mock that can be called
lr_schedule = Mock()
lr_result = Mock()
lr_result.asnumpy.return_value = np.array(0.001)
lr_schedule.return_value = lr_result
monitor = MFLossMonitor(learning_rate=lr_schedule, global_batch_size=32)
# Manually set the learning_rate to be recognized as LearningRateSchedule
# by patching the isinstance check in print_output_info
# create=True is needed: 'isinstance' is a builtin, not an attribute that
# already exists on the callback module, so patch() would otherwise raise.
with patch('mindformers.core.callback.callback.isinstance', create=True) as mock_isinstance:
# Default behavior: call the real isinstance
def isinstance_side_effect(obj, classinfo):
# Special handling for our lr_schedule object
if obj is monitor.learning_rate:
# Check if classinfo is a tuple (for the first isinstance check)
if isinstance(classinfo, tuple):
return False # Not (float, Tensor, np.ndarray)
return True # Is LearningRateSchedule
# For all other cases, use built-in isinstance
return builtins.isinstance(obj, classinfo)
mock_isinstance.side_effect = isinstance_side_effect
cb_params = Mock()
cb_params.dataset_sink_mode = False
cb_params.optimizer = Mock()
cb_params.optimizer.global_step = 10
cb_params.train_network = Mock()
cb_params.train_network.phase = 'train'
cb_params.train_network.set_train = Mock()
monitor.print_output_info(
cb_params, 1, 10, 100.0, 1, 100, 0.5, 100.0,
False, 1024.0, 3600, 10.0, 2.5, None, None, None
)
# Verify set_train was called to temporarily disable training
cb_params.train_network.set_train.assert_called()
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback.logger')
def test_print_output_info_invalid_lr_type(self, mock_logger, *mocks):
"""Test print_output_info with invalid learning rate type"""
# Use a list as learning rate (invalid type)
monitor = MFLossMonitor(learning_rate=[0.01, 0.02], global_batch_size=32)
monitor.print_warning_flag = True
cb_params = Mock()
cb_params.dataset_sink_mode = False
cb_params.optimizer = Mock()
cb_params.optimizer.global_step = 10
monitor.print_output_info(
cb_params, 1, 10, 100.0, 1, 100, 0.5, 100.0,
False, 1024.0, 3600, 10.0, 2.5, None, None, None
)
# Should log warning about invalid type
mock_logger.warning.assert_called()
assert not monitor.print_warning_flag
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback.logger')
def test_print_output_info_no_lr(self, mock_logger, *mocks):
"""Test print_output_info without learning rate"""
monitor = MFLossMonitor(learning_rate=None, global_batch_size=32)
monitor.print_warning_flag = True
cb_params = Mock()
cb_params.dataset_sink_mode = False
cb_params.optimizer = Mock()
cb_params.optimizer.global_step = 10
monitor.print_output_info(
cb_params, 1, 10, 100.0, 1, 100, 0.5, 100.0,
False, 1024.0, 3600, 10.0, 2.5, None, None, None
)
# Should log warning about missing learning rate
mock_logger.warning.assert_called()
assert not monitor.print_warning_flag
class TestMFLossMonitorCalculateFlopsWithPipeline:
"""Test MFLossMonitor._calculate_model_flops with pipeline parallel"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=8)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback.flops_collection')
@patch('mindformers.core.callback.callback.auto_parallel_context')
@patch('mindformers.core.callback.callback.get_group_size', return_value=8)
@patch('mindformers.core.callback.callback.create_group')
@patch('mindformers.core.callback.callback.AllReduceNet')
@patch('mindformers.core.callback.callback.Tensor')
def test_calculate_flops_with_pipeline_dynamic_shape(self, mock_tensor, mock_allreduce_net,
mock_create_group, mock_group_size,
mock_auto_context, mock_flops, *mocks):
"""Test _calculate_model_flops with pipeline and dynamic shape"""
monitor = MFLossMonitor()
monitor.current_phase = 'train_phase'
mock_flops.return_value = (1000000.0, 0, 500000.0, 0, True) # is_dynamic_shape=True
mock_auto_context.return_value.get_pipeline_stages.return_value = 2
# Mock AllReduceNet to return is_dynamic_shape > 0
mock_allreduce_instance = Mock()
mock_result = Mock()
mock_result.asnumpy.return_value = [1] # is_dynamic_shape > 0
mock_allreduce_instance.return_value = mock_result
mock_allreduce_net.return_value = mock_allreduce_instance
monitor._calculate_model_flops()
# Should set mf_support to False due to dynamic shape
assert not monitor.mf_support
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=8)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback.flops_collection')
@patch('mindformers.core.callback.callback.auto_parallel_context')
@patch('mindformers.core.callback.callback.get_group_size', return_value=8)
@patch('mindformers.core.callback.callback.create_group')
@patch('mindformers.core.callback.callback.AllReduceNet')
@patch('mindformers.core.callback.callback.Tensor')
def test_calculate_flops_with_pipeline_success(self, mock_tensor, mock_allreduce_net,
mock_create_group, mock_group_size,
mock_auto_context, mock_flops, *mocks):
"""Test _calculate_model_flops with pipeline parallel success"""
monitor = MFLossMonitor()
monitor.current_phase = 'train_phase'
mock_flops.return_value = (1000000.0, 0, 500000.0, 0, False) # is_dynamic_shape=False
mock_auto_context.return_value.get_pipeline_stages.return_value = 2
mock_auto_context.return_value.get_parallel_mode.return_value = 'semi_auto_parallel'
# Mock AllReduceNet
mock_allreduce_instance = Mock()
# First call: is_dynamic_shape check
mock_is_dynamic_result = Mock()
mock_is_dynamic_result.asnumpy.return_value = [0]
# Second call: flops aggregation
mock_flops_result = Mock()
mock_flops_result.asnumpy.return_value = [2000000.0]
mock_allreduce_instance.side_effect = [mock_is_dynamic_result, mock_flops_result]
mock_allreduce_net.return_value = mock_allreduce_instance
monitor._calculate_model_flops()
# Should aggregate flops across pipeline stages and divide by group size
assert monitor.mf_calculated
# 2000000.0 / 8 = 250000.0
assert monitor.full_model_flops == 250000.0
class TestMFLossMonitorPrintOutputInfoDataSinkMode:
"""Test MFLossMonitor.print_output_info in dataset sink mode"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
def test_print_output_info_sink_mode(self, *mocks):
"""Test print_output_info in dataset sink mode"""
monitor = MFLossMonitor(learning_rate=0.001, global_batch_size=32)
cb_params = Mock()
cb_params.dataset_sink_mode = True # Sink mode
cb_params.optimizer = Mock()
cb_params.optimizer.global_step = 10
monitor.print_output_info(
cb_params, 1, 10, 100.0, 1, 100, 0.5, 100.0,
False, 1024.0, 3600, 10.0, 2.5, None, None, None
)
# In sink mode, loss_info format is different
# This test mainly ensures no errors occur
class TestMFLossMonitorMstxEnabled:
"""Test MFLossMonitor with mstx enabled"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_group_size', return_value=1)
@patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=None)
@patch('mindformers.core.callback.callback.get_tensorboard_args', return_value={})
@patch('mindformers.core.callback.callback._check_mspti_is_on', return_value=True)
@patch('mindformers.core.callback.callback.ms.profiler.mstx')
@patch('mindformers.core.callback.callback.ms.runtime')
@patch('time.time')
def test_on_train_step_with_mstx(self, mock_time, mock_runtime, mock_mstx, mock_mspti, *mocks):
"""Test on_train_step_begin and on_train_step_end with mstx enabled"""
mock_time.return_value = 1000.0
mock_mstx.range_start.return_value = 12345
mock_runtime.current_stream.return_value = Mock()
monitor = MFLossMonitor(origin_epochs=10, dataset_size=100, global_batch_size=32)
run_context = Mock()
cb_params = Mock()
cb_params.cur_step_num = 5
run_context.original_args.return_value = cb_params
# Test on_train_step_begin
monitor.on_train_step_begin(run_context)
mock_mstx.range_start.assert_called_once()
assert monitor.mstx_range_id == 12345
if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@@ -0,0 +1,302 @@
# Copyright 2025 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Test callback.py using pytest framework."""
from unittest.mock import Mock, patch
import pytest
from mindformers.core.callback.callback import (
EvalCallBack,
MaxLogitsMonitor,
MoEDropRateCallback,
StressDetectCallBack,
SummaryMonitor,
TopkBiasBalanceCallback,
TrainCallBack
)
# pylint: disable=unused-argument # for mock logic
class TestSummaryMonitor:
"""Test SummaryMonitor class"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.SummaryCollector')
@patch('mindformers.core.callback.callback.get_output_subpath')
@patch('mindformers.core.callback.callback.get_real_rank')
def test_init(self, mock_get_real_rank, mock_get_output_subpath, mock_summary_collector):
"""Test initialization"""
mock_get_real_rank.return_value = 0
mock_get_output_subpath.return_value = "/tmp/summary"
SummaryMonitor(summary_dir=None)
mock_summary_collector.assert_called_once()
_, kwargs = mock_summary_collector.call_args
assert kwargs['summary_dir'] == "/tmp/summary"
class TestEvalCallBack:
"""Test EvalCallBack class"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
def test_on_train_epoch_end(self):
"""Test on_train_epoch_end callback"""
eval_func = Mock()
callback = EvalCallBack(eval_func, epoch_interval=2)
run_context = Mock()
cb_params = Mock()
# Epoch 1: no eval
cb_params.cur_epoch_num = 1
run_context.original_args.return_value = cb_params
callback.on_train_epoch_end(run_context)
eval_func.assert_not_called()
# Epoch 2: eval
cb_params.cur_epoch_num = 2
run_context.original_args.return_value = cb_params
callback.on_train_epoch_end(run_context)
eval_func.assert_called_once()
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
def test_on_train_step_end(self):
"""Test on_train_step_end callback"""
eval_func = Mock()
callback = EvalCallBack(eval_func, step_interval=10, epoch_interval=-1)
run_context = Mock()
cb_params = Mock()
# Step 5: no eval
cb_params.cur_step_num = 5
run_context.original_args.return_value = cb_params
callback.on_train_step_end(run_context)
eval_func.assert_not_called()
# Step 10: eval
cb_params.cur_step_num = 10
run_context.original_args.return_value = cb_params
callback.on_train_step_end(run_context)
eval_func.assert_called_once()
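        # The interval rule exercised by both tests above, as a stand-alone
        # sketch (assumed contract, not the callback source): evaluate when
        # the counter hits a positive multiple of the interval.
        def hits_interval_sketch(counter, interval):
            return interval > 0 and counter % interval == 0
        assert not hits_interval_sketch(5, 10)
        assert hits_interval_sketch(10, 10)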
class TestTrainCallBack:
"""Test TrainCallBack class"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
def test_stop_step(self):
"""Test stop_step functionality"""
callback = TrainCallBack(stop_step=10)
run_context = Mock()
cb_params = Mock()
run_context.original_args.return_value = cb_params
# Step 5
cb_params.cur_step_num = 5
callback.on_train_step_end(run_context)
run_context.request_stop.assert_not_called()
# Step 10
cb_params.cur_step_num = 10
callback.on_train_step_end(run_context)
run_context.request_stop.assert_called_once()
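        # The behaviour asserted above reduces to a threshold check; a
        # stand-alone sketch of the assumed rule (illustrative only):
        def should_stop_sketch(cur_step_num, stop_step):
            return stop_step is not None and cur_step_num >= stop_step
        assert not should_stop_sketch(5, 10)
        assert should_stop_sketch(10, 10)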
class TestStressDetectCallBack:
"""Test StressDetectCallBack class"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.stress_detect')
def test_stress_detect(self, mock_stress_detect):
"""Test stress detection functionality"""
callback = StressDetectCallBack(detection_interval=10, num_detections=2, dataset_size=100)
run_context = Mock()
cb_params = Mock()
run_context.original_args.return_value = cb_params
# Step 5: no detect
cb_params.cur_step_num = 5
callback.on_train_step_end(run_context)
mock_stress_detect.assert_not_called()
# Step 10: detect
cb_params.cur_step_num = 10
mock_stress_detect.return_value = 0
callback.on_train_step_end(run_context)
assert mock_stress_detect.call_count == 2
class TestMaxLogitsMonitor:
"""Test MaxLogitsMonitor"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
def test_on_train_step_end(self):
"""Test on_train_step_end callback"""
callback = MaxLogitsMonitor()
run_context = Mock()
cb_params = Mock()
# Create a network structure where 'network' attribute chain terminates
# The leaf node MUST NOT have a 'network' attribute to break the loop.
leaf_network = Mock()
del leaf_network.network
leaf_network.reset_max_attention_logit = Mock()
# intermediate network
network = Mock()
network.network = leaf_network
cb_params.train_network = network
run_context.original_args.return_value = cb_params
with patch('mindformers.core.callback.callback.get_auto_parallel_context') \
as mock_get_parallel:
mock_get_parallel.return_value = "stand_alone"
callback.on_train_step_end(run_context)
leaf_network.reset_max_attention_logit.assert_called_once()
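        # Why `del leaf_network.network` matters: deleting an attribute on a
        # Mock makes later access raise AttributeError, so an unwrap loop of
        # the shape below (our assumed traversal) terminates at the leaf
        # instead of recursing through auto-created attributes forever.
        net = network
        while hasattr(net, 'network'):
            net = net.network
        assert net is leaf_network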
class TestTopkBiasBalanceCallback:
"""Test TopkBiasBalanceCallback"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindspore.context.get_auto_parallel_context')
@patch('mindformers.core.callback.callback.get_tensorboard_writer')
@patch('mindformers.core.callback.callback.get_tensorboard_args')
def test_update_topk_bias(self, mock_args, mock_writer, mock_get_parallel):
"""Test topk bias update functionality"""
mock_args.return_value = {'log_expert_load_to_tensorboard': False}
mock_get_parallel.return_value = 1 # pipeline stages
# We need to mock P.Assign etc, which are used in __init__
with patch('mindspore.ops.operations.Assign'), \
patch('mindspore.ops.operations.Sub'), \
patch('mindspore.ops.operations.Add'), \
patch('mindspore.ops.operations.Sign'), \
patch('mindspore.ops.operations.Mul'), \
patch('mindspore.ops.operations.Div'):
callback = TopkBiasBalanceCallback(balance_via_topk_bias=True, expert_num=2)
# Setup network structure for _update_topk_bias logic
# Ensure leaf_network does not have 'network' attribute to terminate loop
leaf_network = Mock()
del leaf_network.network
layer = Mock()
router_inner = Mock()
mock_expert_load = Mock()
mock_expert_load.sum.return_value = 2.0
router_inner.expert_load.value.return_value = mock_expert_load
router_inner.topk_bias.value.return_value = Mock()
router = Mock()
router.router = router_inner
routed_experts = Mock()
routed_experts.router = router
feed_forward = Mock()
feed_forward.routed_experts = routed_experts
layer.feed_forward = feed_forward
leaf_network.model.layers = [layer]
network = Mock()
network.network = leaf_network
run_context = Mock()
cb_params = Mock()
cb_params.train_network = network
run_context.original_args.return_value = cb_params
ctx_patch = 'mindformers.core.callback.callback.get_auto_parallel_context'
with patch(ctx_patch, return_value="stand_alone"):
callback.on_train_step_end(run_context)
class TestMoEDropRateCallback:
"""Test MoEDropRateCallback"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
def test_callback_droprate(self):
"""Test MoEDropRateCallback - simplified version that just verifies initialization"""
# Test initialization
callback = MoEDropRateCallback(expert_num=8, capacity_factor=1.1, num_layers=1, mtp_depth=0)
# Verify basic attributes
assert callback.capacity_factor_over_expert_num == 1.1 / 8
assert callback.num_layers == 1
# Test with mock network that has no routed_experts (skip the callback logic)
leaf_network = Mock()
del leaf_network.network
layer = Mock()
# Make feed_forward not have routed_experts attribute
layer.feed_forward = Mock(spec=[])
leaf_network.model.layers = [layer]
network = Mock()
network.network = leaf_network
run_context = Mock()
cb_params = Mock()
cb_params.train_network = network
run_context.original_args.return_value = cb_params
# Mock to avoid entering the complex logic
ctx_patch = 'mindformers.core.callback.callback.get_auto_parallel_context'
with patch(ctx_patch, return_value="stand_alone"):
# This should not raise any errors
callback.on_train_step_end(run_context)
class TestTopkBiasBalanceCallbackExtended:
"""Extended tests for TopkBiasBalanceCallback"""
    @pytest.mark.level1
    @pytest.mark.platform_x86_cpu
    @patch('mindformers.core.callback.callback.get_tensorboard_args',
           return_value={'log_expert_load_to_tensorboard': True})
    @patch('mindformers.core.callback.callback.get_tensorboard_writer', return_value=Mock())
def test_log_expert_load_to_tensorboard(self, *mocks):
"""Test logging expert load to tensorboard"""
callback = TopkBiasBalanceCallback(
balance_via_topk_bias=False,
topk_bias_update_rate=0.01,
expert_num=8,
micro_batch_num=2,
gradient_accumulation_steps=4
)
assert callback.tensor_writer is not None
if __name__ == '__main__':
pytest.main([__file__, '-v'])

View File

@@ -20,6 +20,10 @@ import tempfile
import pytest
from mindformers.core.callback.callback import ProfileMonitor
# pylint: disable=protected-access
# pylint: disable=unused-argument # for mock logic
class TestProfileMonitor(unittest.TestCase):
"""Test cases for ProfileMonitor class"""
@@ -160,5 +164,137 @@ class TestProfileMonitor(unittest.TestCase):
self.assertIsNone(monitor.profile_rank_ids)
class TestProfileMonitorExtended:
"""Extended tests for ProfileMonitor"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
def test_check_step_valid(self):
"""Test _check_step with valid inputs"""
start, stop = ProfileMonitor._check_step(5, 10)
assert start == 5
assert stop == 10
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
def test_check_step_invalid(self):
"""Test _check_step with invalid inputs"""
# start > stop
start, stop = ProfileMonitor._check_step(15, 10)
assert start == 1
assert stop == 10
# negative values
start, stop = ProfileMonitor._check_step(-1, -5)
assert start == 1
assert stop == 10
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
def test_check_start_profile(self):
"""Test _check_start_profile"""
# start_step != 1, should return False
result = ProfileMonitor._check_start_profile(True, 5)
assert not result
# start_step == 1, should keep original value
result = ProfileMonitor._check_start_profile(True, 1)
assert result
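        # A hedged sketch of the rule these assertions encode (illustration
        # of the expected contract, not the implementation): deferred
        # profiling (start_step != 1) forces start_profile off.
        def check_start_profile_sketch(start_profile, start_step):
            return start_profile if start_step == 1 else False
        assert check_start_profile_sketch(True, 5) is False
        assert check_start_profile_sketch(True, 1) is True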
class TestProfileMonitorInit:
"""Test ProfileMonitor initialization and configuration"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_rank', return_value=0)
@patch('mindformers.core.callback.callback.get_pipeline_rank_ids', return_value=[0, 1])
@patch('mindformers.core.callback.callback.get_output_subpath', return_value='/output/profile')
@patch('mindformers.core.callback.callback.ms.get_context', return_value='Ascend')
@patch('mindformers.core.callback.callback.is_version_ge', return_value=True)
@patch('mindformers.core.callback.callback._check_mspti_is_on', return_value=False)
def test_profile_monitor_init_with_pipeline(self, mock_mspti, mock_version, mock_context,
mock_output, mock_pipeline_ids, mock_real_rank):
"""Test ProfileMonitor initialization with pipeline profiling"""
# Mock the profile function from mindspore.profiler
with patch('mindspore.profiler.profile') as mock_profile:
mock_profiler_instance = Mock()
mock_profile.return_value = mock_profiler_instance
monitor = ProfileMonitor(
start_step=1,
stop_step=10,
profile_pipeline=True,
profile_communication=True,
profile_memory=True,
profiler_level=1
)
assert monitor.profiler is not None
assert monitor.start_step == 1
assert monitor.stop_step == 10
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_rank', return_value=5)
@patch('mindformers.core.callback.callback.get_pipeline_rank_ids', return_value=[0, 1])
def test_profile_monitor_not_required_rank(self, mock_pipeline_ids, mock_real_rank):
"""Test ProfileMonitor when current rank doesn't need profiling"""
monitor = ProfileMonitor(
start_step=1,
stop_step=10,
profile_rank_ids=[0, 1, 2]
)
# Rank 5 is not in profile_rank_ids or pipeline_rank_ids
assert monitor.profiler is None
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_real_rank', return_value=0)
@patch('mindformers.core.callback.callback.get_pipeline_rank_ids', return_value=[0])
@patch('mindformers.core.callback.callback.get_output_subpath', return_value='/output/profile')
@patch('mindformers.core.callback.callback.ms.get_context', return_value='Ascend')
@patch('mindformers.core.callback.callback.is_version_ge', return_value=True)
@patch('mindformers.core.callback.callback._check_mspti_is_on', return_value=False)
def test_on_train_step_begin_start_profiler(self, mock_mspti, mock_version, mock_context,
mock_output, mock_pipeline_ids, mock_real_rank):
"""Test on_train_step_begin starts profiler"""
# Create a mock profiler
mock_profiler = Mock()
mock_profiler.start = Mock()
mock_profiler.step = Mock()
# Create monitor - we'll manually set the profiler
monitor = ProfileMonitor(
start_step=1,
stop_step=10,
profile_rank_ids=[0] # Ensure rank 0 is profiled
)
# Manually set the profiler and is_profiler_start flag
monitor.profiler = mock_profiler
monitor.is_profiler_start = False
# Create run context
run_context = Mock()
cb_params = Mock()
cb_params.cur_step_num = 1
run_context.original_args.return_value = cb_params
# Call on_train_step_begin
monitor.on_train_step_begin(run_context)
# Verify profiler.start() and profiler.step() were called
mock_profiler.start.assert_called_once()
mock_profiler.step.assert_called_once()
assert monitor.is_profiler_start
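        # Hedged sketch of the assumed on_train_step_begin flow (our reading,
        # not the callback source): on the first step inside
        # [start_step, stop_step] the profiler is started exactly once, and
        # step() is then advanced on every profiled step.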
if __name__ == '__main__':
    pytest.main([__file__, '-v'])

View File

@@ -0,0 +1,653 @@
# Copyright 2025 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Test callback.py using pytest framework."""
import os
import tempfile
from unittest.mock import Mock, patch
import numpy as np
import pytest
from mindformers.core.callback.callback import StressTestModelMonitor
# pylint: disable=unused-argument # for mock logic
class TestStressTestModelMonitorBasic:
"""Test StressTestModelMonitor basic methods"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindspore.communication.get_local_rank_size', return_value=8)
@patch('os.getenv')
def test_get_value_from_line(self, mock_getenv, mock_rank_size):
"""Test get_value_from_line method"""
model_dir = tempfile.mkdtemp()
dataset_dir = tempfile.mkdtemp()
# Mock MS_SCHED_PORT environment variable
def getenv_side_effect(key, default=None):
if key == "MS_SCHED_PORT":
return "8118" # Return a valid port number as string
return default
mock_getenv.side_effect = getenv_side_effect
monitor = StressTestModelMonitor(
interval_steps=10,
stress_model_dir=model_dir,
stress_dataset_dir=dataset_dir
)
line = "loss: 0.5234, global_norm: [1.234]"
loss = monitor.get_value_from_line(line, r"loss: (\d+\.\d+)")
assert loss == 0.5234
global_norm = monitor.get_value_from_line(line, r"global_norm: \[(\d+\.\d+)\]")
assert global_norm == 1.234
# No match
result = monitor.get_value_from_line(line, r"notfound: (\d+\.\d+)")
assert result is None
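        # For reference, a hypothetical re-implementation of the extraction
        # under test, assuming it is a thin wrapper over re.search
        # (illustration only, not the callback.py code):
        import re
        def get_value_from_line_sketch(text, pattern):
            match = re.search(pattern, text)
            return float(match.group(1)) if match else None
        assert get_value_from_line_sketch(line, r"loss: (\d+\.\d+)") == 0.5234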
class TestStressTestModelMonitorMethods:
"""Test StressTestModelMonitor methods"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
@patch('mindformers.core.callback.callback.os.getenv')
@patch('mindformers.core.callback.callback.os.path.exists', return_value=True)
@patch('mindformers.core.callback.callback.ms.communication.get_local_rank_size',
return_value=8)
def test_on_train_step_end_skip(self, mock_local_rank, mock_exists, mock_getenv, mock_get_rank):
"""Test on_train_step_end when interval not reached"""
def getenv_side_effect(key, default=None):
return "8118" if key == "MS_SCHED_PORT" else default
mock_getenv.side_effect = getenv_side_effect
monitor = StressTestModelMonitor(
interval_steps=100,
stress_model_dir='/path/to/model',
stress_dataset_dir='/path/to/dataset'
)
monitor.last_checked_step = 0
monitor.check_stress_test_model = Mock()
run_context = Mock()
cb_params = Mock()
cb_params.cur_step_num = 50 # Less than interval
run_context.original_args.return_value = cb_params
monitor.on_train_step_end(run_context)
# Should not call check_stress_test_model
monitor.check_stress_test_model.assert_not_called()
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
@patch('mindformers.core.callback.callback.os.getenv')
@patch('mindformers.core.callback.callback.os.path.exists', return_value=True)
@patch('mindformers.core.callback.callback.ms.communication.get_local_rank_size',
return_value=8)
def test_extract_interval_step_results_empty(
self, mock_local_rank, mock_exists, mock_getenv, mock_get_rank):
"""Test extract_interval_step_results with no matching intervals"""
def getenv_side_effect(key, default=None):
return "8118" if key == "MS_SCHED_PORT" else default
mock_getenv.side_effect = getenv_side_effect
monitor = StressTestModelMonitor(
interval_steps=100,
stress_model_dir='/path/to/model',
stress_dataset_dir='/path/to/dataset',
compare_interval_steps=1000 # Very large interval
)
# Create a temporary log file with few steps
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.log') as f:
log_line1 = "{Epoch:[ 1], step:[ 10/ 100], loss: 2.5, global_norm: [1.2]}"
f.write(f"2024-01-01 10:00:00 - INFO - {log_line1}\n")
log_line2 = "{Epoch:[ 1], step:[ 20/ 100], loss: 2.3, global_norm: [1.1]}"
f.write(f"2024-01-01 10:01:00 - INFO - {log_line2}\n")
log_file = f.name
try:
results, global_step = monitor.extract_interval_step_results(log_file)
# Should return None when interval is too large
assert results is None
assert global_step == 20
finally:
os.remove(log_file)
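        # A hedged sketch of pulling the step counter out of such log lines
        # (illustrative regex, not the parser in callback.py):
        import re
        step_match = re.search(r"step:\[\s*(\d+)/", log_line2)
        assert step_match and int(step_match.group(1)) == 20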
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
@patch('mindformers.core.callback.callback.os.getenv')
@patch('mindformers.core.callback.callback.os.path.exists', return_value=True)
@patch('mindformers.core.callback.callback.ms.communication.get_local_rank_size',
return_value=8)
def test_compare_gathered_results_consistent(
self, mock_local_rank, mock_exists, mock_getenv, mock_get_rank):
"""Test compare_gathered_results with consistent results"""
def getenv_side_effect(key, default=None):
return "8118" if key == "MS_SCHED_PORT" else default
mock_getenv.side_effect = getenv_side_effect
monitor = StressTestModelMonitor(
interval_steps=100,
stress_model_dir='/path/to/model',
stress_dataset_dir='/path/to/dataset'
)
# Create consistent results from multiple ranks
gathered_results = np.array([
[[1, 10, 2.5, 1.2]],
[[1, 10, 2.5, 1.2]],
[[1, 10, 2.5, 1.2]],
[[1, 10, 2.5, 1.2]]
])
result = monitor.compare_gathered_results(gathered_results)
# Should return True for consistent results
assert result
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
@patch('mindformers.core.callback.callback.os.getenv')
@patch('mindformers.core.callback.callback.os.path.exists', return_value=True)
@patch('mindformers.core.callback.callback.ms.communication.get_local_rank_size',
return_value=8)
def test_compare_gathered_results_inconsistent(
self, mock_local_rank, mock_exists, mock_getenv, mock_get_rank):
"""Test compare_gathered_results with inconsistent results"""
def getenv_side_effect(key, default=None):
return "8118" if key == "MS_SCHED_PORT" else default
mock_getenv.side_effect = getenv_side_effect
monitor = StressTestModelMonitor(
interval_steps=100,
stress_model_dir='/path/to/model',
stress_dataset_dir='/path/to/dataset'
)
# Create inconsistent results from multiple ranks
gathered_results = np.array([
[[1, 10, 2.5, 1.2]],
[[1, 10, 2.6, 1.3]], # Different values
[[1, 10, 2.5, 1.2]],
[[1, 10, 2.5, 1.2]]
])
result = monitor.compare_gathered_results(gathered_results)
# Should return False for inconsistent results
assert not result
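        # Both consistency tests reduce to "every rank reported identical
        # rows"; a minimal sketch of that check (our illustration, not the
        # mindformers implementation):
        def all_ranks_equal_sketch(gathered):
            return bool(np.all(gathered == gathered[0]))
        assert not all_ranks_equal_sketch(gathered_results)
        assert all_ranks_equal_sketch(np.array([[[1, 10, 2.5, 1.2]]] * 4))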
class TestStressTestModelMonitorCheckStressTest:
"""Test StressTestModelMonitor.check_stress_test_model method"""
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
@patch('mindformers.core.callback.callback.os.getenv')
@patch('mindformers.core.callback.callback.os.path.exists', return_value=True)
@patch('mindformers.core.callback.callback.ms.communication.get_local_rank_size',
return_value=8)
@patch('mindformers.core.callback.callback.logger')
def test_check_stress_test_model_dataset_not_exists(self, mock_logger, mock_local_rank,
mock_exists, mock_getenv, mock_get_rank):
"""Test check_stress_test_model when dataset_dir doesn't exist"""
def getenv_side_effect(key, default=None):
return "8118" if key == "MS_SCHED_PORT" else default
mock_getenv.side_effect = getenv_side_effect
monitor = StressTestModelMonitor(
interval_steps=100,
stress_model_dir='/path/to/model',
stress_dataset_dir='/path/to/dataset'
)
# Make dataset_dir check return False
mock_exists.return_value = False
monitor.dataset_dir = '/nonexistent/path'
# Should return early without running stress test
monitor.check_stress_test_model(current_step=100)
# Should log error about dataset not found
mock_logger.error.assert_called()
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
@patch('mindformers.core.callback.callback.os.getenv')
@patch('mindformers.core.callback.callback.os.path.exists', return_value=True)
@patch('mindformers.core.callback.callback.ms.communication.get_local_rank_size',
return_value=8)
@patch('mindformers.core.callback.callback.logger')
def test_check_stress_test_model_dataset_dir_none(self, mock_logger, mock_local_rank,
mock_exists, mock_getenv, mock_get_rank):
"""Test check_stress_test_model when dataset_dir is None (line 3000)"""
def getenv_side_effect(key, default=None):
return "8118" if key == "MS_SCHED_PORT" else default
mock_getenv.side_effect = getenv_side_effect
monitor = StressTestModelMonitor(
interval_steps=100,
stress_model_dir='/path/to/model',
stress_dataset_dir='/path/to/dataset'
)
# Set dataset_dir to None
monitor.dataset_dir = None
# Should return early without running stress test
monitor.check_stress_test_model(current_step=100)
# Should log error about dataset not found
mock_logger.error.assert_called()
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
@patch('mindformers.core.callback.callback.os.getenv')
@patch('mindformers.core.callback.callback.os.path.exists', return_value=True)
@patch('mindformers.core.callback.callback.ms.communication.get_local_rank_size',
return_value=8)
@patch('mindformers.core.callback.callback.os.cpu_count', return_value=16)
@patch('mindformers.core.callback.callback.barrier')
@patch('mindformers.core.callback.callback.all_gather_into_tensor')
@patch('mindformers.core.callback.callback.subprocess.Popen')
@patch('mindformers.core.callback.callback.shlex.split', side_effect=lambda x: x.split())
@patch('mindformers.core.callback.callback.logger')
def test_check_stress_test_model_rank0_runs_subprocess(self, mock_logger, mock_shlex, mock_popen,
mock_all_gather, mock_barrier,
mock_cpu_count, mock_local_rank,
mock_exists, mock_getenv, mock_get_rank):
"""Test check_stress_test_model when rank_id % worker_num == 0 runs subprocess"""
def getenv_side_effect(key, default=None):
return "8118" if key == "MS_SCHED_PORT" else default
mock_getenv.side_effect = getenv_side_effect
monitor = StressTestModelMonitor(
interval_steps=100,
stress_model_dir='/path/to/model',
stress_dataset_dir='/path/to/dataset',
compare_interval_steps=None # Skip interval comparison
)
# Mock subprocess behavior
mock_process = Mock()
mock_process.poll.side_effect = [None, 0] # First call returns None, second returns 0
mock_process.returncode = 0
mock_process.__enter__ = Mock(return_value=mock_process)
mock_process.__exit__ = Mock(return_value=False)
mock_popen.return_value = mock_process
# Mock all_gather_into_tensor result
mock_tensor = Mock()
mock_tensor.asnumpy.return_value = np.array([[1.0, 2.0], [1.0, 2.0]])
mock_all_gather.return_value = (mock_tensor, None)
# Mock readlog and extract methods
monitor.readlog = Mock(return_value="Training step 10")
monitor.extract_last_step_result = Mock(return_value=Mock())
monitor.check_stress_test_model(current_step=100)
# Should call Popen to start subprocess
mock_popen.assert_called()
# Should call barrier for synchronization
mock_barrier.assert_called()
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=1)
@patch('mindformers.core.callback.callback.os.getenv')
@patch('mindformers.core.callback.callback.os.path.exists', return_value=True)
@patch('mindformers.core.callback.callback.ms.communication.get_local_rank_size',
return_value=8)
@patch('mindformers.core.callback.callback.barrier')
@patch('mindformers.core.callback.callback.all_gather_into_tensor')
@patch('mindformers.core.callback.callback.logger')
def test_check_stress_test_model_non_rank0_skips_subprocess(self, mock_logger, mock_all_gather,
mock_barrier, mock_local_rank,
mock_exists, mock_getenv, mock_get_rank):
"""Test check_stress_test_model when rank_id % worker_num != 0 skips subprocess"""
def getenv_side_effect(key, default=None):
return "8118" if key == "MS_SCHED_PORT" else default
mock_getenv.side_effect = getenv_side_effect
monitor = StressTestModelMonitor(
interval_steps=100,
stress_model_dir='/path/to/model',
stress_dataset_dir='/path/to/dataset',
compare_interval_steps=None
)
# Mock all_gather_into_tensor result
mock_tensor = Mock()
mock_tensor.asnumpy.return_value = np.array([[1.0, 2.0], [1.0, 2.0]])
mock_all_gather.return_value = (mock_tensor, None)
monitor.extract_last_step_result = Mock(return_value=Mock())
monitor.check_stress_test_model(current_step=100)
# Should call barrier (synchronization happens regardless of rank)
mock_barrier.assert_called()
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
@patch('mindformers.core.callback.callback.os.getenv')
@patch('mindformers.core.callback.callback.os.path.exists', return_value=True)
@patch('mindformers.core.callback.callback.ms.communication.get_local_rank_size',
return_value=8)
@patch('mindformers.core.callback.callback.os.cpu_count', return_value=16)
@patch('mindformers.core.callback.callback.barrier')
@patch('mindformers.core.callback.callback.all_gather_into_tensor')
@patch('mindformers.core.callback.callback.subprocess.Popen')
@patch('mindformers.core.callback.callback.shlex.split', side_effect=lambda x: x.split())
@patch('mindformers.core.callback.callback.logger')
def test_check_stress_test_model_with_compare_interval_steps(self, mock_logger, mock_shlex, mock_popen,
mock_all_gather, mock_barrier,
mock_cpu_count, mock_local_rank,
mock_exists, mock_getenv, mock_get_rank):
"""Test check_stress_test_model with compare_interval_steps set"""
def getenv_side_effect(key, default=None):
return "8118" if key == "MS_SCHED_PORT" else default
mock_getenv.side_effect = getenv_side_effect
monitor = StressTestModelMonitor(
interval_steps=100,
stress_model_dir='/path/to/model',
stress_dataset_dir='/path/to/dataset',
compare_interval_steps=10 # Set interval comparison
)
# Mock subprocess behavior
mock_process = Mock()
mock_process.poll.side_effect = [None, 0]
mock_process.returncode = 0
mock_process.__enter__ = Mock(return_value=mock_process)
mock_process.__exit__ = Mock(return_value=False)
mock_popen.return_value = mock_process
# Mock all_gather_into_tensor result for interval comparison
mock_interval_tensor = Mock()
mock_interval_tensor.asnumpy.return_value = np.array([[[1, 10, 2.5, 1.2]], [[1, 10, 2.5, 1.2]]])
mock_last_tensor = Mock()
mock_last_tensor.asnumpy.return_value = np.array([[1.0, 2.0], [1.0, 2.0]])
mock_all_gather.side_effect = [(mock_interval_tensor, None), (mock_last_tensor, None)]
# Mock extract methods to return valid results
mock_interval_result = Mock()
monitor.extract_interval_step_results = Mock(return_value=(mock_interval_result, 100))
monitor.extract_last_step_result = Mock(return_value=Mock())
monitor.readlog = Mock(return_value="Training step 10")
monitor.compare_gathered_results = Mock(return_value=True)
monitor.check_stress_test_model(current_step=100)
# Should call compare_gathered_results for interval comparison
monitor.compare_gathered_results.assert_called()
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
@patch('mindformers.core.callback.callback.os.getenv')
@patch('mindformers.core.callback.callback.os.path.exists', return_value=True)
@patch('mindformers.core.callback.callback.ms.communication.get_local_rank_size',
return_value=8)
@patch('mindformers.core.callback.callback.os.cpu_count', return_value=16)
@patch('mindformers.core.callback.callback.barrier')
@patch('mindformers.core.callback.callback.all_gather_into_tensor')
@patch('mindformers.core.callback.callback.subprocess.Popen')
@patch('mindformers.core.callback.callback.shlex.split', side_effect=lambda x: x.split())
@patch('mindformers.core.callback.callback.logger')
def test_check_stress_test_model_interval_results_none(self, mock_logger, mock_shlex, mock_popen,
mock_all_gather, mock_barrier,
mock_cpu_count, mock_local_rank,
mock_exists, mock_getenv, mock_get_rank):
"""Test check_stress_test_model when interval_results is None"""
def getenv_side_effect(key, default=None):
return "8118" if key == "MS_SCHED_PORT" else default
mock_getenv.side_effect = getenv_side_effect
monitor = StressTestModelMonitor(
interval_steps=100,
stress_model_dir='/path/to/model',
stress_dataset_dir='/path/to/dataset',
compare_interval_steps=1000 # Large interval
)
# Mock subprocess behavior
mock_process = Mock()
mock_process.poll.side_effect = [None, 0]
mock_process.returncode = 0
mock_process.__enter__ = Mock(return_value=mock_process)
mock_process.__exit__ = Mock(return_value=False)
mock_popen.return_value = mock_process
# Mock all_gather_into_tensor result
mock_tensor = Mock()
mock_tensor.asnumpy.return_value = np.array([[1.0, 2.0], [1.0, 2.0]])
mock_all_gather.return_value = (mock_tensor, None)
# Mock extract_interval_step_results to return None (interval too large)
monitor.extract_interval_step_results = Mock(return_value=(None, 50))
monitor.extract_last_step_result = Mock(return_value=Mock())
monitor.readlog = Mock(return_value="Training step 10")
monitor.check_stress_test_model(current_step=100)
# Should log warning about interval being larger than total steps
mock_logger.warning.assert_called()
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
@patch('mindformers.core.callback.callback.os.getenv')
@patch('mindformers.core.callback.callback.os.path.exists', return_value=True)
@patch('mindformers.core.callback.callback.ms.communication.get_local_rank_size',
return_value=8)
@patch('mindformers.core.callback.callback.os.cpu_count', return_value=16)
@patch('mindformers.core.callback.callback.barrier')
@patch('mindformers.core.callback.callback.all_gather_into_tensor')
@patch('mindformers.core.callback.callback.subprocess.Popen')
@patch('mindformers.core.callback.callback.shlex.split', side_effect=lambda x: x.split())
@patch('mindformers.core.callback.callback.logger')
def test_check_stress_test_model_results_match(self, mock_logger, mock_shlex, mock_popen,
mock_all_gather, mock_barrier,
mock_cpu_count, mock_local_rank,
mock_exists, mock_getenv, mock_get_rank):
"""Test check_stress_test_model when all results match"""
def getenv_side_effect(key, default=None):
return "8118" if key == "MS_SCHED_PORT" else default
mock_getenv.side_effect = getenv_side_effect
monitor = StressTestModelMonitor(
interval_steps=100,
stress_model_dir='/path/to/model',
stress_dataset_dir='/path/to/dataset',
compare_interval_steps=None
)
# Mock subprocess behavior
mock_process = Mock()
mock_process.poll.side_effect = [None, 0]
mock_process.returncode = 0
mock_process.__enter__ = Mock(return_value=mock_process)
mock_process.__exit__ = Mock(return_value=False)
mock_popen.return_value = mock_process
# All results match
mock_tensor = Mock()
mock_tensor.asnumpy.return_value = np.array([[1.0, 2.0], [1.0, 2.0], [1.0, 2.0]])
mock_all_gather.return_value = (mock_tensor, None)
monitor.extract_last_step_result = Mock(return_value=Mock())
monitor.readlog = Mock(return_value="Training step 10")
monitor.check_stress_test_model(current_step=100)
# Should log STRESS TEST PASSED
        passed_logged = any('STRESS TEST PASSED' in str(call)
                            for call in mock_logger.info.call_args_list)
        assert passed_logged
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
@patch('mindformers.core.callback.callback.os.getenv')
@patch('mindformers.core.callback.callback.os.path.exists', return_value=True)
@patch('mindformers.core.callback.callback.ms.communication.get_local_rank_size',
return_value=8)
@patch('mindformers.core.callback.callback.os.cpu_count', return_value=16)
@patch('mindformers.core.callback.callback.barrier')
@patch('mindformers.core.callback.callback.all_gather_into_tensor')
@patch('mindformers.core.callback.callback.subprocess.Popen')
@patch('mindformers.core.callback.callback.shlex.split', side_effect=lambda x: x.split())
@patch('mindformers.core.callback.callback.logger')
def test_check_stress_test_model_results_mismatch(self, mock_logger, mock_shlex, mock_popen,
mock_all_gather, mock_barrier,
mock_cpu_count, mock_local_rank,
mock_exists, mock_getenv, mock_get_rank):
"""Test check_stress_test_model when results don't match"""
def getenv_side_effect(key, default=None):
return "8118" if key == "MS_SCHED_PORT" else default
mock_getenv.side_effect = getenv_side_effect
monitor = StressTestModelMonitor(
interval_steps=100,
stress_model_dir='/path/to/model',
stress_dataset_dir='/path/to/dataset',
compare_interval_steps=None
)
# Mock subprocess behavior
mock_process = Mock()
mock_process.poll.side_effect = [None, 0]
mock_process.returncode = 0
mock_process.__enter__ = Mock(return_value=mock_process)
mock_process.__exit__ = Mock(return_value=False)
mock_popen.return_value = mock_process
# Results don't match - different values
mock_tensor = Mock()
mock_tensor.asnumpy.return_value = np.array([[1.0, 2.0], [1.5, 2.5], [1.0, 2.0]])
mock_all_gather.return_value = (mock_tensor, None)
monitor.extract_last_step_result = Mock(return_value=Mock())
monitor.readlog = Mock(return_value="Training step 10")
monitor.check_stress_test_model(current_step=100)
# Should log STRESS TEST FAILED warning
        failed_logged = any('STRESS TEST FAILED' in str(call)
                            for call in mock_logger.warning.call_args_list)
        assert failed_logged
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@patch('mindformers.core.callback.callback.get_rank', return_value=0)
@patch('mindformers.core.callback.callback.os.getenv')
@patch('mindformers.core.callback.callback.os.path.exists', return_value=True)
@patch('mindformers.core.callback.callback.ms.communication.get_local_rank_size',
return_value=8)
@patch('mindformers.core.callback.callback.os.cpu_count', return_value=16)
@patch('mindformers.core.callback.callback.barrier')
@patch('mindformers.core.callback.callback.all_gather_into_tensor')
@patch('mindformers.core.callback.callback.subprocess.Popen')
@patch('mindformers.core.callback.callback.shlex.split', side_effect=lambda x: x.split())
@patch('mindformers.core.callback.callback.logger')
def test_check_stress_test_model_subprocess_error(self, mock_logger, mock_shlex, mock_popen,
mock_all_gather, mock_barrier,
mock_cpu_count, mock_local_rank,
mock_exists, mock_getenv, mock_get_rank):
"""Test check_stress_test_model when subprocess returns error"""
def getenv_side_effect(key, default=None):
return "8118" if key == "MS_SCHED_PORT" else default
mock_getenv.side_effect = getenv_side_effect
monitor = StressTestModelMonitor(
interval_steps=100,
stress_model_dir='/path/to/model',
stress_dataset_dir='/path/to/dataset',
compare_interval_steps=None
)
# Mock subprocess with error
mock_process = Mock()
mock_process.poll.side_effect = [None, 1] # Returns non-zero exit code
mock_process.returncode = 1 # Error
mock_process.stderr = Mock()
mock_process.stderr.read.return_value = b'Error message'
mock_process.__enter__ = Mock(return_value=mock_process)
mock_process.__exit__ = Mock(return_value=False)
mock_popen.return_value = mock_process
# Mock all_gather_into_tensor result
mock_tensor = Mock()
mock_tensor.asnumpy.return_value = np.array([[1.0, 2.0], [1.0, 2.0]])
mock_all_gather.return_value = (mock_tensor, None)
monitor.extract_last_step_result = Mock(return_value=Mock())
monitor.readlog = Mock(return_value="Training step 10")
monitor.check_stress_test_model(current_step=100)
# Should log warning about subprocess error
        error_logged = any('error occurred' in str(call).lower()
                           for call in mock_logger.warning.call_args_list)
        assert error_logged
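        # For reference, the stdlib contract the Popen mock reproduces
        # (generic subprocess idiom, not quoted from callback.py):
        #     with subprocess.Popen(shlex.split(cmd), stderr=subprocess.PIPE) as proc:
        #         while proc.poll() is None:
        #             ...wait for the stress-test child to finish...
        #         if proc.returncode != 0:
        #             logger.warning(..., proc.stderr.read())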
if __name__ == '__main__':
pytest.main([__file__, '-v'])

File diff suppressed because it is too large